Skip to content

Commit 6b7fea0

Browse files
authored
Write async response directly to XContent to reduce memory usage (#73707)
This change tries to write an async response directly to XContent in Base64 to avoid using multiple buffers. Relates to #67594
1 parent 0e5c3fc commit 6b7fea0

File tree

2 files changed

+87
-31
lines changed

2 files changed

+87
-31
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@
2121
import org.elasticsearch.cluster.metadata.IndexMetadata;
2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.TriFunction;
24-
import org.elasticsearch.common.bytes.BytesReference;
2524
import org.elasticsearch.common.collect.Tuple;
2625
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
27-
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2826
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
2927
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
28+
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
3029
import org.elasticsearch.common.io.stream.StreamInput;
3130
import org.elasticsearch.common.io.stream.Writeable;
3231
import org.elasticsearch.common.settings.Settings;
3332
import org.elasticsearch.common.util.concurrent.ThreadContext;
3433
import org.elasticsearch.common.xcontent.XContentBuilder;
34+
import org.elasticsearch.common.xcontent.XContentFactory;
3535
import org.elasticsearch.common.xcontent.XContentType;
3636
import org.elasticsearch.indices.SystemIndexDescriptor;
3737
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
@@ -45,11 +45,11 @@
4545
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
4646

4747
import java.io.IOException;
48+
import java.io.OutputStream;
4849
import java.io.UncheckedIOException;
4950
import java.nio.ByteBuffer;
5051
import java.util.Base64;
5152
import java.util.Collections;
52-
import java.util.HashMap;
5353
import java.util.List;
5454
import java.util.Map;
5555
import java.util.function.Function;
@@ -181,15 +181,23 @@ public void createResponse(String docId,
181181
Map<String, String> headers,
182182
R response,
183183
ActionListener<IndexResponse> listener) throws IOException {
184-
Map<String, Object> source = new HashMap<>();
185-
source.put(HEADERS_FIELD, headers);
186-
source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime());
187-
source.put(RESULT_FIELD, encodeResponse(response));
188-
IndexRequest indexRequest = new IndexRequest(index)
189-
.create(true)
190-
.id(docId)
191-
.source(source, XContentType.JSON);
192-
clientWithOrigin.index(indexRequest, listener);
184+
try {
185+
// TODO: Integrate with circuit breaker
186+
final XContentBuilder source = XContentFactory.jsonBuilder()
187+
.startObject()
188+
.field(HEADERS_FIELD, headers)
189+
.field(EXPIRATION_TIME_FIELD, response.getExpirationTime())
190+
.directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os))
191+
.endObject();
192+
193+
final IndexRequest indexRequest = new IndexRequest(index)
194+
.create(true)
195+
.id(docId)
196+
.source(source);
197+
clientWithOrigin.index(indexRequest, listener);
198+
} catch (Exception e) {
199+
listener.onFailure(e);
200+
}
193201
}
194202

