diff --git a/docs/reference/aggregations/bucket.asciidoc b/docs/reference/aggregations/bucket.asciidoc
index 30cb7d604ff4f..134c6a848a57d 100644
--- a/docs/reference/aggregations/bucket.asciidoc
+++ b/docs/reference/aggregations/bucket.asciidoc
@@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the bucke
 
 NOTE: The maximum number of buckets allowed in a single response is limited by a
 dynamic cluster setting named
-<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 10,000,
+<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 65,535;
 requests that try to return more than the limit will fail with an exception.
 
 include::bucket/adjacency-matrix-aggregation.asciidoc[]
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java
index a3a7e52167a7f..9cbc4eeab9c9a 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java
@@ -35,10 +35,10 @@
  * An aggregation service that creates instances of {@link MultiBucketConsumer}.
  * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
  * in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
- * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
+ * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535.
  */
 public class MultiBucketConsumerService {
-    public static final int DEFAULT_MAX_BUCKETS = 10000;
+    public static final int DEFAULT_MAX_BUCKETS = 65535;
     public static final Setting<Integer> MAX_BUCKET_SETTING =
         Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);
 
@@ -102,6 +102,7 @@ public static class MultiBucketConsumer implements IntConsumer {
 
         // aggregations execute in a single thread so no atomic here
         private int count;
+        private int callCount = 0;
 
         public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
             this.limit = limit;
@@ -110,15 +111,17 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
 
         @Override
         public void accept(int value) {
-            count += value;
-            if (count > limit) {
-                throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
-                    + "] but was [" + count + "]. This limit can be set by changing the [" +
-                    MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+            if (value != 0) {
+                count += value;
+                if (count > limit) {
+                    throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
+                        + "] but was [" + count + "]. This limit can be set by changing the [" +
+                        MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+                }
             }
-
-            // check parent circuit breaker every 1024 buckets
-            if (value > 0 && (count & 0x3FF) == 0) {
+            // check parent circuit breaker every 1024 calls
+            callCount++;
+            if ((callCount & 0x3FF) == 0) {
                 breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
             }
         }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
index dca1c94feab0f..2f088b98e1d04 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java
@@ -57,12 +57,12 @@ public BucketsAggregator(String name, AggregatorFactories factories, SearchConte
                              Map<String, Object> metadata) throws IOException {
         super(name, factories, context, parent, metadata);
         bigArrays = context.bigArrays();
-        docCounts = bigArrays.newIntArray(1, true);
         if (context.aggregations() != null) {
             multiBucketConsumer = context.aggregations().multiBucketConsumer();
         } else {
             multiBucketConsumer = (count) -> {};
         }
+        docCounts = bigArrays.newIntArray(1, true);
     }
 
     /**
@@ -91,7 +91,12 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long
      * Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized.
      */
     public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
-        docCounts.increment(bucketOrd, 1);
+        if (docCounts.increment(bucketOrd, 1) == 1) {
+            // We calculate the final number of buckets only during the reduce phase. But we still need to
+            // trigger the bucket consumer from time to time in order to give it a chance to check available memory and break
+            // the execution if we are running out. To achieve that, we pass 0 as the bucket count.
+            multiBucketConsumer.accept(0);
+        }
         subCollector.collect(doc, bucketOrd);
     }
 
@@ -137,14 +142,6 @@ public final int bucketDocCount(long bucketOrd) {
         }
     }
 
-    /**
-     * Adds {@code count} buckets to the global count for the request and fails if this number is greater than
-     * the maximum number of buckets allowed in a response
-     */
-    protected final void consumeBucketsAndMaybeBreak(int count) {
-        multiBucketConsumer.accept(count);
-    }
-
     /**
      * Hook to allow taking an action before building buckets.
      */
@@ -186,7 +183,7 @@ public InternalAggregation get(int index) {
                 public int size() {
                     return aggregations.length;
                 }
-            });
+            });
         }
         return result;
     }
