
Increase search.max_buckets to 65,535 #57042

Merged: 13 commits, merged on Jun 3, 2020
Changes from 7 commits
2 changes: 1 addition & 1 deletion docs/reference/aggregations/bucket.asciidoc
@@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the bucke

NOTE: The maximum number of buckets allowed in a single response is limited by a
dynamic cluster setting named
<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 10,000,
<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 65,535,
requests that try to return more than the limit will fail with an exception.

include::bucket/adjacency-matrix-aggregation.asciidoc[]
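Since search.max_buckets is a dynamic cluster setting, the limit can also be changed at runtime without touching elasticsearch.yml. A rough sketch of how that could be done from client code (this assumes the 7.x Java high-level REST client and is illustrative only, not part of this change):

    // Sketch: raise search.max_buckets at runtime through the cluster settings API.
    import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.settings.Settings;

    class MaxBucketsUpdater {
        static void raiseMaxBuckets(RestHighLevelClient client, int newLimit) throws java.io.IOException {
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
            // Persistent settings survive a full cluster restart; transient settings would not.
            request.persistentSettings(Settings.builder().put("search.max_buckets", newLimit).build());
            client.cluster().putSettings(request, RequestOptions.DEFAULT);
        }
    }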
@@ -35,10 +35,10 @@
* An aggregation service that creates instances of {@link MultiBucketConsumer}.
* The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
* in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
* The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
* The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535.
*/
public class MultiBucketConsumerService {
public static final int DEFAULT_MAX_BUCKETS = 10000;
public static final int DEFAULT_MAX_BUCKETS = 65535;
public static final Setting<Integer> MAX_BUCKET_SETTING =
Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
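For context on the setting above, here is a stripped-down sketch of what a consumer enforcing this limit looks like. It is illustrative only: the real MultiBucketConsumer created by this service also ties into the request circuit breaker and throws a dedicated TooManyBucketsException rather than an IllegalStateException.

    // Simplified bucket-count limiter; not the actual Elasticsearch class.
    import java.util.function.IntConsumer;

    class SketchMultiBucketConsumer implements IntConsumer {
        private final int limit; // value of search.max_buckets, now defaulting to 65535
        private int count;       // running bucket count for the whole request

        SketchMultiBucketConsumer(int limit) {
            this.limit = limit;
        }

        @Override
        public void accept(int value) {
            count += value; // negative values release buckets that were accounted for earlier
            if (value > 0 && count > limit) {
                throw new IllegalStateException("Trying to create too many buckets: limit is ["
                    + limit + "] but would need [" + count + "]; raise search.max_buckets to allow more.");
            }
        }
    }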

@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.IntArray;
@@ -44,25 +45,20 @@
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.ToLongFunction;

public abstract class BucketsAggregator extends AggregatorBase {

private final BigArrays bigArrays;
private final IntConsumer multiBucketConsumer;
private IntArray docCounts;
private final CircuitBreaker breaker;

public BucketsAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
Map<String, Object> metadata) throws IOException {
super(name, factories, context, parent, metadata);
bigArrays = context.bigArrays();
breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
docCounts = bigArrays.newIntArray(1, true);
if (context.aggregations() != null) {
multiBucketConsumer = context.aggregations().multiBucketConsumer();
} else {
multiBucketConsumer = (count) -> {};
}
}

/**
@@ -91,6 +87,9 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long
* Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized.
*/
public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
if (doc == 1) {
    breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
docCounts.increment(bucketOrd, 1);
subCollector.collect(doc, bucketOrd);
}

Review comment from a Contributor, on the if (doc == 1) check above:

Hm, I think this isn't quite right. doc is the doc ID, so that could be all over the place, and also all going into the same bucket. I think there are two options here:

  1. Down below, we do if (docCounts.increment(bucketOrd, 1) == 1) { <breaker stuff> }, which I think will work because the increment method returns the count after incrementing. So if we have a doc count of 1, it's the first doc and therefore a new bucket, and we can account for it.

  2. Alternatively, we could just account for it up in collectBucket without a conditional, since theoretically that should only be called on new buckets. It's not guaranteed by the API, but in practice that's how aggs use it.

There are two other issues we need to address, though:

  1. The old breaker logic only checked every 1024 buckets, since checking the real-memory breaker has a certain amount of overhead. So we should re-implement that somehow.

  2. A trickier situation which I didn't think about when suggesting BucketsAggregator: if we add the 1024 threshold back, it's only a local count, so aggs with 1023 buckets will never trigger the breaker even if the overall query has millions of buckets.

Perhaps we continue to use the MultiBucketConsumer service, but move the breaker accounting to a different method? That way it could maintain the global count and BucketsAggregator would just call a method on it. Not sure, we can discuss more offline.
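A minimal sketch of the reviewer's first option: accounting a bucket only when its doc count first reaches 1, and checking the real-memory breaker only every 1024 new buckets, as the old logic did. It assumes the docCounts and breaker fields of this class plus a hypothetical bucketsCreated counter, and is not the code in this PR:

    // Sketch of option 1 from the review comment above; bucketsCreated is a hypothetical field.
    public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
        if (docCounts.increment(bucketOrd, 1) == 1) {
            // increment() returns the count after incrementing, so a value of 1 means this ordinal
            // just received its first document, i.e. a new bucket has effectively been created.
            bucketsCreated++;
            if (bucketsCreated % 1024 == 0) {
                // Checking the real-memory breaker has overhead, so only probe it periodically.
                breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
            }
        }
        subCollector.collect(doc, bucketOrd);
    }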
@@ -137,14 +136,6 @@ public final int bucketDocCount(long bucketOrd) {
}
}

/**
* Adds {@code count} buckets to the global count for the request and fails if this number is greater than
* the maximum number of buckets allowed in a response
*/
protected final void consumeBucketsAndMaybeBreak(int count) {
multiBucketConsumer.accept(count);
}

/**
* Hook to allow taking an action before building buckets.
*/
@@ -186,7 +177,7 @@ public InternalAggregation get(int index) {
public int size() {
return aggregations.length;
}
});
}
return result;
}
@@ -267,7 +258,6 @@ protected final <B> void buildSubAggsForBuckets(List<B> buckets,
protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
consumeBucketsAndMaybeBreak(totalBuckets);
long[] bucketOrdsToCollect = new long[totalBuckets];
int bucketOrdIdx = 0;
for (long owningBucketOrd : owningBucketOrds) {
@@ -299,7 +289,7 @@ protected interface BucketBuilderForFixedCount<B> {
* @param owningBucketOrds owning bucket ordinals for which to build the results
* @param resultBuilder how to build a result from the sub aggregation results
*/
protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
SingleBucketResultBuilder resultBuilder) throws IOException {
/*
* It'd be entirely reasonable to call
@@ -328,7 +318,6 @@ protected interface SingleBucketResultBuilder {
protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
consumeBucketsAndMaybeBreak((int) bucketOrds.size());
long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
bucketOrdsToCollect[bucketOrd] = bucketOrd;
@@ -360,7 +349,6 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(lo
throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
+ "] buckets but attempted [" + totalOrdsToCollect + "]");
}
consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
int b = 0;
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
@@ -184,7 +184,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
totalBucketsToBuild++;
}
}
consumeBucketsAndMaybeBreak(totalBucketsToBuild);
long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
int builtBucketIndex = 0;
for (int ord = 0; ord < maxOrd; ord++) {
@@ -196,12 +196,10 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
if(reducedBucket.docCount >= 1){
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reducedBucket);
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
}
}
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));

InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());
@@ -136,7 +136,6 @@ protected void doPostCollection() throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
// Composite aggregator must be at the top of the aggregation tree
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
consumeBucketsAndMaybeBreak(queue.size());
if (deferredCollectors != NO_OP_COLLECTOR) {
// Replay all documents that contain at least one top bucket (collected during the first pass).
runDeferredCollections();
@@ -178,7 +178,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
buckets.clear();
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
if (result.size() >= size) {
break;
@@ -192,7 +191,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
}
if (buckets.size() > 0) {
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
}

@@ -205,6 +203,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
reducedFormats = lastBucket.formats;
lastKey = lastBucket.getRawKey();
}
reduceContext.consumeBucketsAndMaybeBreak(result.size());
return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
earlyTerminated, metadata);
}
@@ -110,7 +110,6 @@ public void collect(int doc, long bucket) throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
final int size = (int) Math.min(bucketOrds.size(), shardSize);
consumeBucketsAndMaybeBreak(size);

BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
InternalGeoGridBucket spare = null;
@@ -100,18 +100,14 @@ public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceCont
BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
if (removed != null) {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
} else {
reduceContext.consumeBucketsAndMaybeBreak(1);
}
ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
}
buckets.close();
InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ordered.pop();
}
reduceContext.consumeBucketsAndMaybeBreak(list.length);
return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
}

@@ -328,7 +328,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
if (reduceRounding.round(top.current.key) != key) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
currentBuckets.clear();
key = reduceRounding.round(top.current.key);
@@ -348,7 +347,6 @@

if (currentBuckets.isEmpty() == false) {
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
}
}
@@ -376,22 +374,17 @@ private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRo
long roundedBucketKey = reduceRounding.round(bucket.key);
if (Double.isNaN(key)) {
key = roundedBucketKey;
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
} else if (roundedBucketKey == key) {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
} else {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
sameKeyedBuckets.clear();
key = roundedBucketKey;
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
}
}
if (sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
}
reducedBuckets = mergedBuckets;
@@ -449,7 +442,6 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
if (lastBucket != null) {
long key = rounding.nextRoundingValue(lastBucket.key);
while (key < nextBucket.key) {
reduceContext.consumeBucketsAndMaybeBreak(1);
iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
key = rounding.nextRoundingValue(key);
}
@@ -515,7 +507,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
// Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
}

reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size());
BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
this.bucketInfo.emptySubAggregations);

@@ -551,16 +543,13 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
for (int i = 0; i < reducedBuckets.size(); i++) {
Bucket bucket = reducedBuckets.get(i);
if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
sameKeyedBuckets.clear();
key = roundingInfo.rounding.round(bucket.key);
}
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
}
if (sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
}
return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
@@ -329,10 +329,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
}
currentBuckets.clear();
key = top.current.key;
@@ -353,10 +350,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
if (currentBuckets.isEmpty() == false) {
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reduced);
} else {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
}
}
}
@@ -396,7 +390,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
long key = bounds.getMin() + offset;
long max = bounds.getMax() + offset;
while (key <= max) {
reduceContext.consumeBucketsAndMaybeBreak(1);
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
key = nextKey(key).longValue();
}
@@ -406,7 +399,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
long key = bounds.getMin() + offset;
if (key < firstBucket.key) {
while (key < firstBucket.key) {
reduceContext.consumeBucketsAndMaybeBreak(1);
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
key = nextKey(key).longValue();
}
@@ -422,7 +414,6 @@
if (lastBucket != null) {
long key = nextKey(lastBucket.key).longValue();
while (key < nextBucket.key) {
reduceContext.consumeBucketsAndMaybeBreak(1);
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
key = nextKey(key).longValue();
}
@@ -436,7 +427,6 @@
long key = nextKey(lastBucket.key).longValue();
long max = bounds.getMax() + offset;
while (key <= max) {
reduceContext.consumeBucketsAndMaybeBreak(1);
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
key = nextKey(key).longValue();
}
@@ -462,6 +452,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
CollectionUtil.introSort(reducedBuckets, order.comparator());
}
}
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
format, keyed, getMetadata());
}
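Taken together, the reduce-side hunks in this PR follow one pattern: the incremental consumeBucketsAndMaybeBreak calls (including the negative adjustments for buckets that get merged away or dropped) are removed, and the consumer is invoked once with the size of the final reduced bucket list. The self-contained toy below illustrates that shape; the merge logic and types are placeholders, and only the single accounting call at the end mirrors the change:

    // Toy illustration: account for buckets once after reducing, not while merging.
    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.IntConsumer;

    class ReduceAccountingSketch {
        // bucketConsumer could be something like the SketchMultiBucketConsumer shown earlier.
        static List<Long> reduce(List<List<Long>> shardBuckets, IntConsumer bucketConsumer) {
            Set<Long> reduced = new LinkedHashSet<>();
            for (List<Long> shard : shardBuckets) {
                reduced.addAll(shard); // toy stand-in for merging buckets by key
            }
            // One accounting call for all surviving buckets, instead of +1/-N per bucket while merging.
            bucketConsumer.accept(reduced.size());
            return new ArrayList<>(reduced);
        }
    }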