Skip to content

Commit 6ab5da4

Browse files
Integrate circuit breaker in AsyncTaskIndexService (#73862)
This change integrates the circuit breaker in AsyncTaskIndexService to make sure that we won't hit OOM when serializing a large response of an async search. Related to #67594 Supersedes #73638 Co-authored-by: Mayya Sharipova <[email protected]>
1 parent fed630b commit 6ab5da4

File tree

16 files changed

+231
-61
lines changed

16 files changed

+231
-61
lines changed

server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.core.Nullable;
2121
import org.elasticsearch.common.xcontent.ParseField;
2222
import org.elasticsearch.common.Strings;
23+
import org.elasticsearch.common.bytes.BytesReference;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
2526
import org.elasticsearch.common.lucene.uid.Versions;
@@ -728,6 +729,14 @@ public UpdateRequest doc(Object... source) {
728729
return this;
729730
}
730731

732+
/**
733+
* Sets the doc to use for updates when a script is not specified. The doc is provided in a bytes form.
734+
*/
735+
public UpdateRequest doc(BytesReference source, XContentType contentType) {
736+
safeDoc().source(source, contentType);
737+
return this;
738+
}
739+
731740
/**
732741
* Sets the doc to use for updates when a script is not specified, the doc provided
733742
* is a field and value pairs.

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.cluster.service.ClusterService;
1616
import org.elasticsearch.common.inject.Inject;
1717
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
18+
import org.elasticsearch.common.util.BigArrays;
1819
import org.elasticsearch.tasks.Task;
1920
import org.elasticsearch.threadpool.ThreadPool;
2021
import org.elasticsearch.transport.TransportService;
@@ -37,19 +38,21 @@ public TransportGetAsyncSearchAction(TransportService transportService,
3738
ClusterService clusterService,
3839
NamedWriteableRegistry registry,
3940
Client client,
40-
ThreadPool threadPool) {
41+
ThreadPool threadPool,
42+
BigArrays bigArrays) {
4143
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new);
4244
this.transportService = transportService;
43-
this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool);
45+
this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool, bigArrays);
4446
}
4547

4648
static AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> createResultsService(TransportService transportService,
4749
ClusterService clusterService,
4850
NamedWriteableRegistry registry,
4951
Client client,
50-
ThreadPool threadPool) {
52+
ThreadPool threadPool,
53+
BigArrays bigArrays) {
5154
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
52-
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
55+
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays);
5356
return new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
5457
transportService.getTaskManager(), clusterService);
5558
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.cluster.service.ClusterService;
1616
import org.elasticsearch.common.inject.Inject;
1717
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
18+
import org.elasticsearch.common.util.BigArrays;
1819
import org.elasticsearch.tasks.Task;
1920
import org.elasticsearch.threadpool.ThreadPool;
2021
import org.elasticsearch.transport.TransportService;
@@ -37,16 +38,17 @@ public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsy
3738

3839
@Inject
3940
public TransportGetAsyncStatusAction(TransportService transportService,
40-
ActionFilters actionFilters,
41-
ClusterService clusterService,
42-
NamedWriteableRegistry registry,
43-
Client client,
44-
ThreadPool threadPool) {
41+
ActionFilters actionFilters,
42+
ClusterService clusterService,
43+
NamedWriteableRegistry registry,
44+
Client client,
45+
ThreadPool threadPool,
46+
BigArrays bigArrays) {
4547
super(GetAsyncStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new);
4648
this.transportService = transportService;
4749
this.clusterService = clusterService;
4850
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
49-
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
51+
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays);
5052
}
5153

5254
@Override

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.UUIDs;
2525
import org.elasticsearch.common.inject.Inject;
2626
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
27+
import org.elasticsearch.common.util.BigArrays;
2728
import org.elasticsearch.core.TimeValue;
2829
import org.elasticsearch.common.util.concurrent.ThreadContext;
2930
import org.elasticsearch.index.engine.DocumentMissingException;
@@ -64,14 +65,15 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService,
6465
Client client,
6566
NodeClient nodeClient,
6667
SearchService searchService,
67-
TransportSearchAction searchAction) {
68+
TransportSearchAction searchAction,
69+
BigArrays bigArrays) {
6870
super(SubmitAsyncSearchAction.NAME, transportService, actionFilters, SubmitAsyncSearchRequest::new);
6971
this.nodeClient = nodeClient;
7072
this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction();
7173
this.searchAction = searchAction;
7274
this.threadContext = transportService.getThreadPool().getThreadContext();
7375
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadContext, client,
74-
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
76+
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays);
7577
}
7678

7779
@Override

x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.async;
99

1010
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.client.OriginSettingClient;
1112
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1213
import org.elasticsearch.cluster.node.DiscoveryNode;
1314
import org.elasticsearch.cluster.service.ClusterService;
@@ -24,9 +25,7 @@
2425
import org.elasticsearch.threadpool.ThreadPool;
2526
import org.elasticsearch.watcher.ResourceWatcherService;
2627
import org.elasticsearch.xpack.core.XPackPlugin;
27-
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
2828
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
29-
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
3029

3130
import java.util.ArrayList;
3231
import java.util.Collection;
@@ -76,21 +75,12 @@ public Collection<Object> createComponents(
7675
List<Object> components = new ArrayList<>();
7776
if (DiscoveryNode.canContainData(environment.settings())) {
7877
// only data nodes should be eligible to run the maintenance service.
79-
AsyncTaskIndexService<AsyncSearchResponse> indexService = new AsyncTaskIndexService<>(
80-
XPackPlugin.ASYNC_RESULTS_INDEX,
81-
clusterService,
82-
threadPool.getThreadContext(),
83-
client,
84-
ASYNC_SEARCH_ORIGIN,
85-
AsyncSearchResponse::new,
86-
namedWriteableRegistry
87-
);
8878
AsyncTaskMaintenanceService maintenanceService = new AsyncTaskMaintenanceService(
8979
clusterService,
9080
nodeEnvironment.nodeId(),
9181
settings,
9282
threadPool,
93-
indexService
83+
new OriginSettingClient(client, ASYNC_SEARCH_ORIGIN)
9484
);
9585
components.add(maintenanceService);
9686
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.elasticsearch.cluster.service.ClusterService;
2727
import org.elasticsearch.common.TriFunction;
2828
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
29+
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
30+
import org.elasticsearch.common.util.BigArrays;
2931
import org.elasticsearch.common.xcontent.XContentFactory;
3032
import org.elasticsearch.core.Tuple;
3133
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
@@ -116,6 +118,7 @@ static XContentBuilder mappings() throws IOException {
116118
private final SecurityContext securityContext;
117119
private final NamedWriteableRegistry registry;
118120
private final Writeable.Reader<R> reader;
121+
private final BigArrays bigArrays;
119122

120123

121124
public AsyncTaskIndexService(String index,
@@ -124,14 +127,16 @@ public AsyncTaskIndexService(String index,
124127
Client client,
125128
String origin,
126129
Writeable.Reader<R> reader,
127-
NamedWriteableRegistry registry) {
130+
NamedWriteableRegistry registry,
131+
BigArrays bigArrays) {
128132
this.index = index;
129133
this.clusterService = clusterService;
130134
this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
131135
this.client = client;
132136
this.clientWithOrigin = new OriginSettingClient(client, origin);
133137
this.registry = registry;
134138
this.reader = reader;
139+
this.bigArrays = bigArrays;
135140
}
136141

137142
/**
@@ -190,20 +195,25 @@ public Authentication getAuthentication() {
190195
public void createResponse(String docId,
191196
Map<String, String> headers,
192197
R response,
193-
ActionListener<IndexResponse> listener) throws IOException {
194-
createIndexIfNecessary(listener.delegateFailure((ignored, l) -> {
195-
// TODO: Integrate with circuit breaker
198+
ActionListener<IndexResponse> outerListener) throws IOException {
199+
createIndexIfNecessary(outerListener.delegateFailure((listener, ignored) -> {
196200
try {
197-
final XContentBuilder source = XContentFactory.jsonBuilder()
201+
final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking());
202+
final XContentBuilder source = XContentFactory.jsonBuilder(buffer);
203+
listener = ActionListener.runBefore(listener, source::close);
204+
source
198205
.startObject()
199206
.field(HEADERS_FIELD, headers)
200207
.field(EXPIRATION_TIME_FIELD, response.getExpirationTime())
201208
.directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os))
202209
.endObject();
210+
// do not close the buffer or the XContentBuilder until the IndexRequest is completed (i.e., listener is notified);
211+
// otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage.
212+
source.flush();
203213
final IndexRequest indexRequest = new IndexRequest(index)
204214
.create(true)
205215
.id(docId)
206-
.source(source);
216+
.source(buffer.bytes(), source.contentType());
207217
clientWithOrigin.index(indexRequest, listener);
208218
} catch (Exception e) {
209219
listener.onFailure(e);
@@ -217,19 +227,24 @@ public void createResponse(String docId,
217227
public void updateResponse(String docId,
218228
Map<String, List<String>> responseHeaders,
219229
R response,
220-
ActionListener<UpdateResponse> listener) {
221-
createIndexIfNecessary(listener.delegateFailure((ignored, l) -> {
230+
ActionListener<UpdateResponse> outerListener) {
231+
createIndexIfNecessary(outerListener.delegateFailure((listener, ignored) -> {
222232
try {
223-
// TODO: Integrate with circuit breaker
224-
final XContentBuilder source = XContentFactory.jsonBuilder()
233+
final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking());
234+
final XContentBuilder source = XContentFactory.jsonBuilder(buffer);
235+
listener = ActionListener.runBefore(listener, source::close);
236+
source
225237
.startObject()
226238
.field(RESPONSE_HEADERS_FIELD, responseHeaders)
227239
.directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os))
228240
.endObject();
241+
// do not close the buffer or the XContentBuilder until the UpdateRequest is completed (i.e., listener is notified);
242+
// otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage.
243+
source.flush();
229244
final UpdateRequest request = new UpdateRequest()
230245
.index(index)
231246
.id(docId)
232-
.doc(source)
247+
.doc(buffer.bytes(), source.contentType())
233248
.retryOnConflict(5);
234249
clientWithOrigin.update(request, listener);
235250
} catch (Exception e) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.client.Client;
1314
import org.elasticsearch.cluster.ClusterChangedEvent;
1415
import org.elasticsearch.cluster.ClusterState;
1516
import org.elasticsearch.cluster.ClusterStateListener;
@@ -55,7 +56,7 @@ public class AsyncTaskMaintenanceService extends AbstractLifecycleComponent impl
5556
private final String index;
5657
private final String localNodeId;
5758
private final ThreadPool threadPool;
58-
private final AsyncTaskIndexService<?> indexService;
59+
private final Client clientWithOrigin;
5960
private final TimeValue delay;
6061

6162
private boolean isCleanupRunning;
@@ -65,12 +66,12 @@ public AsyncTaskMaintenanceService(ClusterService clusterService,
6566
String localNodeId,
6667
Settings nodeSettings,
6768
ThreadPool threadPool,
68-
AsyncTaskIndexService<?> indexService) {
69+
Client clientWithOrigin) {
6970
this.clusterService = clusterService;
7071
this.index = XPackPlugin.ASYNC_RESULTS_INDEX;
7172
this.localNodeId = localNodeId;
7273
this.threadPool = threadPool;
73-
this.indexService = indexService;
74+
this.clientWithOrigin = clientWithOrigin;
7475
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
7576
}
7677

@@ -125,8 +126,7 @@ synchronized void executeNextCleanup() {
125126
long nowInMillis = System.currentTimeMillis();
126127
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index)
127128
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
128-
indexService.getClientWithOrigin()
129-
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup));
129+
clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup));
130130
}
131131
}
132132

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.service.ClusterService;
1717
import org.elasticsearch.common.inject.Inject;
1818
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
19+
import org.elasticsearch.common.util.BigArrays;
1920
import org.elasticsearch.tasks.Task;
2021
import org.elasticsearch.threadpool.ThreadPool;
2122
import org.elasticsearch.transport.TransportService;
@@ -34,13 +35,14 @@ public TransportDeleteAsyncResultAction(TransportService transportService,
3435
ClusterService clusterService,
3536
NamedWriteableRegistry registry,
3637
Client client,
37-
ThreadPool threadPool) {
38+
ThreadPool threadPool,
39+
BigArrays bigArrays) {
3840
super(DeleteAsyncResultAction.NAME, transportService, actionFilters, DeleteAsyncResultRequest::new);
3941
this.transportService = transportService;
4042
this.clusterService = clusterService;
4143
AsyncTaskIndexService<?> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
4244
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN,
43-
(in) -> {throw new UnsupportedOperationException("Reading is not supported during deletion");}, registry);
45+
(in) -> {throw new UnsupportedOperationException("Reading is not supported during deletion");}, registry, bigArrays);
4446
this.deleteResultsService = new DeleteAsyncResultsService(store, transportService.getTaskManager());
4547
}
4648

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.update.UpdateResponse;
1515
import org.elasticsearch.cluster.service.ClusterService;
1616
import org.elasticsearch.core.TimeValue;
17+
import org.elasticsearch.common.util.BigArrays;
1718
import org.elasticsearch.tasks.CancellableTask;
1819
import org.elasticsearch.tasks.Task;
1920
import org.elasticsearch.tasks.TaskId;
@@ -123,9 +124,10 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
123124
public void setup() {
124125
clusterService = getInstanceFromNode(ClusterService.class);
125126
TransportService transportService = getInstanceFromNode(TransportService.class);
127+
BigArrays bigArrays = getInstanceFromNode(BigArrays.class);
126128
taskManager = transportService.getTaskManager();
127129
indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(),
128-
client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry());
130+
client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry(), bigArrays);
129131

130132
}
131133

0 commit comments

Comments
 (0)