Skip to content

Commit b5cae0a

Browse files
authored
Enforce max_buckets limit only in the final reduction phase (#36152)
Given that we check the max buckets limit on each shard when collecting the buckets, and that non final reduction cannot add buckets (see #35921), there is no point in counting and checking the number of buckets as part of non final reduction phases. Such check is still needed though in the final reduction phases to make sure that the number of returned buckets is not above the allowed threshold. Relates somehow to #32125 as we will make use of non final reduction phases in CCS alternate execution mode and that increases the chance that this check trips for nothing when reducing aggs in each remote cluster.
1 parent 36ddca7 commit b5cae0a

File tree

2 files changed

+24
-11
lines changed

2 files changed

+24
-11
lines changed

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,8 @@ public IndicesService getIndicesService() {
10901090
}
10911091

10921092
public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce) {
1093-
return new InternalAggregation.ReduceContext(bigArrays, scriptService, multiBucketConsumerService.create(), finalReduce);
1093+
return new InternalAggregation.ReduceContext(bigArrays, scriptService,
1094+
finalReduce ? multiBucketConsumerService.create() : bucketCount -> {}, finalReduce);
10941095
}
10951096

10961097
public static final class CanMatchResponse extends SearchPhaseResult {

server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.elasticsearch.search;
2020

2121
import com.carrotsearch.hppc.IntArrayList;
22-
2322
import org.apache.lucene.search.Query;
2423
import org.apache.lucene.store.AlreadyClosedException;
2524
import org.elasticsearch.action.ActionListener;
@@ -59,6 +58,8 @@
5958
import org.elasticsearch.script.MockScriptPlugin;
6059
import org.elasticsearch.script.Script;
6160
import org.elasticsearch.script.ScriptType;
61+
import org.elasticsearch.search.aggregations.InternalAggregation;
62+
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
6263
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
6364
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
6465
import org.elasticsearch.search.aggregations.support.ValueType;
@@ -155,7 +156,7 @@ protected Settings nodeSettings() {
155156
return Settings.builder().put("search.default_search_timeout", "5s").build();
156157
}
157158

158-
public void testClearOnClose() throws ExecutionException, InterruptedException {
159+
public void testClearOnClose() {
159160
createIndex("index");
160161
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
161162
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
@@ -167,7 +168,7 @@ public void testClearOnClose() throws ExecutionException, InterruptedException {
167168
assertEquals(0, service.getActiveContexts());
168169
}
169170

170-
public void testClearOnStop() throws ExecutionException, InterruptedException {
171+
public void testClearOnStop() {
171172
createIndex("index");
172173
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
173174
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
@@ -179,7 +180,7 @@ public void testClearOnStop() throws ExecutionException, InterruptedException {
179180
assertEquals(0, service.getActiveContexts());
180181
}
181182

182-
public void testClearIndexDelete() throws ExecutionException, InterruptedException {
183+
public void testClearIndexDelete() {
183184
createIndex("index");
184185
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
185186
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
@@ -208,7 +209,7 @@ public void testCloseSearchContextOnRewriteException() {
208209
assertEquals(activeRefs, indexShard.store().refCount());
209210
}
210211

211-
public void testSearchWhileIndexDeleted() throws IOException, InterruptedException {
212+
public void testSearchWhileIndexDeleted() throws InterruptedException {
212213
createIndex("index");
213214
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
214215

@@ -443,15 +444,15 @@ protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) {
443444
}
444445

445446
@Override
446-
protected void doWriteTo(StreamOutput out) throws IOException {
447+
protected void doWriteTo(StreamOutput out) {
447448
}
448449

449450
@Override
450-
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
451+
protected void doXContent(XContentBuilder builder, Params params) {
451452
}
452453

453454
@Override
454-
protected Query doToQuery(QueryShardContext context) throws IOException {
455+
protected Query doToQuery(QueryShardContext context) {
455456
return null;
456457
}
457458

@@ -501,7 +502,6 @@ public void testCanMatch() throws IOException {
501502
assertFalse(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
502503
new SearchSourceBuilder().query(new MatchNoneQueryBuilder()), Strings.EMPTY_ARRAY, false,
503504
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null)));
504-
505505
}
506506

507507
public void testCanRewriteToMatchNone() {
@@ -519,7 +519,6 @@ public void testCanRewriteToMatchNone() {
519519
.suggest(new SuggestBuilder())));
520520
assertFalse(SearchService.canRewriteToMatchNone(new SearchSourceBuilder().query(new TermQueryBuilder("foo", "bar"))
521521
.suggest(new SuggestBuilder())));
522-
523522
}
524523

525524
public void testSetSearchThrottled() {
@@ -568,4 +567,17 @@ public void testExpandSearchThrottled() {
568567
assertHitCount(client().prepareSearch().get(), 0L);
569568
assertHitCount(client().prepareSearch().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED).get(), 1L);
570569
}
570+
571+
public void testCreateReduceContext() {
572+
final SearchService service = getInstanceFromNode(SearchService.class);
573+
{
574+
InternalAggregation.ReduceContext reduceContext = service.createReduceContext(true);
575+
expectThrows(MultiBucketConsumerService.TooManyBucketsException.class,
576+
() -> reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1));
577+
}
578+
{
579+
InternalAggregation.ReduceContext reduceContext = service.createReduceContext(false);
580+
reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1);
581+
}
582+
}
571583
}

0 commit comments

Comments
 (0)