diff --git a/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java index 5439f587c910c..6282728da55f9 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java @@ -64,6 +64,15 @@ private void ensureCapacity(int size) { assert overflow.size() >= overflowSize; } + /** + * Returns the current size of the buffer. + * + * @return the number of bytes in this output stream. + */ + public int size() { + return position; + } + @Override public void writeBytes(byte[] b, int offset, int length) { if (position < buffer.length) { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index bc982dc02aa99..9dd730b558d9b 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.tasks.Task; @@ -51,6 +52,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction requestToAggReduceContextBuilder; private final TransportSearchAction searchAction; private final ThreadContext threadContext; @@ -58,6 +60,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction searchService.aggReduceContextBuilder(request).forFinalReduction(); this.searchAction = searchAction; this.threadContext = transportService.getThreadPool().getThreadContext(); @@ -92,6 +96,7 @@ public void onResponse(AsyncSearchResponse searchResponse) { // TODO: store intermediate results ? AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId()); store.createResponse(docId, searchTask.getOriginHeaders(), initialResp, + circuitBreakerService, new ActionListener<>() { @Override public void onResponse(IndexResponse r) { @@ -175,7 +180,7 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul private void onFinalResponse(AsyncSearchTask searchTask, AsyncSearchResponse response, Runnable nextAction) { - store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, + store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(), response, circuitBreakerService, ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), exc -> { Throwable cause = ExceptionsHelper.unwrapCause(exc); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 2ff2201792364..184ff64d8acb6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -21,19 +21,23 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; -import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; @@ -180,16 +184,23 @@ public Authentication getAuthentication() { public void createResponse(String docId, Map headers, R response, - ActionListener listener) throws IOException { - Map source = new HashMap<>(); - source.put(HEADERS_FIELD, headers); - source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime()); - source.put(RESULT_FIELD, encodeResponse(response)); - IndexRequest indexRequest = new IndexRequest(index) - .create(true) - .id(docId) - .source(source, XContentType.JSON); - clientWithOrigin.index(indexRequest, listener); + CircuitBreakerService circuitBreakerService, + ActionListener listener0) throws IOException { + AsyncResponseUpdateContext updateContext = new AsyncResponseUpdateContext(circuitBreakerService); + ActionListener listener = ActionListener.runAfter(listener0, () -> updateContext.close()); + try { + Map source = new HashMap<>(); + source.put(HEADERS_FIELD, headers); + source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime()); + source.put(RESULT_FIELD, encodeResponse(response, updateContext)); + IndexRequest indexRequest = new IndexRequest(index) + .create(true) + .id(docId) + .source(source, XContentType.JSON); + clientWithOrigin.index(indexRequest, listener); + } catch(Exception e) { + listener.onFailure(e); + } } /** @@ -198,11 +209,14 @@ public void createResponse(String docId, public void updateResponse(String docId, Map> responseHeaders, R response, - ActionListener listener) { + CircuitBreakerService circuitBreakerService, + ActionListener listener0) { + AsyncResponseUpdateContext updateContext = new AsyncResponseUpdateContext(circuitBreakerService); + ActionListener listener = ActionListener.runAfter(listener0, () -> updateContext.close()); try { Map source = new HashMap<>(); source.put(RESPONSE_HEADERS_FIELD, responseHeaders); - source.put(RESULT_FIELD, encodeResponse(response)); + source.put(RESULT_FIELD, encodeResponse(response, updateContext)); UpdateRequest request = new UpdateRequest() .index(index) .id(docId) @@ -453,13 +467,27 @@ boolean ensureAuthenticatedUserIsSame(Map originHeaders, Authent } /** - * Encode the provided response in a binary form using base64 encoding. + * Encodes the provided response in a binary form using base64 encoding. + * Needs approximately up to 3.3X extra memory, where X is the original response size: + * - extra X bytes - for RecyclingBytesStreamOutput that encodes the response in an array of bytes, + * this memory allocation will be tracked automatically by BigArrays with circuitBreaker + * - up to X bytes – for converting bytes stream to bytes array + * - up to 1.3X bytes for encoded string, as Base64 adds around 33% overhead + * @throws CircuitBreakingException */ - String encodeResponse(R response) throws IOException { - try (BytesStreamOutput out = new BytesStreamOutput()) { + String encodeResponse(R response, AsyncResponseUpdateContext updateContext) throws IOException { + BigArrays bigArrays = new BigArrays( + null, updateContext.circuitBreakerService(), CircuitBreaker.REQUEST).withCircuitBreaking(); + // using RecyclingBytesStreamOutput allows to supply BigArrays with a circuit breaker + try (RecyclingBytesStreamOutput out = new RecyclingBytesStreamOutput(new byte[0], bigArrays)) { Version.writeVersion(Version.CURRENT, out); response.writeTo(out); - return Base64.getEncoder().encodeToString(BytesReference.toBytes(out.bytes())); + + // need to check from circuitBreaker if additional 2.3X size is available + long estimatedSize = Math.round(out.size() * 2.3); + updateContext.addCircuitBreakerBytes(estimatedSize); + + return Base64.getEncoder().encodeToString(out.toBytesRef().bytes); } } @@ -485,4 +513,32 @@ public static void restoreResponseHeadersContext(ThreadContext threadContext, Ma } } } + + /** + * A helper class for updating async search responses to track the memory usage + */ + static class AsyncResponseUpdateContext implements Releasable { + private long circuitBreakerBytes = 0L; + private CircuitBreakerService circuitBreakerService; + + AsyncResponseUpdateContext(CircuitBreakerService circuitBreakerService) { + assert circuitBreakerService != null : "Circuit breaker service must be provided when storing async search response!"; + this.circuitBreakerService = circuitBreakerService; + } + + public CircuitBreakerService circuitBreakerService() { + return circuitBreakerService; + } + + public void addCircuitBreakerBytes(long estimatedSize) { + circuitBreakerService.getBreaker(CircuitBreaker.REQUEST) + .addEstimateBytesAndMaybeBreak(estimatedSize, ""); + circuitBreakerBytes += estimatedSize; + } + + @Override + public void close() { + circuitBreakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-circuitBreakerBytes); + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index a9d1f27e16630..02fa32af6a67b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -39,6 +40,7 @@ public class AsyncResultsServiceTests extends ESSingleNodeTestCase { private ClusterService clusterService; + private CircuitBreakerService circuitBreakerService; private TaskManager taskManager; private AsyncTaskIndexService indexService; @@ -122,6 +124,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, @Before public void setup() { clusterService = getInstanceFromNode(ClusterService.class); + circuitBreakerService = getInstanceFromNode(CircuitBreakerService.class); TransportService transportService = getInstanceFromNode(TransportService.class); taskManager = transportService.getTaskManager(); indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(), @@ -162,7 +165,7 @@ public void testRetrieveFromMemoryWithExpiration() throws Exception { // we need to store initial result PlainActionFuture future = new PlainActionFuture<>(); indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(), - new TestAsyncResponse(null, task.getExpirationTime()), future); + new TestAsyncResponse(null, task.getExpirationTime()), circuitBreakerService, future); future.actionGet(TimeValue.timeValueSeconds(10)); } @@ -204,7 +207,7 @@ public void testAssertExpirationPropagation() throws Exception { // we need to store initial result PlainActionFuture future = new PlainActionFuture<>(); indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(), - new TestAsyncResponse(null, task.getExpirationTime()), future); + new TestAsyncResponse(null, task.getExpirationTime()), circuitBreakerService, future); future.actionGet(TimeValue.timeValueSeconds(10)); } @@ -242,17 +245,17 @@ public void testRetrieveFromDisk() throws Exception { // we need to store initial result PlainActionFuture futureCreate = new PlainActionFuture<>(); indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(), - new TestAsyncResponse(null, task.getExpirationTime()), futureCreate); + new TestAsyncResponse(null, task.getExpirationTime()), circuitBreakerService, futureCreate); futureCreate.actionGet(TimeValue.timeValueSeconds(10)); PlainActionFuture futureUpdate = new PlainActionFuture<>(); indexService.updateResponse(task.getExecutionId().getDocId(), emptyMap(), - new TestAsyncResponse("final_response", task.getExpirationTime()), futureUpdate); + new TestAsyncResponse("final_response", task.getExpirationTime()), circuitBreakerService, futureUpdate); futureUpdate.actionGet(TimeValue.timeValueSeconds(10)); } else { PlainActionFuture futureCreate = new PlainActionFuture<>(); indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(), - new TestAsyncResponse("final_response", task.getExpirationTime()), futureCreate); + new TestAsyncResponse("final_response", task.getExpirationTime()), circuitBreakerService, futureCreate); futureCreate.actionGet(TimeValue.timeValueSeconds(10)); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java index 8474d2ef1112a..0f4cc4294914b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; import org.junit.Before; @@ -22,6 +23,7 @@ // TODO: test CRUD operations public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase { private AsyncTaskIndexService indexService; + private CircuitBreakerService circuitBreakerService; public static class TestAsyncResponse implements AsyncResponse { public final String test; @@ -72,6 +74,7 @@ public int hashCode() { public void setup() { ClusterService clusterService = getInstanceFromNode(ClusterService.class); TransportService transportService = getInstanceFromNode(TransportService.class); + circuitBreakerService = getInstanceFromNode(CircuitBreakerService.class); indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(), client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry()); } @@ -79,9 +82,12 @@ public void setup() { public void testEncodeSearchResponse() throws IOException { for (int i = 0; i < 10; i++) { TestAsyncResponse response = new TestAsyncResponse(randomAlphaOfLength(10), randomLong()); - String encoded = indexService.encodeResponse(response); - TestAsyncResponse same = indexService.decodeResponse(encoded); - assertThat(same, equalTo(response)); + try (AsyncTaskIndexService.AsyncResponseUpdateContext updateContext = + new AsyncTaskIndexService.AsyncResponseUpdateContext(circuitBreakerService)) { + String encoded = indexService.encodeResponse(response, updateContext); + TestAsyncResponse same = indexService.decodeResponse(encoded); + assertThat(same, equalTo(response)); + } } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java index 35c163c7bc5f2..8537326d40598 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.tasks.TaskId; @@ -36,6 +37,7 @@ // TODO: test CRUD operations public class AsyncTaskServiceTests extends ESSingleNodeTestCase { private AsyncTaskIndexService indexService; + private CircuitBreakerService circuitBreakerService; public String index = ".async-search"; @@ -43,6 +45,7 @@ public class AsyncTaskServiceTests extends ESSingleNodeTestCase { public void setup() { ClusterService clusterService = getInstanceFromNode(ClusterService.class); TransportService transportService = getInstanceFromNode(TransportService.class); + circuitBreakerService = getInstanceFromNode(CircuitBreakerService.class); indexService = new AsyncTaskIndexService<>(index, clusterService, transportService.getThreadPool().getThreadContext(), client(), "test_origin", AsyncSearchResponse::new, writableRegistry()); @@ -138,7 +141,7 @@ public void testAutoCreateIndex() throws Exception { AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L); { PlainActionFuture future = PlainActionFuture.newFuture(); - indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future); + indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, circuitBreakerService, future); future.get(); assertSettings(); } @@ -157,7 +160,7 @@ public void testAutoCreateIndex() throws Exception { // So do updates { PlainActionFuture future = PlainActionFuture.newFuture(); - indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future); + indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, circuitBreakerService, future); expectThrows(Exception.class, future::get); assertSettings(); } @@ -173,7 +176,7 @@ public void testAutoCreateIndex() throws Exception { // But the index is still auto-created { PlainActionFuture future = PlainActionFuture.newFuture(); - indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future); + indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, circuitBreakerService, future); future.get(); assertSettings(); } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java index 079d2d728f9e4..886f23b696306 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; @@ -52,6 +53,7 @@ public class AsyncTaskManagementService operation; private final ThreadPool threadPool; private final ClusterService clusterService; + private final CircuitBreakerService circuitBreakerService; private final Class taskClass; public interface AsyncOperation operation, Class taskClass, ClusterService clusterService, + CircuitBreakerService circuitBreakerService, ThreadPool threadPool) { this.taskManager = taskManager; this.action = action; @@ -115,6 +118,7 @@ public AsyncTaskManagementService(String index, Client client, String origin, Na this.asyncTaskIndexService = new AsyncTaskIndexService<>(index, clusterService, threadPool.getThreadContext(), client, origin, i -> new StoredAsyncResponse<>(operation::readResponse, i), registry); this.clusterService = clusterService; + this.circuitBreakerService = circuitBreakerService; this.threadPool = threadPool; } @@ -196,7 +200,7 @@ private void storeResults(T searchTask, StoredAsyncResponse storedResp private void storeResults(T searchTask, StoredAsyncResponse storedResponse, ActionListener finalListener) { try { asyncTaskIndexService.createResponse(searchTask.getExecutionId().getDocId(), - searchTask.getOriginHeaders(), storedResponse, ActionListener.wrap( + searchTask.getOriginHeaders(), storedResponse, circuitBreakerService, ActionListener.wrap( // We should only unregister after the result is saved resp -> { logger.trace(() -> new ParameterizedMessage("stored eql search results for [{}]", diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index b2cf7f51e94b6..38e6ce78428f0 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -62,11 +63,11 @@ public class TransportEqlSearchAction extends HandledTransportAction asyncTaskManagementService; @Inject - public TransportEqlSearchAction(Settings settings, ClusterService clusterService, TransportService transportService, + public TransportEqlSearchAction(Settings settings, ClusterService clusterService, CircuitBreakerService circuitBreakerService, + TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, PlanExecutor planExecutor, NamedWriteableRegistry registry, Client client) { super(EqlSearchAction.NAME, transportService, actionFilters, EqlSearchRequest::new); - this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ? new SecurityContext(settings, threadPool.getThreadContext()) : null; this.clusterService = clusterService; @@ -75,7 +76,8 @@ public TransportEqlSearchAction(Settings settings, ClusterService clusterService this.transportService = transportService; this.asyncTaskManagementService = new AsyncTaskManagementService<>(XPackPlugin.ASYNC_RESULTS_INDEX, client, ASYNC_SEARCH_ORIGIN, - registry, taskManager, EqlSearchAction.INSTANCE.name(), this, EqlSearchTask.class, clusterService, threadPool); + registry, taskManager, EqlSearchAction.INSTANCE.name(), this, EqlSearchTask.class, clusterService, + circuitBreakerService, threadPool); } @Override diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java index 0164afd7ce850..a5258e13e9c0c 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -42,6 +43,7 @@ public class AsyncTaskManagementServiceTests extends ESSingleNodeTestCase { private ClusterService clusterService; private TransportService transportService; + private CircuitBreakerService circuitBreakerService; private AsyncResultsService> results; private final ExecutorService executorService = Executors.newFixedThreadPool(1); @@ -129,6 +131,7 @@ public TestResponse readResponse(StreamInput inputStream) throws IOException { public void setup() { clusterService = getInstanceFromNode(ClusterService.class); transportService = getInstanceFromNode(TransportService.class); + circuitBreakerService = getInstanceFromNode(CircuitBreakerService.class); AsyncTaskIndexService> store = new AsyncTaskIndexService<>(index, clusterService, transportService.getThreadPool().getThreadContext(), client(), "test", in -> new StoredAsyncResponse<>(TestResponse::new, in), writableRegistry()); @@ -148,7 +151,8 @@ public void shutdownExec() { private AsyncTaskManagementService createManagementService( AsyncTaskManagementService.AsyncOperation operation) { return new AsyncTaskManagementService<>(index, client(), "test_origin", writableRegistry(), - transportService.getTaskManager(), "test_action", operation, TestTask.class, clusterService, transportService.getThreadPool()); + transportService.getTaskManager(), "test_action", operation, TestTask.class, clusterService, circuitBreakerService, + transportService.getThreadPool()); } public void testReturnBeforeTimeout() throws Exception {