Skip to content

Commit df52c2b

Browse files
committed
Early detection of circuit breaker exception in the coordinating node (#67431)
In #62884 we added the support for the request circuit breaker in search coordinating nodes. Today the circuit breaker is strictly checked only when a partial or final reduce occurs. With this commit, we also check the circuit breaker strictly when a shard response is received and we cancel the request early if an exception is thrown at this point.
1 parent cf5fd53 commit df52c2b

File tree

3 files changed

+27
-24
lines changed

3 files changed

+27
-24
lines changed

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,17 @@ public void consume(QuerySearchResult result, Runnable next) {
316316
emptyResults.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
317317
}
318318
} else {
319+
if (hasAggs) {
320+
long aggsSize = ramBytesUsedQueryResult(result);
321+
try {
322+
addEstimateAndMaybeBreak(aggsSize);
323+
} catch (Exception exc) {
324+
onMergeFailure(exc);
325+
next.run();
326+
return;
327+
}
328+
aggsCurrentBufferSize += aggsSize;
329+
}
319330
// add one if a partial merge is pending
320331
int size = buffer.size() + (hasPartialReduce ? 1 : 0);
321332
if (size >= batchReduceSize) {
@@ -329,11 +340,6 @@ public void consume(QuerySearchResult result, Runnable next) {
329340
queue.add(task);
330341
tryExecuteNext();
331342
}
332-
if (hasAggs) {
333-
long aggsSize = ramBytesUsedQueryResult(result);
334-
addWithoutBreaking(aggsSize);
335-
aggsCurrentBufferSize += aggsSize;
336-
}
337343
buffer.add(result);
338344
}
339345
}

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

+2
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ protected void onShardResult(SearchPhaseResult result, SearchShardIterator shard
9393
if (queryResult.isNull() == false
9494
// disable sort optims for scroll requests because they keep track of the last bottom doc locally (per shard)
9595
&& getRequest().scroll() == null
96+
// top docs are already consumed if the query was cancelled or in error.
97+
&& queryResult.hasConsumedTopDocs() == false
9698
&& queryResult.topDocs() != null
9799
&& queryResult.topDocs().topDocs.getClass() == TopFieldDocs.class) {
98100
TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;

server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

+14-19
Original file line numberDiff line numberDiff line change
@@ -938,23 +938,16 @@ public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, Interna
938938
}
939939
}
940940

941-
public void testPartialReduce() throws Exception {
942-
for (int i = 0; i < 10; i++) {
943-
testReduceCase(false);
944-
}
945-
}
946-
947-
public void testPartialReduceWithFailure() throws Exception {
948-
for (int i = 0; i < 10; i++) {
949-
testReduceCase(true);
950-
}
941+
public void testCoordCircuitBreaker() throws Exception {
942+
int numShards = randomIntBetween(20, 200);
943+
testReduceCase(numShards, numShards, true);
944+
testReduceCase(numShards, numShards, false);
945+
testReduceCase(numShards, randomIntBetween(2, numShards-1), true);
946+
testReduceCase(numShards, randomIntBetween(2, numShards-1), false);
951947
}
952948

953-
private void testReduceCase(boolean shouldFail) throws Exception {
954-
int expectedNumResults = randomIntBetween(20, 200);
955-
int bufferSize = randomIntBetween(2, expectedNumResults - 1);
949+
private void testReduceCase(int numShards, int bufferSize, boolean shouldFail) throws Exception {
956950
SearchRequest request = new SearchRequest();
957-
958951
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
959952
request.setBatchedReduceSize(bufferSize);
960953
AtomicBoolean hasConsumedFailure = new AtomicBoolean();
@@ -965,10 +958,10 @@ private void testReduceCase(boolean shouldFail) throws Exception {
965958
}
966959
QueryPhaseResultConsumer consumer = searchPhaseController.newSearchPhaseResults(fixedExecutor,
967960
circuitBreaker, SearchProgressListener.NOOP,
968-
request, expectedNumResults, exc -> hasConsumedFailure.set(true));
969-
CountDownLatch latch = new CountDownLatch(expectedNumResults);
970-
Thread[] threads = new Thread[expectedNumResults];
971-
for (int i = 0; i < expectedNumResults; i++) {
961+
request, numShards, exc -> hasConsumedFailure.set(true));
962+
CountDownLatch latch = new CountDownLatch(numShards);
963+
Thread[] threads = new Thread[numShards];
964+
for (int i = 0; i < numShards; i++) {
972965
final int index = i;
973966
threads[index] = new Thread(() -> {
974967
QuerySearchResult result = new QuerySearchResult(new ShardSearchContextId(UUIDs.randomBase64UUID(), index),
@@ -987,13 +980,15 @@ private void testReduceCase(boolean shouldFail) throws Exception {
987980
});
988981
threads[index].start();
989982
}
990-
for (int i = 0; i < expectedNumResults; i++) {
983+
for (int i = 0; i < numShards; i++) {
991984
threads[i].join();
992985
}
993986
latch.await();
994987
if (shouldFail) {
995988
if (shouldFailPartial == false) {
996989
circuitBreaker.shouldBreak.set(true);
990+
} else {
991+
circuitBreaker.shouldBreak.set(false);
997992
}
998993
CircuitBreakingException exc = expectThrows(CircuitBreakingException.class, () -> consumer.reduce());
999994
assertEquals(shouldFailPartial, hasConsumedFailure.get());

0 commit comments

Comments
 (0)