Skip to content

Commit bbbaee6

Browse files
Serialize Monitoring Bulk Request Compressed (#56410)
Even with the changes from #48854 we're still seeing significant (as in tens and hundreds of MB) buffer usage for bulk exports in some cases, which destabilizes master nodes. Since we need to know the serialized length of the bulk body, we can't do the serialization in a streaming manner. (It's also not easily doable with the HTTP client API we're using anyway.) => Let's at least serialize on heap in compressed form and decompress as we're streaming to the HTTP connection. For small requests this adds negligible overhead, but for large requests this reduces the size of the payload field by about an order of magnitude (empirically determined), which is a massive reduction in size when considering O(100MB) bulk requests.
1 parent b4c87f8 commit bbbaee6

File tree

1 file changed

+55
-8
lines changed
  • x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http

1 file changed

+55
-8
lines changed

x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java

+55-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.client.ResponseListener;
1616
import org.elasticsearch.client.RestClient;
1717
import org.elasticsearch.common.bytes.BytesReference;
18+
import org.elasticsearch.common.compress.CompressorFactory;
1819
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1920
import org.elasticsearch.common.io.stream.StreamOutput;
2021
import org.elasticsearch.common.time.DateFormatter;
@@ -28,7 +29,9 @@
2829
import org.elasticsearch.xpack.monitoring.exporter.ExportBulk;
2930
import org.elasticsearch.xpack.monitoring.exporter.ExportException;
3031

32+
import java.io.FilterOutputStream;
3133
import java.io.IOException;
34+
import java.io.OutputStream;
3235
import java.time.format.DateTimeFormatter;
3336
import java.util.Collection;
3437
import java.util.Map;
@@ -56,10 +59,15 @@ class HttpExportBulk extends ExportBulk {
5659
private final DateFormatter formatter;
5760

5861
/**
59-
* The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
62+
* The compressed bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
6063
*/
6164
private BytesReference payload = null;
6265

66+
/**
67+
* Uncompressed length of {@link #payload} contents.
68+
*/
69+
private long payloadLength = -1L;
70+
6371
HttpExportBulk(final String name, final RestClient client, final Map<String, String> parameters,
6472
final DateFormatter dateTimeFormatter, final ThreadContext threadContext) {
6573
super(name, threadContext);
@@ -73,14 +81,17 @@ class HttpExportBulk extends ExportBulk {
7381
public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
7482
try {
7583
if (docs != null && docs.isEmpty() == false) {
76-
try (BytesStreamOutput payload = new BytesStreamOutput()) {
84+
final BytesStreamOutput scratch = new BytesStreamOutput();
85+
final CountingOutputStream countingStream;
86+
try (StreamOutput payload = CompressorFactory.COMPRESSOR.streamOutput(scratch)) {
87+
countingStream = new CountingOutputStream(payload);
7788
for (MonitoringDoc monitoringDoc : docs) {
78-
writeDocument(monitoringDoc, payload);
89+
writeDocument(monitoringDoc, countingStream);
7990
}
80-
81-
// store the payload until we flush
82-
this.payload = payload.bytes();
8391
}
92+
payloadLength = countingStream.bytesWritten;
93+
// store the payload until we flush
94+
this.payload = scratch.bytes();
8495
}
8596
} catch (Exception e) {
8697
throw new ExportException("failed to add documents to export bulk [{}]", e, name);
@@ -97,7 +108,8 @@ public void doFlush(ActionListener<Void> listener) throws ExportException {
97108
request.addParameter(param.getKey(), param.getValue());
98109
}
99110
try {
100-
request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON));
111+
request.setEntity(new InputStreamEntity(
112+
CompressorFactory.COMPRESSOR.streamInput(payload.streamInput()), payloadLength, ContentType.APPLICATION_JSON));
101113
} catch (IOException e) {
102114
listener.onFailure(e);
103115
return;
@@ -127,7 +139,7 @@ public void onFailure(Exception exception) {
127139
}
128140
}
129141

130-
private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
142+
private void writeDocument(MonitoringDoc doc, OutputStream out) throws IOException {
131143
final XContentType xContentType = XContentType.JSON;
132144
final XContent xContent = xContentType.xContent();
133145

@@ -166,4 +178,39 @@ private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOExcepti
166178
name, index, id, doc.getType()
167179
);
168180
}
181+
182+
// Counting output stream used to record the uncompressed size of the bulk payload when writing it to a compressed stream
183+
private static final class CountingOutputStream extends FilterOutputStream {
184+
private long bytesWritten = 0;
185+
186+
CountingOutputStream(final OutputStream out) {
187+
super(out);
188+
}
189+
190+
@Override
191+
public void write(final int b) throws IOException {
192+
out.write(b);
193+
count(1);
194+
}
195+
@Override
196+
public void write(final byte[] b) throws IOException {
197+
write(b, 0, b.length);
198+
}
199+
@Override
200+
public void write(final byte[] b, final int off, final int len) throws IOException {
201+
out.write(b, off, len);
202+
count(len);
203+
}
204+
205+
@Override
206+
public void close() {
207+
// don't close nested stream
208+
}
209+
210+
protected void count(final long written) {
211+
if (written != -1) {
212+
bytesWritten += written;
213+
}
214+
}
215+
}
169216
}

0 commit comments

Comments (0)