@@ -267,7 +264,6 @@ protected final void buildSubAggsForBuckets(List<B> buckets,
    protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
            BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
-        consumeBucketsAndMaybeBreak(totalBuckets);
         long[] bucketOrdsToCollect = new long[totalBuckets];
         int bucketOrdIdx = 0;
         for (long owningBucketOrd : owningBucketOrds) {
@@ -299,7 +295,7 @@ protected interface BucketBuilderForFixedCount<B> {
      * @param owningBucketOrds owning bucket ordinals for which to build the results
      * @param resultBuilder how to build a result from the sub aggregation results
      */
-    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds, 
+    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
                                                                            SingleBucketResultBuilder resultBuilder) throws IOException {
         /*
          * It'd be entirely reasonable to call
@@ -328,7 +324,6 @@ protected interface SingleBucketResultBuilder {
    protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
            BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        consumeBucketsAndMaybeBreak((int) bucketOrds.size());
         long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
         for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
             bucketOrdsToCollect[bucketOrd] = bucketOrd;
         }
@@ -360,7 +355,6 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(lo
             throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
                 + "] buckets but attempted [" + totalOrdsToCollect + "]");
         }
-        consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
         long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
         int b = 0;
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
index fed6916b2fcca..993085cba57f0 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java
@@ -184,7 +184,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
                 totalBucketsToBuild++;
             }
         }
-        consumeBucketsAndMaybeBreak(totalBucketsToBuild);
         long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
         int builtBucketIndex = 0;
         for (int ord = 0; ord < maxOrd; ord++) {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
index deba1a10c1ea4..6a9d51c3ef32f 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
@@ -196,12 +196,10 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
         for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
             InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
             if(reducedBucket.docCount >= 1){
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reducedBucket);
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));
 
         InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
index ed8f2d20427ac..8cee8087a072a 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -136,7 +136,6 @@ protected void doPostCollection() throws IOException {
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         // Composite aggregator must be at the top of the aggregation tree
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
-        consumeBucketsAndMaybeBreak(queue.size());
         if (deferredCollectors != NO_OP_COLLECTOR) {
             // Replay all documents that contain at least one top bucket (collected during the first pass).
             runDeferredCollections();
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
index 9f6a1da9c8a49..bb9e7f3b6d49d 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
@@ -178,7 +178,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
             if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
                 InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
                 buckets.clear();
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 result.add(reduceBucket);
                 if (result.size() >= size) {
                     break;
@@ -192,7 +191,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
         }
         if (buckets.size() > 0) {
             InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             result.add(reduceBucket);
         }
 
@@ -205,6 +203,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
             reducedFormats = lastBucket.formats;
             lastKey = lastBucket.getRawKey();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(result.size());
         return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
             earlyTerminated, metadata);
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java
index f44ec9263adc7..16ebd6a7db338 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java
@@ -109,8 +109,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
         InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
-            consumeBucketsAndMaybeBreak(size);
-
+
             BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
             InternalGeoGridBucket spare = null;
             LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
@@ -118,7 +117,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
                 if (spare == null) {
                     spare = newEmptyBucket();
                 }
-
+
                 // need a special function to keep the source bucket
                 // up-to-date so it can get the appropriate key
                 spare.hashAsLong = ordsEnum.value();
@@ -126,7 +125,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
                 spare.bucketOrd = ordsEnum.ord();
                 spare = ordered.insertWithOverflow(spare);
             }
-
+
             topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
             for (int i = ordered.size() - 1; i >= 0; --i) {
                 topBucketsPerOrd[ordIdx][i] = ordered.pop();
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java
index c967787ed67f2..a1bd27d1f2994 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java
@@ -100,18 +100,14 @@ public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceCont
         BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
         for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
             List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
-            InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
-            if (removed != null) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
-            }
+            ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
         }
         buckets.close();
         InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             list[i] = ordered.pop();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(list.length);
         return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
index 7465ae27de70a..630e4ced5f8b0 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
@@ -328,7 +328,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
                 if (reduceRounding.round(top.current.key) != key) {
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
                     currentBuckets.clear();
                     key = reduceRounding.round(top.current.key);
@@ -348,7 +347,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
 
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reduced);
             }
         }
@@ -376,22 +374,17 @@ private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRo
                 long roundedBucketKey = reduceRounding.round(bucket.key);
                 if (Double.isNaN(key)) {
                     key = roundedBucketKey;
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                     sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
                 } else if (roundedBucketKey == key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                     sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
                 } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                     sameKeyedBuckets.clear();
                     key = roundedBucketKey;
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                     sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
                 }
             }
             if (sameKeyedBuckets.isEmpty() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
             }
             reducedBuckets = mergedBuckets;
@@ -449,7 +442,6 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
                 if (lastBucket != null) {
                     long key = rounding.nextRoundingValue(lastBucket.key);
                     while (key < nextBucket.key) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
                         key = rounding.nextRoundingValue(key);
                     }
@@ -515,7 +507,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
             reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
         }
-
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size());
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
             this.bucketInfo.emptySubAggregations);
 
@@ -551,16 +543,13 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
         for (int i = 0; i < reducedBuckets.size(); i++) {
             Bucket bucket = reducedBuckets.get(i);
             if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundingInfo.rounding.round(bucket.key);
             }
-            reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
             sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
index 0513fcafd912d..7efb5d630ae97 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
@@ -329,10 +329,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
                     // the key changes, reduce what we already buffered and reset the buffer for current buckets
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         reducedBuckets.add(reduced);
-                    } else {
-                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                     }
                     currentBuckets.clear();
                     key = top.current.key;
@@ -353,10 +350,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }
@@ -396,7 +390,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
                 long key = bounds.getMin() + offset;
                 long max = bounds.getMax() + offset;
                 while (key <= max) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     key = nextKey(key).longValue();
                 }
@@ -406,7 +399,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
                 long key = bounds.getMin() + offset;
                 if (key < firstBucket.key) {
                     while (key < firstBucket.key) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                         key = nextKey(key).longValue();
                     }
@@ -422,7 +414,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
             if (lastBucket != null) {
                 long key = nextKey(lastBucket.key).longValue();
                 while (key < nextBucket.key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     key = nextKey(key).longValue();
                 }
@@ -436,7 +427,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
             long key = nextKey(lastBucket.key).longValue();
             long max = bounds.getMax() + offset;
             while (key <= max) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 key = nextKey(key).longValue();
             }
@@ -462,6 +452,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
                 CollectionUtil.introSort(reducedBuckets, order.comparator());
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
                 format, keyed, getMetadata());
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
index f525eecce88e5..a3eaedaca7d71 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
@@ -313,10 +313,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
                     // Using Double.compare instead of != to handle NaN correctly.
                     final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                     if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        reduceContext.consumeBucketsAndMaybeBreak(1);
                         reducedBuckets.add(reduced);
-                    } else {
-                        reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                     }
                     currentBuckets.clear();
                     key = top.current.key;
@@ -337,10 +334,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
             if (currentBuckets.isEmpty() == false) {
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
                 if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     reducedBuckets.add(reduced);
-                } else {
-                    reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
                 }
             }
         }
@@ -380,7 +374,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
         if (iter.hasNext() == false) {
             // fill with empty buckets
             for (double key = round(emptyBucketInfo.minBound); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
             }
         } else {
@@ -388,7 +381,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
             if (Double.isFinite(emptyBucketInfo.minBound)) {
                 // fill with empty buckets until the first key
                 for (double key = round(emptyBucketInfo.minBound); key < first.key; key = nextKey(key)) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                 }
             }
@@ -401,7 +393,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
             if (lastBucket != null) {
                 double key = nextKey(lastBucket.key);
                 while (key < nextBucket.key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
                     key = nextKey(key);
                 }
@@ -412,7 +403,6 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
 
         // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
         for (double key = nextKey(lastBucket.key); key <= emptyBucketInfo.maxBound; key = nextKey(key)) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
         }
     }
@@ -437,6 +427,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
                 CollectionUtil.introSort(reducedBuckets, order.comparator());
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo,
                 format, keyed, getMetadata());
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
index 34875393085bd..bd5d37e846070 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
@@ -392,7 +392,7 @@ abstract class CollectionStrategy implements Releasable {
          * Iterate all of the buckets. Implementations take into account
          * the {@link BucketCountThresholds}. In particular,
          * if the {@link BucketCountThresholds#getMinDocCount()} is 0 then
-         * they'll make sure to iterate a bucket even if it was never 
+         * they'll make sure to iterate a bucket even if it was never
          * {{@link #collectGlobalOrd(int, long, LeafBucketCollector) collected}.
          * If {@link BucketCountThresholds#getMinDocCount()} is not 0 then
          * they'll skip all global ords that weren't collected.
@@ -500,7 +500,7 @@ void forEach(BucketInfoConsumer consumer) throws IOException {
                 }
             }
         }
-        
+
         @Override
         public void close() {
@@ -543,9 +543,6 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep
                     }
                     updateBucket(spare, globalOrd, bucketOrd, docCount);
                     spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
                 }
             }
         });
@@ -653,7 +650,7 @@ void updateBucket(OrdBucket spare, long globalOrd, long bucketOrd, long docCount
             spare.docCount = docCount;
             otherDocCount += docCount;
         }
-        
+
         @Override
         PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
             return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java
index fde74735c9e1a..52327d02ad9b7 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java
@@ -113,8 +113,6 @@ private List<LongRareTerms.Bucket> buildSketch() {
                 LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(oldKey, docCount, null, format);
                 bucket.bucketOrd = newBucketOrd;
                 buckets.add(bucket);
-
-                consumeBucketsAndMaybeBreak(1);
             } else {
                 // Make a note when one of the ords has been deleted
                 deletionCount += 1;
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
index 76a4d0d2e16d4..07d85c2c242b2 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
@@ -172,9 +172,6 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
                     if (bucketCountThresholds.getShardMinDocCount() <= docCount) {
                         updateBucket(spare, ordsEnum, docCount);
                         spare = ordered.insertWithOverflow(spare);
-                        if (spare == null) {
-                            consumeBucketsAndMaybeBreak(1);
-                        }
                     }
                 }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java
index 64bd514409058..4aa28538cb490 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java
@@ -104,9 +104,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
                 spare.bucketOrd = i;
                 spare = ordered.insertWithOverflow(spare);
-                if (spare == null) {
-                    consumeBucketsAndMaybeBreak(1);
-                }
             }
 
             final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java
index 1882cf1c38e5a..87036bf69a987 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java
@@ -216,9 +216,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
                 spare.bucketOrd = i;
                 spare = ordered.insertWithOverflow(spare);
-                if (spare == null) {
-                    consumeBucketsAndMaybeBreak(1);
-                }
             }
 
             final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java
index 56c664f265848..d40399fc66499 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java
@@ -119,8 +119,6 @@ private List<StringRareTerms.Bucket> buildSketch() {
                 StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(BytesRef.deepCopyOf(oldKey), docCount, null, format);
                 bucket.bucketOrd = newBucketOrd;
                 buckets.add(bucket);
-
-                consumeBucketsAndMaybeBreak(1);
             } else {
                 // Make a note when one of the ords has been deleted
                 deletionCount += 1;
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
index 3b64a6979977b..5f26476294852 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
@@ -147,9 +147,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
                 spare.bucketOrd = i;
                 if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
                     spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
                 }
             }
diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java
index 02afb98cf9e7f..8a73b7d7aeaab 100644
--- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java
@@ -323,7 +323,10 @@ public void testAllocationBucketsBreaker() {
 
         // make sure used bytes is greater than the total circuit breaker limit
         breaker.addWithoutBreaking(200);
-
+        // make sure that we check on the following call
+        for (int i = 0; i < 1023; i++) {
+            multiBucketConsumer.accept(0);
+        }
         CircuitBreakingException exception = expectThrows(CircuitBreakingException.class,
             () -> multiBucketConsumer.accept(1024));
         assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be"));
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java
index e1dd49b40bf31..f5833e9f0036a 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java
@@ -36,9 +36,11 @@
 import org.elasticsearch.common.geo.GeoUtils;
 import org.elasticsearch.index.mapper.GeoPointFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
@@ -304,4 +306,11 @@ private void testCase(Query query, int precision, GeoBoundingBox geoBoundingBox,
         indexReader.close();
         directory.close();
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
index f6e28f7615fbc..8e2239d18ed3d 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
@@ -38,6 +38,7 @@
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
@@ -869,4 +870,11 @@ private void indexSampleData(List<ZonedDateTime> dataset, RandomIndexWriter inde
             i += 1;
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
index a605d92cc90b3..84b1ef2352e86 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
@@ -38,10 +38,8 @@
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.MultiBucketConsumerService.TooManyBucketsException;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
@@ -967,71 +965,6 @@ public void testMinDocCount() throws IOException {
         );
     }
 
-    public void testMaxBucket() throws IOException {
-        Query query = new MatchAllDocsQuery();
-        List<String> timestamps = Arrays.asList(
-            "2010-01-01T00:00:00.000Z",
-            "2011-01-01T00:00:00.000Z",
-            "2017-01-01T00:00:00.000Z"
-        );
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L),
-            histogram -> {}, 100, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.fixedInterval(DateHistogramInterval.seconds(5))
-                    .field(AGGREGABLE_DATE)
-                    .subAggregation(
-                        AggregationBuilders.dateHistogram("1")
-                            .fixedInterval(DateHistogramInterval.seconds(5))
-                            .field(AGGREGABLE_DATE)
-                    ),
-            histogram -> {}, 5, false));
-    }
-
-    public void testMaxBucketDeprecated() throws IOException {
-        Query query = new MatchAllDocsQuery();
-        List<String> timestamps = Arrays.asList(
-            "2010-01-01T00:00:00.000Z",
-            "2011-01-01T00:00:00.000Z",
-            "2017-01-01T00:00:00.000Z"
-        );
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE),
-            histogram -> {}, 2, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L),
-            histogram -> {}, 100, false));
-
-        expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps,
-            aggregation ->
-                aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5))
-                    .field(AGGREGABLE_DATE)
-                    .subAggregation(
-                        AggregationBuilders.dateHistogram("1")
-                            .dateHistogramInterval(DateHistogramInterval.seconds(5))
-                            .field(AGGREGABLE_DATE)
-                    ),
-            histogram -> {}, 5, false));
-        assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
-    }
-
     public void testFixedWithCalendar() throws IOException {
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
             testSearchCase(new MatchAllDocsQuery(),
                 Arrays.asList(
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
index 4e4076fbeadc0..2dffec48b2ff4 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java
@@ -147,7 +147,7 @@ protected void assertReduced(InternalAutoDateHistogram reduced, List= 0; j--) {
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java
index 2fb281170b732..14f9d9871b411 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java
@@ -36,7 +36,9 @@
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
 
 import java.io.IOException;
@@ -304,4 +306,11 @@ private void testCase(DateRangeAggregationBuilder aggregationBuilder,
             }
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
index a1218f6c4c998..13b6b81ef3150 100644
--- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
@@ -422,7 +422,6 @@ protected <A extends InternalAggregation> A search(IndexSe
         a.postCollection();
         @SuppressWarnings("unchecked")
         A result = (A) a.buildTopLevel();
-        InternalAggregationTestCase.assertMultiBucketConsumer(result, bucketConsumer);
         return result;
     }
 
@@ -493,7 +492,6 @@ protected <A extends InternalAggregation> A searchAndReduc
                 a.postCollection();
                 InternalAggregation agg = a.buildTopLevel();
                 aggs.add(agg);
-                InternalAggregationTestCase.assertMultiBucketConsumer(agg, shardBucketConsumer);
             }
             if (aggs.isEmpty()) {
                 return (A) root.buildEmptyAggregation();
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
index ff690d54da885..ca62062bca3a7 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
@@ -44,6 +44,7 @@
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
 import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder;
@@ -82,15 +83,15 @@
 import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
 import org.elasticsearch.search.aggregations.bucket.sampler.ParsedSampler;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
-import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
@@ -387,10 +388,15 @@ public void testReduceRandom() throws IOException {
                 bigArrays, mockScriptService, bucketConsumer, PipelineTree.EMPTY);
             @SuppressWarnings("unchecked")
             T reduced = (T) inputs.get(0).reduce(toReduce, context);
-            assertMultiBucketConsumer(reduced, bucketConsumer);
+            doAssertReducedMultiBucketConsumer(reduced, bucketConsumer);
             assertReduced(reduced, inputs);
         }
 
+    protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer);
+    }
+
+
     /**
      * overwrite in tests that need it
      */
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
index c8d1fbee2f03b..ec8204b362aed 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.ParsedAggregation;
 import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
@@ -184,4 +185,11 @@ protected void assertBucket(MultiBucketsAggregation.Bucket expected, MultiBucket
             }
         }
     }
+
+    @Override
+    public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) {
+        /*
+         * No-op.
+         */
+    }
 }