195203
/**
@@ -200,16 +208,19 @@ public void updateResponse(String docId,
200208
R response,
201209
ActionListener<UpdateResponse> listener) {
202210
try {
203-
Map<String, Object> source = new HashMap<>();
204-
source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
205-
source.put(RESULT_FIELD, encodeResponse(response));
211+
// TODO: Integrate with circuit breaker
212+
final XContentBuilder source = XContentFactory.jsonBuilder()
213+
.startObject()
214+
.field(RESPONSE_HEADERS_FIELD, responseHeaders)
215+
.directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os))
216+
.endObject();
206217
UpdateRequest request = new UpdateRequest()
207218
.index(index)
208219
.id(docId)
209-
.doc(source, XContentType.JSON)
220+
.doc(source)
210221
.retryOnConflict(5);
211222
clientWithOrigin.update(request, listener);
212-
} catch(Exception e) {
223+
} catch (Exception e) {
213224
listener.onFailure(e);
214225
}
215226
}
@@ -452,21 +463,17 @@ boolean ensureAuthenticatedUserIsSame(Map<String, String> originHeaders, Authent
452463
return origin.canAccessResourcesOf(current);
453464
}
454465

455-
/**
456-
* Encode the provided response in a binary form using base64 encoding.
457-
*/
458-
String encodeResponse(R response) throws IOException {
459-
try (BytesStreamOutput out = new BytesStreamOutput()) {
460-
Version.writeVersion(Version.CURRENT, out);
461-
response.writeTo(out);
462-
return Base64.getEncoder().encodeToString(BytesReference.toBytes(out.bytes()));
463-
}
466+
private void writeResponse(R response, OutputStream os) throws IOException {
467+
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(os);
468+
Version.writeVersion(Version.CURRENT, out);
469+
response.writeTo(out);
464470
}
465471

466472
/**
467473
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
468474
*/
469475
R decodeResponse(String value) throws IOException {
476+
// TODO: Integrate with the circuit breaker
470477
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
471478
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
472479
in.setVersion(Version.readVersion(in));

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,20 @@
66
*/
77
package org.elasticsearch.xpack.core.async;
88

9+
import org.elasticsearch.action.DocWriteResponse;
10+
import org.elasticsearch.action.index.IndexResponse;
11+
import org.elasticsearch.action.support.PlainActionFuture;
12+
import org.elasticsearch.action.update.UpdateResponse;
913
import org.elasticsearch.cluster.service.ClusterService;
1014
import org.elasticsearch.common.io.stream.StreamInput;
1115
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.tasks.TaskId;
1217
import org.elasticsearch.test.ESSingleNodeTestCase;
1318
import org.elasticsearch.transport.TransportService;
1419
import org.junit.Before;
1520

1621
import java.io.IOException;
22+
import java.util.Map;
1723
import java.util.Objects;
1824

1925
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
@@ -66,6 +72,14 @@ public boolean equals(Object o) {
6672
public int hashCode() {
6773
return Objects.hash(test, expirationTimeMillis);
6874
}
75+
76+
@Override
77+
public String toString() {
78+
return "TestAsyncResponse{" +
79+
"test='" + test + '\'' +
80+
", expirationTimeMillis=" + expirationTimeMillis +
81+
'}';
82+
}
6983
}
7084

7185
@Before
@@ -77,11 +91,46 @@ public void setup() {
7791
}
7892

7993
public void testEncodeSearchResponse() throws IOException {
80-
for (int i = 0; i < 10; i++) {
81-
TestAsyncResponse response = new TestAsyncResponse(randomAlphaOfLength(10), randomLong());
82-
String encoded = indexService.encodeResponse(response);
83-
TestAsyncResponse same = indexService.decodeResponse(encoded);
84-
assertThat(same, equalTo(response));
94+
final int iterations = iterations(1, 20);
95+
for (int i = 0; i < iterations; i++) {
96+
long expirationTime = randomLong();
97+
String testMessage = randomAlphaOfLength(10);
98+
TestAsyncResponse initialResponse = new TestAsyncResponse(testMessage, expirationTime);
99+
AsyncExecutionId executionId = new AsyncExecutionId(
100+
Long.toString(randomNonNegativeLong()),
101+
new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()));
102+
103+
PlainActionFuture<IndexResponse> createFuture = new PlainActionFuture<>();
104+
indexService.createResponse(executionId.getDocId(), Map.of(), initialResponse, createFuture);
105+
assertThat(createFuture.actionGet().getResult(), equalTo(DocWriteResponse.Result.CREATED));
106+
107+
if (randomBoolean()) {
108+
PlainActionFuture<TestAsyncResponse> getFuture = new PlainActionFuture<>();
109+
indexService.getResponse(executionId, randomBoolean(), getFuture);
110+
assertThat(getFuture.actionGet(), equalTo(initialResponse));
111+
}
112+
113+
int updates = randomIntBetween(1, 5);
114+
for (int u = 0; u < updates; u++) {
115+
if (randomBoolean()) {
116+
testMessage = randomAlphaOfLength(10);
117+
TestAsyncResponse updateResponse = new TestAsyncResponse(testMessage, randomLong());
118+
PlainActionFuture<UpdateResponse> updateFuture = new PlainActionFuture<>();
119+
indexService.updateResponse(executionId.getDocId(), Map.of(), updateResponse, updateFuture);
120+
updateFuture.actionGet();
121+
} else {
122+
expirationTime = randomLong();
123+
PlainActionFuture<UpdateResponse> updateFuture = new PlainActionFuture<>();
124+
indexService.updateExpirationTime(executionId.getDocId(), expirationTime, updateFuture);
125+
updateFuture.actionGet();
126+
}
127+
if (randomBoolean()) {
128+
PlainActionFuture<TestAsyncResponse> getFuture = new PlainActionFuture<>();
129+
indexService.getResponse(executionId, randomBoolean(), getFuture);
130+
assertThat(getFuture.actionGet().test, equalTo(testMessage));
131+
assertThat(getFuture.actionGet().expirationTimeMillis, equalTo(expirationTime));
132+
}
133+
}
85134
}
86135
}
87136
}

0 commit comments

Comments
 (0)