Skip to content

Commit 9114e82

Browse files
authored
Early detection of circuit breaker exception in the coordinating node (#67431)
In #62884 we added the support for the request circuit breaker in search coordinating nodes. Today the circuit breaker is strictly checked only when a partial or final reduce occurs. With this commit, we also check the circuit breaker strictly when a shard response is received and we cancel the request early if an exception is thrown at this point.
1 parent 03f5670 commit 9114e82

File tree

3 files changed

+27
-24
lines changed

3 files changed

+27
-24
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,17 @@ public void consume(QuerySearchResult result, Runnable next) {
316316
emptyResults.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
317317
}
318318
} else {
319+
if (hasAggs) {
320+
long aggsSize = ramBytesUsedQueryResult(result);
321+
try {
322+
addEstimateAndMaybeBreak(aggsSize);
323+
} catch (Exception exc) {
324+
onMergeFailure(exc);
325+
next.run();
326+
return;
327+
}
328+
aggsCurrentBufferSize += aggsSize;
329+
}
319330
// add one if a partial merge is pending
320331
int size = buffer.size() + (hasPartialReduce ? 1 : 0);
321332
if (size >= batchReduceSize) {
@@ -329,11 +340,6 @@ public void consume(QuerySearchResult result, Runnable next) {
329340
queue.add(task);
330341
tryExecuteNext();
331342
}
332-
if (hasAggs) {
333-
long aggsSize = ramBytesUsedQueryResult(result);
334-
addWithoutBreaking(aggsSize);
335-
aggsCurrentBufferSize += aggsSize;
336-
}
337343
buffer.add(result);
338344
}
339345
}

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

+2
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ protected void onShardResult(SearchPhaseResult result, SearchShardIterator shard
9393
if (queryResult.isNull() == false
9494
// disable sort optims for scroll requests because they keep track of the last bottom doc locally (per shard)
9595
&& getRequest().scroll() == null
96+
// top docs are already consumed if the query was cancelled or in error.
97+
&& queryResult.hasConsumedTopDocs() == false
9698
&& queryResult.topDocs() != null
9799
&& queryResult.topDocs().topDocs.getClass() == TopFieldDocs.class) {
98100
TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;

server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

+14-19
Original file line numberDiff line numberDiff line change
@@ -936,23 +936,16 @@ public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, Interna
936936
}
937937
}
938938

939-
public void testPartialReduce() throws Exception {
940-
for (int i = 0; i < 10; i++) {
941-
testReduceCase(false);
942-
}
943-
}
944-
945-
public void testPartialReduceWithFailure() throws Exception {
946-
for (int i = 0; i < 10; i++) {
947-
testReduceCase(true);
948-
}
939+
public void testCoordCircuitBreaker() throws Exception {
940+
int numShards = randomIntBetween(20, 200);
941+
testReduceCase(numShards, numShards, true);
942+
testReduceCase(numShards, numShards, false);
943+
testReduceCase(numShards, randomIntBetween(2, numShards-1), true);
944+
testReduceCase(numShards, randomIntBetween(2, numShards-1), false);
949945
}
950946

951-
private void testReduceCase(boolean shouldFail) throws Exception {
952-
int expectedNumResults = randomIntBetween(20, 200);
953-
int bufferSize = randomIntBetween(2, expectedNumResults - 1);
947+
private void testReduceCase(int numShards, int bufferSize, boolean shouldFail) throws Exception {
954948
SearchRequest request = new SearchRequest();
955-
956949
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
957950
request.setBatchedReduceSize(bufferSize);
958951
AtomicBoolean hasConsumedFailure = new AtomicBoolean();
@@ -963,10 +956,10 @@ private void testReduceCase(boolean shouldFail) throws Exception {
963956
}
964957
QueryPhaseResultConsumer consumer = searchPhaseController.newSearchPhaseResults(fixedExecutor,
965958
circuitBreaker, SearchProgressListener.NOOP,
966-
request, expectedNumResults, exc -> hasConsumedFailure.set(true));
967-
CountDownLatch latch = new CountDownLatch(expectedNumResults);
968-
Thread[] threads = new Thread[expectedNumResults];
969-
for (int i = 0; i < expectedNumResults; i++) {
959+
request, numShards, exc -> hasConsumedFailure.set(true));
960+
CountDownLatch latch = new CountDownLatch(numShards);
961+
Thread[] threads = new Thread[numShards];
962+
for (int i = 0; i < numShards; i++) {
970963
final int index = i;
971964
threads[index] = new Thread(() -> {
972965
QuerySearchResult result = new QuerySearchResult(new ShardSearchContextId(UUIDs.randomBase64UUID(), index),
@@ -985,13 +978,15 @@ private void testReduceCase(boolean shouldFail) throws Exception {
985978
});
986979
threads[index].start();
987980
}
988-
for (int i = 0; i < expectedNumResults; i++) {
981+
for (int i = 0; i < numShards; i++) {
989982
threads[i].join();
990983
}
991984
latch.await();
992985
if (shouldFail) {
993986
if (shouldFailPartial == false) {
994987
circuitBreaker.shouldBreak.set(true);
988+
} else {
989+
circuitBreaker.shouldBreak.set(false);
995990
}
996991
CircuitBreakingException exc = expectThrows(CircuitBreakingException.class, () -> consumer.reduce());
997992
assertEquals(shouldFailPartial, hasConsumedFailure.get());

0 commit comments

Comments
 (0)