Skip to content

Commit 5d361b2

Browse files
Add circuit breaker to storing async response
Related to #67594
1 parent c13384c commit 5d361b2

File tree

9 files changed

+127
-35
lines changed

9 files changed

+127
-35
lines changed

server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ private void ensureCapacity(int size) {
6464
assert overflow.size() >= overflowSize;
6565
}
6666

67+
/**
68+
* Returns the current size of the buffer.
69+
*
70+
* @return the number of bytes in this output stream.
71+
*/
72+
public int size() {
73+
return position;
74+
}
75+
6776
@Override
6877
public void writeBytes(byte[] b, int offset, int length) {
6978
if (position < buffer.length) {

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.util.concurrent.ThreadContext;
2929
import org.elasticsearch.index.engine.DocumentMissingException;
3030
import org.elasticsearch.index.engine.VersionConflictEngineException;
31+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
3132
import org.elasticsearch.search.SearchService;
3233
import org.elasticsearch.search.aggregations.InternalAggregation;
3334
import org.elasticsearch.tasks.Task;
@@ -51,13 +52,15 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
5152
private static final Logger logger = LogManager.getLogger(TransportSubmitAsyncSearchAction.class);
5253

5354
private final NodeClient nodeClient;
55+
private final CircuitBreakerService circuitBreakerService;
5456
private final Function<SearchRequest, InternalAggregation.ReduceContext> requestToAggReduceContextBuilder;
5557
private final TransportSearchAction searchAction;
5658
private final ThreadContext threadContext;
5759
private final AsyncTaskIndexService<AsyncSearchResponse> store;
5860

5961
@Inject
6062
public TransportSubmitAsyncSearchAction(ClusterService clusterService,
63+
CircuitBreakerService circuitBreakerService,
6164
TransportService transportService,
6265
ActionFilters actionFilters,
6366
NamedWriteableRegistry registry,
@@ -67,6 +70,7 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService,
6770
TransportSearchAction searchAction) {
6871
super(SubmitAsyncSearchAction.NAME, transportService, actionFilters, SubmitAsyncSearchRequest::new);
6972
this.nodeClient = nodeClient;
73+
this.circuitBreakerService = circuitBreakerService;
7074
this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction();
7175
this.searchAction = searchAction;
7276
this.threadContext = transportService.getThreadPool().getThreadContext();
@@ -92,6 +96,7 @@ public void onResponse(AsyncSearchResponse searchResponse) {
9296
// TODO: store intermediate results ?
9397
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
9498
store.createResponse(docId, searchTask.getOriginHeaders(), initialResp,
99+
circuitBreakerService,
95100
new ActionListener<>() {
96101
@Override
97102
public void onResponse(IndexResponse r) {
@@ -175,7 +180,7 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul
175180
private void onFinalResponse(AsyncSearchTask searchTask,
176181
AsyncSearchResponse response,
177182
Runnable nextAction) {
178-
store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
183+
store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(), response, circuitBreakerService,
179184
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
180185
exc -> {
181186
Throwable cause = ExceptionsHelper.unwrapCause(exc);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,23 @@
2121
import org.elasticsearch.cluster.metadata.IndexMetadata;
2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.TriFunction;
24-
import org.elasticsearch.common.bytes.BytesReference;
24+
import org.elasticsearch.common.breaker.CircuitBreaker;
25+
import org.elasticsearch.common.breaker.CircuitBreakingException;
26+
import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
2527
import org.elasticsearch.common.collect.Tuple;
2628
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
27-
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2829
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
2930
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3031
import org.elasticsearch.common.io.stream.StreamInput;
3132
import org.elasticsearch.common.io.stream.Writeable;
33+
import org.elasticsearch.common.lease.Releasable;
3234
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.util.BigArrays;
3336
import org.elasticsearch.common.util.concurrent.ThreadContext;
3437
import org.elasticsearch.common.xcontent.XContentBuilder;
3538
import org.elasticsearch.common.xcontent.XContentType;
3639
import org.elasticsearch.indices.SystemIndexDescriptor;
40+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
3741
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
3842
import org.elasticsearch.tasks.Task;
3943
import org.elasticsearch.tasks.TaskManager;
@@ -180,16 +184,23 @@ public Authentication getAuthentication() {
180184
public void createResponse(String docId,
181185
Map<String, String> headers,
182186
R response,
183-
ActionListener<IndexResponse> listener) throws IOException {
184-
Map<String, Object> source = new HashMap<>();
185-
source.put(HEADERS_FIELD, headers);
186-
source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime());
187-
source.put(RESULT_FIELD, encodeResponse(response));
188-
IndexRequest indexRequest = new IndexRequest(index)
189-
.create(true)
190-
.id(docId)
191-
.source(source, XContentType.JSON);
192-
clientWithOrigin.index(indexRequest, listener);
187+
CircuitBreakerService circuitBreakerService,
188+
ActionListener<IndexResponse> listener0) throws IOException {
189+
AsyncResponseUpdateContext updateContext = new AsyncResponseUpdateContext(circuitBreakerService);
190+
ActionListener<IndexResponse> listener = ActionListener.runAfter(listener0, () -> updateContext.close());
191+
try {
192+
Map<String, Object> source = new HashMap<>();
193+
source.put(HEADERS_FIELD, headers);
194+
source.put(EXPIRATION_TIME_FIELD, response.getExpirationTime());
195+
source.put(RESULT_FIELD, encodeResponse(response, updateContext));
196+
IndexRequest indexRequest = new IndexRequest(index)
197+
.create(true)
198+
.id(docId)
199+
.source(source, XContentType.JSON);
200+
clientWithOrigin.index(indexRequest, listener);
201+
} catch(Exception e) {
202+
listener.onFailure(e);
203+
}
193204
}
194205

195206
/**
@@ -198,11 +209,14 @@ public void createResponse(String docId,
198209
public void updateResponse(String docId,
199210
Map<String, List<String>> responseHeaders,
200211
R response,
201-
ActionListener<UpdateResponse> listener) {
212+
CircuitBreakerService circuitBreakerService,
213+
ActionListener<UpdateResponse> listener0) {
214+
AsyncResponseUpdateContext updateContext = new AsyncResponseUpdateContext(circuitBreakerService);
215+
ActionListener<UpdateResponse> listener = ActionListener.runAfter(listener0, () -> updateContext.close());
202216
try {
203217
Map<String, Object> source = new HashMap<>();
204218
source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
205-
source.put(RESULT_FIELD, encodeResponse(response));
219+
source.put(RESULT_FIELD, encodeResponse(response, updateContext));
206220
UpdateRequest request = new UpdateRequest()
207221
.index(index)
208222
.id(docId)
@@ -453,13 +467,27 @@ boolean ensureAuthenticatedUserIsSame(Map<String, String> originHeaders, Authent
453467
}
454468

455469
/**
456-
* Encode the provided response in a binary form using base64 encoding.
470+
* Encodes the provided response in a binary form using base64 encoding.
471+
* Needs approximately up to 3.3X extra memory, where X is the original response size:
472+
* - extra X bytes - for RecyclingBytesStreamOutput that encodes the response in an array of bytes,
473+
* this memory allocation will be tracked automatically by BigArrays with circuitBreaker
474+
* - up to X bytes – for converting bytes stream to bytes array
475+
* - up to 1.3X bytes for encoded string, as Base64 adds around 33% overhead
476+
* @throws CircuitBreakingException
457477
*/
458-
String encodeResponse(R response) throws IOException {
459-
try (BytesStreamOutput out = new BytesStreamOutput()) {
478+
String encodeResponse(R response, AsyncResponseUpdateContext updateContext) throws IOException {
479+
BigArrays bigArrays = new BigArrays(
480+
null, updateContext.circuitBreakerService(), CircuitBreaker.REQUEST).withCircuitBreaking();
481+
// using RecyclingBytesStreamOutput allows to supply BigArrays with a circuit breaker
482+
try (RecyclingBytesStreamOutput out = new RecyclingBytesStreamOutput(new byte[0], bigArrays)) {
460483
Version.writeVersion(Version.CURRENT, out);
461484
response.writeTo(out);
462-
return Base64.getEncoder().encodeToString(BytesReference.toBytes(out.bytes()));
485+
486+
// need to check from circuitBreaker if additional 2.3X size is available
487+
long estimatedSize = Math.round(out.size() * 2.3);
488+
updateContext.addCircuitBreakerBytes(estimatedSize);
489+
490+
return Base64.getEncoder().encodeToString(out.toBytesRef().bytes);
463491
}
464492
}
465493

@@ -485,4 +513,32 @@ public static void restoreResponseHeadersContext(ThreadContext threadContext, Ma
485513
}
486514
}
487515
}
516+
517+
/**
518+
* A helper class for updating async search responses to track the memory usage
519+
*/
520+
static class AsyncResponseUpdateContext implements Releasable {
521+
private long circuitBreakerBytes = 0L;
522+
private CircuitBreakerService circuitBreakerService;
523+
524+
AsyncResponseUpdateContext(CircuitBreakerService circuitBreakerService) {
525+
assert circuitBreakerService != null : "Circuit breaker service must be provided when storing async search response!";
526+
this.circuitBreakerService = circuitBreakerService;
527+
}
528+
529+
public CircuitBreakerService circuitBreakerService() {
530+
return circuitBreakerService;
531+
}
532+
533+
public void addCircuitBreakerBytes(long estimatedSize) {
534+
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
535+
.addEstimateBytesAndMaybeBreak(estimatedSize, "<storing_async_search_response>");
536+
circuitBreakerBytes += estimatedSize;
537+
}
538+
539+
@Override
540+
public void close() {
541+
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-circuitBreakerBytes);
542+
}
543+
}
488544
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.update.UpdateResponse;
1515
import org.elasticsearch.cluster.service.ClusterService;
1616
import org.elasticsearch.common.unit.TimeValue;
17+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
1718
import org.elasticsearch.tasks.CancellableTask;
1819
import org.elasticsearch.tasks.Task;
1920
import org.elasticsearch.tasks.TaskId;
@@ -39,6 +40,7 @@
3940

4041
public class AsyncResultsServiceTests extends ESSingleNodeTestCase {
4142
private ClusterService clusterService;
43+
private CircuitBreakerService circuitBreakerService;
4244
private TaskManager taskManager;
4345
private AsyncTaskIndexService<TestAsyncResponse> indexService;
4446

@@ -122,6 +124,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
122124
@Before
123125
public void setup() {
124126
clusterService = getInstanceFromNode(ClusterService.class);
127+
circuitBreakerService = getInstanceFromNode(CircuitBreakerService.class);
125128
TransportService transportService = getInstanceFromNode(TransportService.class);
126129
taskManager = transportService.getTaskManager();
127130
indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(),
@@ -162,7 +165,7 @@ public void testRetrieveFromMemoryWithExpiration() throws Exception {
162165
// we need to store initial result
163166
PlainActionFuture<IndexResponse> future = new PlainActionFuture<>();
164167
indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(),
165-
new TestAsyncResponse(null, task.getExpirationTime()), future);
168+
new TestAsyncResponse(null, task.getExpirationTime()), circuitBreakerService, future);
166169
future.actionGet(TimeValue.timeValueSeconds(10));
167170
}
168171

@@ -204,7 +207,7 @@ public void testAssertExpirationPropagation() throws Exception {
204207
// we need to store initial result
205208
PlainActionFuture<IndexResponse> future = new PlainActionFuture<>();
206209
indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(),
207-
new TestAsyncResponse(null, task.getExpirationTime()), future);
210+
new TestAsyncResponse(null, task.getExpirationTime()), circuitBreakerService, future);
208211
future.actionGet(TimeValue.timeValueSeconds(10));
209212
}
210213

@@ -242,17 +245,17 @@ public void testRetrieveFromDisk() throws Exception {
242245
// we need to store initial result
243246
PlainActionFuture<IndexResponse> futureCreate = new PlainActionFuture<>();
244247
indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(),
245-
new TestAsyncResponse(null, task.getExpirationTime()), futureCreate);
248+
new TestAsyncResponse(null, task.getExpirationTime()), circuitBreakerService, futureCreate);
246249
futureCreate.actionGet(TimeValue.timeValueSeconds(10));
247250

248251
PlainActionFuture<UpdateResponse> futureUpdate = new PlainActionFuture<>();
249252
indexService.updateResponse(task.getExecutionId().getDocId(), emptyMap(),
250-
new TestAsyncResponse("final_response", task.getExpirationTime()), futureUpdate);
253+
new TestAsyncResponse("final_response", task.getExpirationTime()), circuitBreakerService, futureUpdate);
251254
futureUpdate.actionGet(TimeValue.timeValueSeconds(10));
252255
} else {
253256
PlainActionFuture<IndexResponse> futureCreate = new PlainActionFuture<>();
254257
indexService.createResponse(task.getExecutionId().getDocId(), task.getOriginHeaders(),
255-
new TestAsyncResponse("final_response", task.getExpirationTime()), futureCreate);
258+
new TestAsyncResponse("final_response", task.getExpirationTime()), circuitBreakerService, futureCreate);
256259
futureCreate.actionGet(TimeValue.timeValueSeconds(10));
257260
}
258261

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.cluster.service.ClusterService;
1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
1213
import org.elasticsearch.test.ESSingleNodeTestCase;
1314
import org.elasticsearch.transport.TransportService;
1415
import org.junit.Before;
@@ -22,6 +23,7 @@
2223
// TODO: test CRUD operations
2324
public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
2425
private AsyncTaskIndexService<TestAsyncResponse> indexService;
26+
private CircuitBreakerService circuitBreakerService;
2527

2628
public static class TestAsyncResponse implements AsyncResponse<TestAsyncResponse> {
2729
public final String test;
@@ -72,16 +74,20 @@ public int hashCode() {
7274
public void setup() {
7375
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
7476
TransportService transportService = getInstanceFromNode(TransportService.class);
77+
circuitBreakerService = getInstanceFromNode(CircuitBreakerService.class);
7578
indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(),
7679
client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry());
7780
}
7881

7982
public void testEncodeSearchResponse() throws IOException {
8083
for (int i = 0; i < 10; i++) {
8184
TestAsyncResponse response = new TestAsyncResponse(randomAlphaOfLength(10), randomLong());
82-
String encoded = indexService.encodeResponse(response);
83-
TestAsyncResponse same = indexService.decodeResponse(encoded);
84-
assertThat(same, equalTo(response));
85+
try (AsyncTaskIndexService.AsyncResponseUpdateContext updateContext =
86+
new AsyncTaskIndexService.AsyncResponseUpdateContext(circuitBreakerService)) {
87+
String encoded = indexService.encodeResponse(response, updateContext);
88+
TestAsyncResponse same = indexService.decodeResponse(encoded);
89+
assertThat(same, equalTo(response));
90+
}
8591
}
8692
}
8793
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.settings.Settings;
1818
import org.elasticsearch.common.util.concurrent.ThreadContext;
1919
import org.elasticsearch.indices.SystemIndexDescriptor;
20+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
2021
import org.elasticsearch.plugins.Plugin;
2122
import org.elasticsearch.plugins.SystemIndexPlugin;
2223
import org.elasticsearch.tasks.TaskId;
@@ -36,13 +37,15 @@
3637
// TODO: test CRUD operations
3738
public class AsyncTaskServiceTests extends ESSingleNodeTestCase {
3839
private AsyncTaskIndexService<AsyncSearchResponse> indexService;
40+
private CircuitBreakerService circuitBreakerService;
3941

4042
public String index = ".async-search";
4143

4244
@Before
4345
public void setup() {
4446
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
4547
TransportService transportService = getInstanceFromNode(TransportService.class);
48+
circuitBreakerService = getInstanceFromNode(CircuitBreakerService.class);
4649
indexService = new AsyncTaskIndexService<>(index, clusterService,
4750
transportService.getThreadPool().getThreadContext(),
4851
client(), "test_origin", AsyncSearchResponse::new, writableRegistry());
@@ -138,7 +141,7 @@ public void testAutoCreateIndex() throws Exception {
138141
AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L);
139142
{
140143
PlainActionFuture<IndexResponse> future = PlainActionFuture.newFuture();
141-
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
144+
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, circuitBreakerService, future);
142145
future.get();
143146
assertSettings();
144147
}
@@ -157,7 +160,7 @@ public void testAutoCreateIndex() throws Exception {
157160
// So do updates
158161
{
159162
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
160-
indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future);
163+
indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, circuitBreakerService, future);
161164
expectThrows(Exception.class, future::get);
162165
assertSettings();
163166
}
@@ -173,7 +176,7 @@ public void testAutoCreateIndex() throws Exception {
173176
// But the index is still auto-created
174177
{
175178
PlainActionFuture<IndexResponse> future = PlainActionFuture.newFuture();
176-
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
179+
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, circuitBreakerService, future);
177180
future.get();
178181
assertSettings();
179182
}

0 commit comments

Comments
 (0)