diff --git a/x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/JdbcErrorsTestCase.java b/x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/JdbcErrorsTestCase.java
index b885dedcfec80..1790c0fc49598 100644
--- a/x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/JdbcErrorsTestCase.java
+++ b/x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/JdbcErrorsTestCase.java
@@ -135,9 +135,9 @@ public void testHardLimitForSortOnAggregate() throws IOException, SQLException {
         try (Connection c = esJdbc()) {
             SQLException e = expectThrows(
                 SQLException.class,
-                () -> c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000").executeQuery()
+                () -> c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000").executeQuery()
             );
-            assertEquals("The maximum LIMIT for aggregate sorting is [10000], received [12000]", e.getMessage());
+            assertEquals("The maximum LIMIT for aggregate sorting is [65535], received [120000]", e.getMessage());
         }
     }
 }
diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java
index 5cae19fc69bdc..5fb92b197412d 100644
--- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java
+++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java
@@ -117,9 +117,9 @@ public void testSelectScoreInScalar() throws Exception {
     @Override
     public void testHardLimitForSortOnAggregate() throws Exception {
         index("test", body -> body.field("a", 1).field("b", 2));
-        String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000");
+        String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000");
         assertEquals(
-            START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [10000], received [12000]" + END,
+            START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [65535], received [120000]" + END,
             commandResult
         );
     }
diff --git a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java
index 0bfadf56812a9..04dc94cae73fb 100644
--- a/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java
+++ b/x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java
@@ -439,8 +439,8 @@ public void testSelectScoreInScalar() throws Exception {
     public void testHardLimitForSortOnAggregate() throws Exception {
         index("{\"a\": 1, \"b\": 2}");
         expectBadRequest(
-            () -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000"),
-            containsString("The maximum LIMIT for aggregate sorting is [10000], received [12000]")
+            () -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000"),
+            containsString("The maximum LIMIT for aggregate sorting is [65535], received [120000]")
        );
    }