
Commit 29b5643

Increase search.max_buckets to 65,535 (#57042)

Increases the default search.max_buckets limit to 65,535, and only counts buckets during the reduce phase. Closes #51731
1 parent 69b79d2 commit 29b5643

Showing 31 changed files with 92 additions and 179 deletions.

docs/reference/aggregations/bucket.asciidoc

Lines changed: 1 addition & 1 deletion

@@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the buckets
 
 NOTE: The maximum number of buckets allowed in a single response is limited by a
 dynamic cluster setting named
-<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 10,000,
+<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 65,535,
 requests that try to return more than the limit will fail with an exception.
 
 include::bucket/adjacency-matrix-aggregation.asciidoc[]
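Since `search.max_buckets` is a dynamic cluster setting, operators can still raise (or lower) the limit at runtime. As a rough illustration, a minimal sketch using the 7.x Java high-level REST client might look like the following; the `client` instance and the 100,000 value are assumptions for the example, not part of this commit:

    import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.settings.Settings;

    // Persistently raise the dynamic bucket limit cluster-wide.
    void raiseMaxBuckets(RestHighLevelClient client) throws java.io.IOException {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(Settings.builder()
            .put("search.max_buckets", 100_000) // illustrative value above the new default
            .build());
        client.cluster().putSettings(request, RequestOptions.DEFAULT);
    }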

server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java

Lines changed: 13 additions & 10 deletions

@@ -35,10 +35,10 @@
  * An aggregation service that creates instances of {@link MultiBucketConsumer}.
  * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
  * in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
- * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
+ * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535.
  */
 public class MultiBucketConsumerService {
-    public static final int DEFAULT_MAX_BUCKETS = 10000;
+    public static final int DEFAULT_MAX_BUCKETS = 65535;
     public static final Setting<Integer> MAX_BUCKET_SETTING =
         Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

@@ -102,6 +102,7 @@ public static class MultiBucketConsumer implements IntConsumer {

         // aggregations execute in a single thread so no atomic here
         private int count;
+        private int callCount = 0;

         public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
             this.limit = limit;
@@ -110,15 +111,17 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) {

         @Override
         public void accept(int value) {
-            count += value;
-            if (count > limit) {
-                throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
-                    + "] but was [" + count + "]. This limit can be set by changing the [" +
-                    MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+            if (value != 0) {
+                count += value;
+                if (count > limit) {
+                    throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
+                        + "] but was [" + count + "]. This limit can be set by changing the [" +
+                        MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+                }
             }
-
-            // check parent circuit breaker every 1024 buckets
-            if (value > 0 && (count & 0x3FF) == 0) {
+            // check parent circuit breaker every 1024 calls
+            callCount++;
+            if ((callCount & 0x3FF) == 0) {
                 breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
             }
         }
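Two things change in `accept`: only nonzero values are charged against the bucket limit (zero-valued calls are pure liveness ticks), and the parent circuit breaker is now consulted on every 1024th call rather than every 1024th bucket. A self-contained sketch of that amortized-check pattern, with stand-in names rather than Elasticsearch API:

    import java.util.function.IntConsumer;

    // Sketch: enforce a hard cap on a running total, but run an expensive
    // external check (here a Runnable standing in for the circuit breaker)
    // only once every 1024 invocations, via a bitmask on the call counter.
    class BucketBudget implements IntConsumer {
        private final int limit;
        private final Runnable breakerCheck;
        private int count;
        private int callCount;

        BucketBudget(int limit, Runnable breakerCheck) {
            this.limit = limit;
            this.breakerCheck = breakerCheck;
        }

        @Override
        public void accept(int value) {
            if (value != 0) {
                count += value;
                if (count > limit) {
                    throw new IllegalStateException("too many buckets: " + count + " > " + limit);
                }
            }
            // (x & 0x3FF) == 0 is true exactly when the low ten bits are
            // zero, i.e. on every 1024th call.
            if ((++callCount & 0x3FF) == 0) {
                breakerCheck.run();
            }
        }
    }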

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 9 additions & 15 deletions

@@ -57,12 +57,12 @@ public BucketsAggregator(String name, AggregatorFactories factories, SearchContext
             Map<String, Object> metadata) throws IOException {
         super(name, factories, context, parent, metadata);
         bigArrays = context.bigArrays();
-        docCounts = bigArrays.newIntArray(1, true);
         if (context.aggregations() != null) {
             multiBucketConsumer = context.aggregations().multiBucketConsumer();
         } else {
             multiBucketConsumer = (count) -> {};
         }
+        docCounts = bigArrays.newIntArray(1, true);
     }

     /**
@@ -91,7 +91,12 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long
      * Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized.
      */
     public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
-        docCounts.increment(bucketOrd, 1);
+        if (docCounts.increment(bucketOrd, 1) == 1) {
+            // We calculate the final number of buckets only during the reduce phase. But we still need to
+            // trigger bucket consumer from time to time in order to give it a chance to check available memory and break
+            // the execution if we are running out. To achieve that we are passing 0 as a bucket count.
+            multiBucketConsumer.accept(0);
+        }
         subCollector.collect(doc, bucketOrd);
     }
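`BigArrays.IntArray#increment` returns the updated counter, so the `== 1` test is true exactly once per bucket ordinal, on its first collected document. Passing 0 leaves the reduce-phase bucket total untouched while still advancing the consumer's call counter, which keeps the periodic breaker check alive during collection. Continuing the hypothetical `BucketBudget` sketch above, the collection side behaves roughly like:

    // Hypothetical collection loop: zero-value calls never count against
    // the bucket limit, but every 1024th call still checks the breaker.
    BucketBudget budget = new BucketBudget(65535, () -> System.out.println("breaker checked"));
    long[] docCounts = new long[4096];
    for (int doc = 0; doc < 1_000_000; doc++) {
        int bucketOrd = doc % docCounts.length;
        if (++docCounts[bucketOrd] == 1) {
            budget.accept(0); // first doc in this bucket: tick the consumer only
        }
    }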

@@ -137,14 +142,6 @@ public final int bucketDocCount(long bucketOrd) {
         }
     }

-    /**
-     * Adds {@code count} buckets to the global count for the request and fails if this number is greater than
-     * the maximum number of buckets allowed in a response
-     */
-    protected final void consumeBucketsAndMaybeBreak(int count) {
-        multiBucketConsumer.accept(count);
-    }
-
    /**
     * Hook to allow taking an action before building buckets.
     */
@@ -186,7 +183,7 @@ public InternalAggregation get(int index) {
                 public int size() {
                     return aggregations.length;
                 }
-           });
+            });
         }
         return result;
     }
@@ -267,7 +264,6 @@ protected final <B> void buildSubAggsForBuckets(List<B> buckets,
     protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
             BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
-        consumeBucketsAndMaybeBreak(totalBuckets);
         long[] bucketOrdsToCollect = new long[totalBuckets];
         int bucketOrdIdx = 0;
         for (long owningBucketOrd : owningBucketOrds) {
@@ -299,7 +295,7 @@ protected interface BucketBuilderForFixedCount<B> {
      * @param owningBucketOrds owning bucket ordinals for which to build the results
      * @param resultBuilder how to build a result from the sub aggregation results
      */
-    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
+    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
                                                                            SingleBucketResultBuilder resultBuilder) throws IOException {
         /*
          * It'd be entirely reasonable to call
@@ -328,7 +324,6 @@ protected interface SingleBucketResultBuilder {
     protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
             BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        consumeBucketsAndMaybeBreak((int) bucketOrds.size());
         long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
         for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
             bucketOrdsToCollect[bucketOrd] = bucketOrd;
@@ -360,7 +355,6 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
             throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
                 + "] buckets but attempted [" + totalOrdsToCollect + "]");
         }
-        consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
         long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
         int b = 0;
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java

Lines changed: 0 additions & 1 deletion

@@ -184,7 +184,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
                 totalBucketsToBuild++;
             }
         }
-        consumeBucketsAndMaybeBreak(totalBucketsToBuild);
         long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
         int builtBucketIndex = 0;
         for (int ord = 0; ord < maxOrd; ord++) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java

Lines changed: 1 addition & 3 deletions

@@ -196,12 +196,10 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
             InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
             if(reducedBucket.docCount >= 1){
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reducedBucket);
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));

         InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());
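This is the pattern that repeats in the reduce implementations that follow: instead of charging the consumer bucket by bucket and refunding with negative counts when buckets are merged or dropped, each reduce now builds its final bucket list first and charges the consumer exactly once with the surviving size. Sketched with stand-in types (not Elasticsearch code), the shape is:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.IntConsumer;

    class CountOnceAfterReduce {
        // Filter/merge first, then make a single exact charge against the
        // bucket budget, instead of interleaved +1 / -n corrections.
        static List<Long> reduce(List<Long> bucketDocCounts, IntConsumer bucketBudget) {
            List<Long> reduced = new ArrayList<>();
            for (Long docCount : bucketDocCounts) {
                if (docCount >= 1) { // mirrors the docCount >= 1 filter above
                    reduced.add(docCount);
                }
            }
            bucketBudget.accept(reduced.size()); // one charge for the survivors
            return reduced;
        }
    }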

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 0 additions & 1 deletion

@@ -136,7 +136,6 @@ protected void doPostCollection() throws IOException {
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         // Composite aggregator must be at the top of the aggregation tree
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
-        consumeBucketsAndMaybeBreak(queue.size());
         if (deferredCollectors != NO_OP_COLLECTOR) {
             // Replay all documents that contain at least one top bucket (collected during the first pass).
             runDeferredCollections();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

Lines changed: 1 addition & 2 deletions

@@ -178,7 +178,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
                 InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
                 buckets.clear();
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 result.add(reduceBucket);
                 if (result.size() >= size) {
                     break;
@@ -192,7 +191,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         }
         if (buckets.size() > 0) {
             InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             result.add(reduceBucket);
         }

@@ -205,6 +203,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             reducedFormats = lastBucket.formats;
             lastKey = lastBucket.getRawKey();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(result.size());
         return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
             earlyTerminated, metadata);
     }

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java

Lines changed: 3 additions & 4 deletions

@@ -109,24 +109,23 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
-            consumeBucketsAndMaybeBreak(size);
-
+
             BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
             InternalGeoGridBucket spare = null;
             LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
             while (ordsEnum.next()) {
                 if (spare == null) {
                     spare = newEmptyBucket();
                 }
-
+
                 // need a special function to keep the source bucket
                 // up-to-date so it can get the appropriate key
                 spare.hashAsLong = ordsEnum.value();
                 spare.docCount = bucketDocCount(ordsEnum.ord());
                 spare.bucketOrd = ordsEnum.ord();
                 spare = ordered.insertWithOverflow(spare);
             }
-
+
             topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
             for (int i = ordered.size() - 1; i >= 0; --i) {
                 topBucketsPerOrd[ordIdx][i] = ordered.pop();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java

Lines changed: 2 additions & 6 deletions

@@ -100,18 +100,14 @@ public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
         for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
             List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
-            InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
-            if (removed != null) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
-            }
+            ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
         }
         buckets.close();
         InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             list[i] = ordered.pop();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(list.length);
         return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
     }
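Because the buckets flow through a size-bounded priority queue, this reduce is self-limiting: evicted buckets never reach the consumer, and the single `accept(list.length)` at the end is an exact charge. The same bounded top-K shape in plain JDK collections (illustrative names, not the `BucketPriorityQueue` API):

    import java.util.PriorityQueue;
    import java.util.function.IntConsumer;

    class TopKReduce {
        // Keep only the k largest doc counts, then charge the budget once.
        static void reduce(long[] docCounts, int k, IntConsumer bucketBudget) {
            PriorityQueue<Long> topK = new PriorityQueue<>(); // min-heap: smallest on top
            for (long docCount : docCounts) {
                topK.offer(docCount);
                if (topK.size() > k) {
                    topK.poll(); // evict the smallest, like insertWithOverflow
                }
            }
            bucketBudget.accept(topK.size()); // at most k buckets are ever charged
        }
    }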

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

Lines changed: 1 addition & 12 deletions

@@ -328,7 +328,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
             if (reduceRounding.round(top.current.key) != key) {
                 // the key changes, reduce what we already buffered and reset the buffer for current buckets
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reduced);
                 currentBuckets.clear();
                 key = reduceRounding.round(top.current.key);
@@ -348,7 +347,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {

         if (currentBuckets.isEmpty() == false) {
             final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             reducedBuckets.add(reduced);
         }
     }
@@ -376,22 +374,17 @@ private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding,
             long roundedBucketKey = reduceRounding.round(bucket.key);
             if (Double.isNaN(key)) {
                 key = roundedBucketKey;
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             } else if (roundedBucketKey == key) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundedBucketKey;
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             }
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         reducedBuckets = mergedBuckets;
@@ -449,7 +442,6 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) {
             if (lastBucket != null) {
                 long key = rounding.nextRoundingValue(lastBucket.key);
                 while (key < nextBucket.key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
                     key = rounding.nextRoundingValue(key);
                 }
@@ -515,7 +507,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
             reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
         }
-
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size());
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
             this.bucketInfo.emptySubAggregations);

@@ -551,16 +543,13 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
         for (int i = 0; i < reducedBuckets.size(); i++) {
             Bucket bucket = reducedBuckets.get(i);
             if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundingInfo.rounding.round(bucket.key);
             }
-            reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
             sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
