From 5ab18c669ce81096b2febde5eeebf00a944cae45 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 20 May 2020 13:58:07 -0400 Subject: [PATCH 1/9] Increase search.max_buckets to 65,535 Increases the default search.max_buckets limit to 65,535, and only counts buckets during reduce phase. Closes #51731 --- docs/reference/aggregations/bucket.asciidoc | 2 +- .../MultiBucketConsumerService.java | 4 +-- .../bucket/BucketsAggregator.java | 22 ++---------- .../adjacency/AdjacencyMatrixAggregator.java | 1 - .../bucket/composite/CompositeAggregator.java | 1 - .../bucket/geogrid/GeoGridAggregator.java | 1 - ...balOrdinalsSignificantTermsAggregator.java | 3 -- .../GlobalOrdinalsStringTermsAggregator.java | 1 - .../bucket/terms/LongRareTermsAggregator.java | 2 -- .../bucket/terms/LongTermsAggregator.java | 3 -- .../terms/SignificantLongTermsAggregator.java | 5 +-- .../SignificantStringTermsAggregator.java | 3 -- .../terms/SignificantTextAggregator.java | 3 -- .../terms/StringRareTermsAggregator.java | 2 -- .../bucket/terms/StringTermsAggregator.java | 3 -- .../DateHistogramAggregatorTests.java | 12 ++++--- .../aggregations/AggregatorTestCase.java | 4 +-- .../xpack/sql/qa/cli/ErrorsTestCase.java | 6 ++-- .../xpack/sql/qa/jdbc/ErrorsTestCase.java | 4 +-- .../xpack/sql/qa/rest/RestSqlTestCase.java | 34 +++++++++---------- 20 files changed, 36 insertions(+), 80 deletions(-) diff --git a/docs/reference/aggregations/bucket.asciidoc b/docs/reference/aggregations/bucket.asciidoc index 30cb7d604ff4f..134c6a848a57d 100644 --- a/docs/reference/aggregations/bucket.asciidoc +++ b/docs/reference/aggregations/bucket.asciidoc @@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the bucke NOTE: The maximum number of buckets allowed in a single response is limited by a dynamic cluster setting named -<>. It defaults to 10,000, +<>. It defaults to 65,535, requests that try to return more than the limit will fail with an exception. 
include::bucket/adjacency-matrix-aggregation.asciidoc[] diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java index a3a7e52167a7f..f29c6ef169d94 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java @@ -35,10 +35,10 @@ * An aggregation service that creates instances of {@link MultiBucketConsumer}. * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created * in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}. - * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000. + * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535. 
*/ public class MultiBucketConsumerService { - public static final int DEFAULT_MAX_BUCKETS = 10000; + public static final int DEFAULT_MAX_BUCKETS = 65535; public static final Setting MAX_BUCKET_SETTING = Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index dca1c94feab0f..50873c8b257cb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -44,13 +44,11 @@ import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.function.IntConsumer; import java.util.function.ToLongFunction; public abstract class BucketsAggregator extends AggregatorBase { private final BigArrays bigArrays; - private final IntConsumer multiBucketConsumer; private IntArray docCounts; public BucketsAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, @@ -58,11 +56,6 @@ public BucketsAggregator(String name, AggregatorFactories factories, SearchConte super(name, factories, context, parent, metadata); bigArrays = context.bigArrays(); docCounts = bigArrays.newIntArray(1, true); - if (context.aggregations() != null) { - multiBucketConsumer = context.aggregations().multiBucketConsumer(); - } else { - multiBucketConsumer = (count) -> {}; - } } /** @@ -137,14 +130,6 @@ public final int bucketDocCount(long bucketOrd) { } } - /** - * Adds {@code count} buckets to the global count for the request and fails if this number is greater than - * the maximum number of buckets allowed in a response - */ - protected final void consumeBucketsAndMaybeBreak(int count) { - multiBucketConsumer.accept(count); - } - /** * 
Hook to allow taking an action before building buckets. */ @@ -186,7 +171,7 @@ public InternalAggregation get(int index) { public int size() { return aggregations.length; } - }); + }); } return result; } @@ -267,7 +252,6 @@ protected final void buildSubAggsForBuckets(List buckets, protected final InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd, BucketBuilderForFixedCount bucketBuilder, Function, InternalAggregation> resultBuilder) throws IOException { int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd; - consumeBucketsAndMaybeBreak(totalBuckets); long[] bucketOrdsToCollect = new long[totalBuckets]; int bucketOrdIdx = 0; for (long owningBucketOrd : owningBucketOrds) { @@ -299,7 +283,7 @@ protected interface BucketBuilderForFixedCount { * @param owningBucketOrds owning bucket ordinals for which to build the results * @param resultBuilder how to build a result from the sub aggregation results */ - protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds, + protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds, SingleBucketResultBuilder resultBuilder) throws IOException { /* * It'd be entirely reasonable to call @@ -328,7 +312,6 @@ protected interface SingleBucketResultBuilder { protected final InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds, BucketBuilderForVariable bucketBuilder, Function, InternalAggregation> resultBuilder) throws IOException { assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0; - consumeBucketsAndMaybeBreak((int) bucketOrds.size()); long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()]; for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) { bucketOrdsToCollect[bucketOrd] = bucketOrd; @@ -360,7 +343,6 @@ protected final InternalAggregation[] buildAggregationsForVariableBuckets(lo throw new 
AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]"); } - consumeBucketsAndMaybeBreak((int) totalOrdsToCollect); long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect]; int b = 0; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java index fed6916b2fcca..993085cba57f0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java @@ -184,7 +184,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I totalBucketsToBuild++; } } - consumeBucketsAndMaybeBreak(totalBucketsToBuild); long[] bucketOrdsToBuild = new long[totalBucketsToBuild]; int builtBucketIndex = 0; for (int ord = 0; ord < maxOrd; ord++) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java index ed8f2d20427ac..8cee8087a072a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -136,7 +136,6 @@ protected void doPostCollection() throws IOException { public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { // Composite aggregator must be at the top of the aggregation tree assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L; - consumeBucketsAndMaybeBreak(queue.size()); if (deferredCollectors != 
NO_OP_COLLECTOR) { // Replay all documents that contain at least one top bucket (collected during the first pass). runDeferredCollections(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java index 8b6e44c25360b..12f40c9b42622 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java @@ -110,7 +110,6 @@ public void collect(int doc, long bucket) throws IOException { public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0; final int size = (int) Math.min(bucketOrds.size(), shardSize); - consumeBucketsAndMaybeBreak(size); BucketPriorityQueue ordered = new BucketPriorityQueue<>(size); InternalGeoGridBucket spare = null; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsSignificantTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsSignificantTermsAggregator.java index ec7a47937b4e2..b2ca4406b9775 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsSignificantTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsSignificantTermsAggregator.java @@ -136,9 +136,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I // global stats spare.updateScore(significanceHeuristic); spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } } final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()]; diff --git 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 689f4f712d2b1..0805266cf60bd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -208,7 +208,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) { spare = ordered.insertWithOverflow(spare); if (spare == null) { - consumeBucketsAndMaybeBreak(1); spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java index fde74735c9e1a..52327d02ad9b7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java @@ -113,8 +113,6 @@ private List buildSketch() { LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(oldKey, docCount, null, format); bucket.bucketOrd = newBucketOrd; buckets.add(bucket); - - consumeBucketsAndMaybeBreak(1); } else { // Make a note when one of the ords has been deleted deletionCount += 1; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java index 109a318f67fd6..1c226a65bc4df 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java @@ -146,9 +146,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I spare.bucketOrd = ordsEnum.ord(); if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) { spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java index 0d21206515bec..34406e58df5be 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantLongTermsAggregator.java @@ -100,9 +100,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I spare.bucketOrd = ordsEnum.ord(); spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } } SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()]; @@ -110,7 +107,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I list[i] = ordered.pop(); } - buildSubAggsForBuckets(list, bucket -> bucket.bucketOrd, (bucket, aggs) -> bucket.aggregations = aggs); + buildSubAggsForBuckets(list, bucket -> bucket.bucketOrd, (bucket, aggs) -> bucket.aggregations = aggs); return new InternalAggregation[] { new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java index 64bd514409058..4aa28538cb490 
100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantStringTermsAggregator.java @@ -104,9 +104,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I spare.bucketOrd = i; spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } } final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()]; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java index 1882cf1c38e5a..87036bf69a987 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTextAggregator.java @@ -216,9 +216,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I spare.bucketOrd = i; spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } } final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()]; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java index 56c664f265848..d40399fc66499 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java @@ -119,8 +119,6 @@ private List buildSketch() { StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(BytesRef.deepCopyOf(oldKey), docCount, 
null, format); bucket.bucketOrd = newBucketOrd; buckets.add(bucket); - - consumeBucketsAndMaybeBreak(1); } else { // Make a note when one of the ords has been deleted deletionCount += 1; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java index 84ada76cff83a..32c7e6cd43403 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java @@ -147,9 +147,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I spare.bucketOrd = i; if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) { spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 3dba9219ab182..ee8f4d836999a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -230,7 +230,7 @@ public void testAsSubAgg() throws IOException { assertThat(ak1adh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()), equalTo(List.of( "2020-01-01T00:00Z", "2021-01-01T00:00Z" ))); - + StringTerms.Bucket b = terms.getBucketByKey("b"); StringTerms bk1 = b.getAggregations().get("k1"); StringTerms.Bucket bk1a = bk1.getBucketByKey("a"); @@ -975,9 +975,10 @@ public void testMaxBucket() throws IOException { "2017-01-01T00:00:00.000Z" ); - expectThrows(TooManyBucketsException.class, () -> 
testSearchCase(query, timestamps, + // Shouldn't throw until reduction + testSearchCase(query, timestamps, aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE), - histogram -> {}, 2, false)); + histogram -> {}, 2, false); expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps, aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE), @@ -1007,9 +1008,10 @@ public void testMaxBucketDeprecated() throws IOException { "2017-01-01T00:00:00.000Z" ); - expectThrows(TooManyBucketsException.class, () -> testSearchCase(query, timestamps, + // Shouldn't throw until reduction + testSearchCase(query, timestamps, aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE), - histogram -> {}, 2, false)); + histogram -> {}, 2, false); expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps, aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE), diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 8ca7278b6e7b6..2ed8d95313993 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -426,7 +426,6 @@ protected A search(IndexSe a.postCollection(); @SuppressWarnings("unchecked") A result = (A) a.buildTopLevel(); - InternalAggregationTestCase.assertMultiBucketConsumer(result, bucketConsumer); return result; } @@ -495,9 +494,8 @@ protected A searchAndReduc a.preCollection(); subSearcher.search(weight, a); a.postCollection(); - InternalAggregation agg = a.buildTopLevel(); + InternalAggregation agg = a.buildTopLevel(); aggs.add(agg); - 
InternalAggregationTestCase.assertMultiBucketConsumer(agg, shardBucketConsumer); } if (aggs.isEmpty()) { return (A) root.buildEmptyAggregation(); diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java index 6e2ebc87dbe94..3405439331390 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/ErrorsTestCase.java @@ -58,7 +58,7 @@ public void testSelectColumnFromEmptyIndex() throws Exception { Request request = new Request("PUT", "/test"); request.setJsonEntity("{}"); client().performRequest(request); - + assertFoundOneProblem(command("SELECT abc FROM test")); assertEquals("line 1:8: Unknown column [abc]" + END, readLine()); } @@ -115,8 +115,8 @@ public void testSelectScoreInScalar() throws Exception { @Override public void testHardLimitForSortOnAggregate() throws Exception { index("test", body -> body.field("a", 1).field("b", 2)); - String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000"); - assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [10000], received [12000]" + END, + String commandResult = command("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000"); + assertEquals(START + "Bad request [[3;33;22mThe maximum LIMIT for aggregate sorting is [65535], received [120000]" + END, commandResult); } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java index 9f7eca0c90aeb..3637a2478602a 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java +++ 
b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/ErrorsTestCase.java @@ -140,8 +140,8 @@ public void testHardLimitForSortOnAggregate() throws Exception { index("test", body -> body.field("a", 1).field("b", 2)); try (Connection c = esJdbc()) { SQLException e = expectThrows(SQLException.class, () -> - c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000").executeQuery()); - assertEquals("The maximum LIMIT for aggregate sorting is [10000], received [12000]", e.getMessage()); + c.prepareStatement("SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000").executeQuery()); + assertEquals("The maximum LIMIT for aggregate sorting is [65535], received [120000]", e.getMessage()); } } } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index 4c34be00b254d..857eccf39557d 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -55,7 +55,7 @@ * user rather than to the JDBC driver or CLI. 
*/ public abstract class RestSqlTestCase extends BaseRestSqlTestCase implements ErrorsTestCase { - + public static String SQL_QUERY_REST_ENDPOINT = org.elasticsearch.xpack.sql.proto.Protocol.SQL_QUERY_REST_ENDPOINT; private static String SQL_TRANSLATE_REST_ENDPOINT = org.elasticsearch.xpack.sql.proto.Protocol.SQL_TRANSLATE_REST_ENDPOINT; /** @@ -78,7 +78,7 @@ public void testBasicQuery() throws IOException { Map expected = new HashMap<>(); String mode = randomMode(); boolean columnar = randomBoolean(); - + expected.put("columns", singletonList(columnInfo(mode, "test", "text", JDBCType.VARCHAR, Integer.MAX_VALUE))); if (columnar) { expected.put("values", singletonList(Arrays.asList("test", "test"))); @@ -99,7 +99,7 @@ public void testNextPage() throws IOException { } request.setJsonEntity(bulk.toString()); client().performRequest(request); - + boolean columnar = randomBoolean(); String sqlRequest = query( "SELECT text, number, SQRT(number) AS s, SCORE()" @@ -126,7 +126,7 @@ public void testNextPage() throws IOException { columnInfo(mode, "s", "double", JDBCType.DOUBLE, 25), columnInfo(mode, "SCORE()", "float", JDBCType.REAL, 15))); } - + if (columnar) { expected.put("values", Arrays.asList( Arrays.asList("text" + i, "text" + (i + 1)), @@ -257,7 +257,7 @@ public void testScoreWithFieldNamedScore() throws IOException { } else { expected.put("rows", singletonList(Arrays.asList("test", 10, value))); } - + assertResponse(expected, runSql(mode, "SELECT *, SCORE() FROM test ORDER BY SCORE()", columnar)); assertResponse(expected, runSql(mode, "SELECT name, \\\"score\\\", SCORE() FROM test ORDER BY SCORE()", columnar)); } @@ -395,14 +395,14 @@ public void testSelectScoreInScalar() throws Exception { @Override public void testHardLimitForSortOnAggregate() throws Exception { index("{\"a\": 1, \"b\": 2}"); - expectBadRequest(() -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 12000"), - containsString("The maximum LIMIT for aggregate sorting is 
[10000], received [12000]")); + expectBadRequest(() -> runSql(randomMode(), "SELECT max(a) max FROM test GROUP BY b ORDER BY max LIMIT 120000"), + containsString("The maximum LIMIT for aggregate sorting is [65535], received [120000]")); } public void testUseColumnarForUnsupportedFormats() throws Exception { String format = randomFrom("txt", "csv", "tsv"); index("{\"foo\":1}"); - + Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); request.addParameter("error_trace", "true"); request.addParameter("pretty", "true"); @@ -415,7 +415,7 @@ public void testUseColumnarForUnsupportedFormats() throws Exception { return Collections.emptyMap(); }, containsString("Invalid use of [columnar] argument: cannot be used in combination with txt, csv or tsv formats")); } - + public void testUseColumnarForTranslateRequest() throws IOException { index("{\"test\":\"test\"}", "{\"test\":\"test\"}"); @@ -453,7 +453,7 @@ public static void expectBadRequest(CheckedSupplier, Excepti private Map runSql(String mode, String sql) throws IOException { return runSql(mode, sql, StringUtils.EMPTY, randomBoolean()); } - + private Map runSql(String mode, String sql, boolean columnar) throws IOException { return runSql(mode, sql, StringUtils.EMPTY, columnar); } @@ -463,7 +463,7 @@ private Map runSql(String mode, String sql, String suffix, boole return runSql(new StringEntity(query(sql).mode(mode).columnar(columnarValue(columnar)).toString(), ContentType.APPLICATION_JSON), suffix, mode); } - + protected Map runTranslateSql(String sql) throws IOException { return runSql(new StringEntity(sql, ContentType.APPLICATION_JSON), "/translate/", Mode.PLAIN.toString()); } @@ -528,7 +528,7 @@ public void testPrettyPrintingEnabled() throws IOException { } executeAndAssertPrettyPrinting(expected, "true", columnar); } - + public void testPrettyPrintingDisabled() throws IOException { boolean columnar = randomBoolean(); String expected = ""; @@ -539,7 +539,7 @@ public void testPrettyPrintingDisabled() throws 
IOException { } executeAndAssertPrettyPrinting(expected, randomFrom("false", null), columnar); } - + private void executeAndAssertPrettyPrinting(String expectedJson, String prettyParameter, boolean columnar) throws IOException { index("{\"test1\":\"test1\"}", @@ -611,7 +611,7 @@ public void testBasicQueryWithParameters() throws IOException { } else { expected.put("rows", Arrays.asList(Arrays.asList("foo", 10))); } - + String params = mode.equals("jdbc") ? "{\"type\": \"integer\", \"value\": 10}, {\"type\": \"keyword\", \"value\": \"foo\"}" : "10, \"foo\""; assertResponse(expected, runSql( @@ -816,7 +816,7 @@ public void testQueryWithoutHeaderInCSV() throws IOException { Tuple response = runSqlAsText(query, "text/csv; header=absent"); assertEquals(expected, response.v1()); } - + public void testNextPageCSV() throws IOException { executeQueryWithNextPage("text/csv; header=present", "text,number,sum\r\n", "%s,%d,%d\r\n"); } @@ -838,11 +838,11 @@ public void testQueryInTSV() throws IOException { response = runSqlAsTextFormat(query, "tsv"); assertEquals(expected, response.v1()); } - + public void testNextPageTSV() throws IOException { executeQueryWithNextPage("text/tab-separated-values", "text\tnumber\tsum\n", "%s\t%d\t%d\n"); } - + private void executeQueryWithNextPage(String format, String expectedHeader, String expectedLineFormat) throws IOException { int size = 20; String[] docs = new String[size]; From 10418478cf415fb9df7697bb8f07cd4c89e061d0 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 28 May 2020 13:45:12 -0400 Subject: [PATCH 2/9] Remove unnecessary checks form reduce --- .../aggregations/bucket/BucketsAggregator.java | 6 ++++++ .../bucket/adjacency/InternalAdjacencyMatrix.java | 4 +--- .../bucket/composite/InternalComposite.java | 3 +-- .../bucket/geogrid/InternalGeoGrid.java | 8 ++------ .../bucket/histogram/InternalAutoDateHistogram.java | 13 +------------ .../bucket/histogram/InternalDateHistogram.java | 11 +---------- 
.../bucket/histogram/InternalHistogram.java | 11 +---------- .../histogram/AutoDateHistogramAggregatorTests.java | 8 ++++++++ .../bucket/range/DateRangeAggregatorTests.java | 9 +++++++++ .../test/InternalAggregationTestCase.java | 8 ++++---- 10 files changed, 34 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 50873c8b257cb..0da97e995e9c6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.search.aggregations.bucket; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.IntArray; @@ -50,11 +51,13 @@ public abstract class BucketsAggregator extends AggregatorBase { private final BigArrays bigArrays; private IntArray docCounts; + private final CircuitBreaker breaker; public BucketsAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, Map metadata) throws IOException { super(name, factories, context, parent, metadata); bigArrays = context.bigArrays(); + breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); docCounts = bigArrays.newIntArray(1, true); } @@ -84,6 +87,9 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long * Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized. 
*/ public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException { + if (doc == 1) { + breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + } docCounts.increment(bucketOrd, 1); subCollector.collect(doc, bucketOrd); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index deba1a10c1ea4..6a9d51c3ef32f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -196,12 +196,10 @@ public InternalAggregation reduce(List aggregations, Reduce for (List sameRangeList : bucketsMap.values()) { InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext); if(reducedBucket.docCount >= 1){ - reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reducedBucket); - } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket)); } } + reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey)); InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index 9f6a1da9c8a49..bb9e7f3b6d49d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -178,7 +178,6 @@ public InternalAggregation reduce(List aggregations, Reduce if (lastBucket 
!= null && bucketIt.current.compareKey(lastBucket) != 0) { InternalBucket reduceBucket = reduceBucket(buckets, reduceContext); buckets.clear(); - reduceContext.consumeBucketsAndMaybeBreak(1); result.add(reduceBucket); if (result.size() >= size) { break; @@ -192,7 +191,6 @@ public InternalAggregation reduce(List aggregations, Reduce } if (buckets.size() > 0) { InternalBucket reduceBucket = reduceBucket(buckets, reduceContext); - reduceContext.consumeBucketsAndMaybeBreak(1); result.add(reduceBucket); } @@ -205,6 +203,7 @@ public InternalAggregation reduce(List aggregations, Reduce reducedFormats = lastBucket.formats; lastKey = lastBucket.getRawKey(); } + reduceContext.consumeBucketsAndMaybeBreak(result.size()); return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls, earlyTerminated, metadata); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java index c967787ed67f2..a1bd27d1f2994 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java @@ -100,18 +100,14 @@ public InternalGeoGrid reduce(List aggregations, ReduceCont BucketPriorityQueue ordered = new BucketPriorityQueue<>(size); for (LongObjectPagedHashMap.Cursor> cursor : buckets) { List sameCellBuckets = cursor.value; - InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext)); - if (removed != null) { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); - } else { - reduceContext.consumeBucketsAndMaybeBreak(1); - } + ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext)); } buckets.close(); InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()]; for (int i = 
ordered.size() - 1; i >= 0; i--) { list[i] = ordered.pop(); } + reduceContext.consumeBucketsAndMaybeBreak(list.length); return create(getName(), requiredSize, Arrays.asList(list), getMetadata()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index 7465ae27de70a..630e4ced5f8b0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -328,7 +328,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (reduceRounding.round(top.current.key) != key) { // the key changes, reduce what we already buffered and reset the buffer for current buckets final Bucket reduced = reduceBucket(currentBuckets, reduceContext); - reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); currentBuckets.clear(); key = reduceRounding.round(top.current.key); @@ -348,7 +347,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (currentBuckets.isEmpty() == false) { final Bucket reduced = reduceBucket(currentBuckets, reduceContext); - reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); } } @@ -376,22 +374,17 @@ private List mergeBuckets(List reducedBuckets, Rounding reduceRo long roundedBucketKey = reduceRounding.round(bucket.key); if (Double.isNaN(key)) { key = roundedBucketKey; - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations)); } else if (roundedBucketKey == key) { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations)); } else { - 
reduceContext.consumeBucketsAndMaybeBreak(1); mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); sameKeyedBuckets.clear(); key = roundedBucketKey; - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations)); } } if (sameKeyedBuckets.isEmpty() == false) { - reduceContext.consumeBucketsAndMaybeBreak(1); mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); } reducedBuckets = mergedBuckets; @@ -449,7 +442,6 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red if (lastBucket != null) { long key = rounding.nextRoundingValue(lastBucket.key); while (key < nextBucket.key) { - reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs)); key = rounding.nextRoundingValue(key); } @@ -515,7 +507,7 @@ public InternalAggregation reduce(List aggregations, Reduce // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext); } - + reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size()); BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx, this.bucketInfo.emptySubAggregations); @@ -551,16 +543,13 @@ private BucketReduceResult mergeConsecutiveBuckets(List reducedBuckets, for (int i = 0; i < reducedBuckets.size(); i++) { Bucket bucket = reducedBuckets.get(i); if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) { - reduceContext.consumeBucketsAndMaybeBreak(1); mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); sameKeyedBuckets.clear(); key = roundingInfo.rounding.round(bucket.key); } - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1); sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, 
format, bucket.aggregations)); } if (sameKeyedBuckets.isEmpty() == false) { - reduceContext.consumeBucketsAndMaybeBreak(1); mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext)); } return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 0513fcafd912d..7efb5d630ae97 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -329,10 +329,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { // the key changes, reduce what we already buffered and reset the buffer for current buckets final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { - reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); - } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced)); } currentBuckets.clear(); key = top.current.key; @@ -353,10 +350,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (currentBuckets.isEmpty() == false) { final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { - reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); - } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced)); } } } @@ -396,7 +390,6 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = bounds.getMin() + offset; long max = bounds.getMax() + offset; while (key <= max) { - 
reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); key = nextKey(key).longValue(); } @@ -406,7 +399,6 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = bounds.getMin() + offset; if (key < firstBucket.key) { while (key < firstBucket.key) { - reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); key = nextKey(key).longValue(); } @@ -422,7 +414,6 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { if (lastBucket != null) { long key = nextKey(lastBucket.key).longValue(); while (key < nextBucket.key) { - reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); key = nextKey(key).longValue(); } @@ -436,7 +427,6 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { long key = nextKey(lastBucket.key).longValue(); long max = bounds.getMax() + offset; while (key <= max) { - reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); key = nextKey(key).longValue(); } @@ -462,6 +452,7 @@ public InternalAggregation reduce(List aggregations, Reduce CollectionUtil.introSort(reducedBuckets, order.comparator()); } } + reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo, format, keyed, getMetadata()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index f525eecce88e5..a3eaedaca7d71 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -313,10 +313,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { // Using Double.compare instead of != to handle NaN correctly. final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { - reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); - } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced)); } currentBuckets.clear(); key = top.current.key; @@ -337,10 +334,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (currentBuckets.isEmpty() == false) { final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { - reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); - } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced)); } } } @@ -380,7 +374,6 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { if (iter.hasNext() == false) { // fill with empty buckets for (double key = round(emptyBucketInfo.minBound); key <= emptyBucketInfo.maxBound; key = nextKey(key)) { - reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); } } else { @@ -388,7 +381,6 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { if (Double.isFinite(emptyBucketInfo.minBound)) { // fill with empty buckets until the first key for (double key = round(emptyBucketInfo.minBound); key < first.key; key = nextKey(key)) { - reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); } } @@ -401,7 +393,6 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { if (lastBucket != null) { double key = 
nextKey(lastBucket.key); while (key < nextBucket.key) { - reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); key = nextKey(key); } @@ -412,7 +403,6 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { // finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user) for (double key = nextKey(lastBucket.key); key <= emptyBucketInfo.maxBound; key = nextKey(key)) { - reduceContext.consumeBucketsAndMaybeBreak(1); iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); } } @@ -437,6 +427,7 @@ public InternalAggregation reduce(List aggregations, Reduce CollectionUtil.introSort(reducedBuckets, order.comparator()); } } + reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata()); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index f6e28f7615fbc..8e2239d18ed3d 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; @@ -869,4 +870,11 @@ private void indexSampleData(List dataset, 
RandomIndexWriter inde i += 1; } } + + @Override + public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { + /* + * No-op. + */ + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java index 2fb281170b732..14f9d9871b411 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/DateRangeAggregatorTests.java @@ -36,7 +36,9 @@ import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import java.io.IOException; @@ -304,4 +306,11 @@ private void testCase(DateRangeAggregationBuilder aggregationBuilder, } } } + + @Override + public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { + /* + * No-op. 
+ */ + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index ff690d54da885..b88efc41ef823 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -82,15 +82,15 @@ import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler; import org.elasticsearch.search.aggregations.bucket.sampler.ParsedSampler; -import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms; -import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms; -import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms; -import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms; import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; From 
625c6dd3affba1593b1462c7c0c5a93cd6a583c0 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 28 May 2020 14:38:19 -0400 Subject: [PATCH 3/9] Suppress number of buckets check in some aggregations --- .../geogrid/GeoGridAggregatorTestCase.java | 9 +++ .../DateHistogramAggregatorTests.java | 67 ------------------- .../InternalAutoDateHistogramTests.java | 11 ++- .../test/InternalAggregationTestCase.java | 8 ++- 4 files changed, 26 insertions(+), 69 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java index 28b3c8d4eef37..6f2882b9a03b8 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java @@ -33,9 +33,11 @@ import org.elasticsearch.common.geo.GeoUtils; import org.elasticsearch.index.mapper.GeoPointFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.ValuesSourceType; @@ -253,4 +255,11 @@ private void testCase(Query query, int precision, GeoBoundingBox geoBoundingBox, indexReader.close(); directory.close(); } + + @Override + public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { + /* + * No-op. 
+ */ + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index ee8f4d836999a..13a9e33096cf0 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -967,73 +967,6 @@ public void testMinDocCount() throws IOException { ); } - public void testMaxBucket() throws IOException { - Query query = new MatchAllDocsQuery(); - List timestamps = Arrays.asList( - "2010-01-01T00:00:00.000Z", - "2011-01-01T00:00:00.000Z", - "2017-01-01T00:00:00.000Z" - ); - - // Shouldn't throw until reduction - testSearchCase(query, timestamps, - aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE), - histogram -> {}, 2, false); - - expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps, - aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE), - histogram -> {}, 2, false)); - - expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps, - aggregation -> aggregation.fixedInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L), - histogram -> {}, 100, false)); - - expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps, - aggregation -> - aggregation.fixedInterval(DateHistogramInterval.seconds(5)) - .field(AGGREGABLE_DATE) - .subAggregation( - AggregationBuilders.dateHistogram("1") - .fixedInterval(DateHistogramInterval.seconds(5)) - .field(AGGREGABLE_DATE) - ), - histogram -> {}, 5, false)); - } - - public void testMaxBucketDeprecated() throws IOException { - Query query = new MatchAllDocsQuery(); - List 
timestamps = Arrays.asList( - "2010-01-01T00:00:00.000Z", - "2011-01-01T00:00:00.000Z", - "2017-01-01T00:00:00.000Z" - ); - - // Shouldn't throw until reduction - testSearchCase(query, timestamps, - aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE), - histogram -> {}, 2, false); - - expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps, - aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE), - histogram -> {}, 2, false)); - - expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps, - aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(AGGREGABLE_DATE).minDocCount(0L), - histogram -> {}, 100, false)); - - expectThrows(TooManyBucketsException.class, () -> testSearchAndReduceCase(query, timestamps, - aggregation -> - aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)) - .field(AGGREGABLE_DATE) - .subAggregation( - AggregationBuilders.dateHistogram("1") - .dateHistogramInterval(DateHistogramInterval.seconds(5)) - .field(AGGREGABLE_DATE) - ), - histogram -> {}, 5, false)); - assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future."); - } - public void testFixedWithCalendar() throws IOException { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testSearchCase(new MatchAllDocsQuery(), Arrays.asList( diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java index 4e4076fbeadc0..f3b512a53c109 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java +++ 
b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java @@ -23,8 +23,10 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo; import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram.BucketInfo; @@ -147,7 +149,7 @@ protected void assertReduced(InternalAutoDateHistogram reduced, List= 0; j--) { @@ -391,4 +393,11 @@ public void testCreateWithReplacementBuckets() { assertThat(copy.getFormatter(), equalTo(orig.getFormatter())); assertThat(copy.getInterval(), equalTo(orig.getInterval())); } + + @Override + public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { + /* + * No-op. 
+ */ + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index b88efc41ef823..ca62062bca3a7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -44,6 +44,7 @@ import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; import org.elasticsearch.search.aggregations.ParsedAggregation; import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder; @@ -387,10 +388,15 @@ public void testReduceRandom() throws IOException { bigArrays, mockScriptService, bucketConsumer, PipelineTree.EMPTY); @SuppressWarnings("unchecked") T reduced = (T) inputs.get(0).reduce(toReduce, context); - assertMultiBucketConsumer(reduced, bucketConsumer); + doAssertReducedMultiBucketConsumer(reduced, bucketConsumer); assertReduced(reduced, inputs); } + protected void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { + InternalAggregationTestCase.assertMultiBucketConsumer(agg, bucketConsumer); + } + + /** * overwrite in tests that need it */ From 8252b039d9c103182925ec4747ed1dc4a1631e85 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 28 May 2020 14:50:39 -0400 Subject: [PATCH 4/9] Cleanup after merge --- .../aggregations/bucket/terms/NumericTermsAggregator.java | 3 --- .../bucket/histogram/DateHistogramAggregatorTests.java | 2 -- 2 files changed, 5 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 17b536941790a..27d47e3b7cec0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -172,9 +172,6 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws if (bucketCountThresholds.getShardMinDocCount() <= docCount) { updateBucket(spare, ordsEnum, docCount); spare = ordered.insertWithOverflow(spare); - if (spare == null) { - consumeBucketsAndMaybeBreak(1); - } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 13a9e33096cf0..84b1ef2352e86 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -38,10 +38,8 @@ import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.search.aggregations.AggregationBuilder; -import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.MultiBucketConsumerService.TooManyBucketsException; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; 
From eed40609db22529c8412ca4a55b18fdf1d30018b Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 28 May 2020 16:49:11 -0400 Subject: [PATCH 5/9] Suppress bucket count checks in more tests --- .../bucket/histogram/InternalAutoDateHistogramTests.java | 9 --------- .../test/InternalMultiBucketAggregationTestCase.java | 8 ++++++++ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java index f3b512a53c109..2dffec48b2ff4 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java @@ -23,10 +23,8 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo; import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram.BucketInfo; @@ -393,11 +391,4 @@ public void testCreateWithReplacementBuckets() { assertThat(copy.getFormatter(), equalTo(orig.getFormatter())); assertThat(copy.getInterval(), equalTo(orig.getInterval())); } - - @Override - public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { - /* - * No-op. 
- */ - } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java index c8d1fbee2f03b..ec8204b362aed 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java @@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.ParsedAggregation; import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; @@ -184,4 +185,11 @@ protected void assertBucket(MultiBucketsAggregation.Bucket expected, MultiBucket } } } + + @Override + public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { + /* + * No-op. 
+ */ + } } From 8ec8048999f200ad15dd28f841c671247819bac2 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 29 May 2020 15:51:38 -0400 Subject: [PATCH 6/9] Fix counter in BucketsAggregator --- .../search/aggregations/bucket/BucketsAggregator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 0da97e995e9c6..733101fb117c8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -87,10 +87,9 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long * Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized. */ public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException { - if (doc == 1) { + if (docCounts.increment(bucketOrd, 1) == 1) { breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); } - docCounts.increment(bucketOrd, 1); subCollector.collect(doc, bucketOrd); } From 39e034aa51a65c376c90ecd35f3c7296d4ebe470 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 1 Jun 2020 14:33:18 -0400 Subject: [PATCH 7/9] Make sure that we still call breaker occasionally --- .../MultiBucketConsumerService.java | 19 +++++++++++-------- .../bucket/BucketsAggregator.java | 12 ++++++++---- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java index f29c6ef169d94..9cbc4eeab9c9a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java @@ -102,6 +102,7 @@ public static class MultiBucketConsumer implements IntConsumer { // aggregations execute in a single thread so no atomic here private int count; + private int callCount = 0; public MultiBucketConsumer(int limit, CircuitBreaker breaker) { this.limit = limit; @@ -110,15 +111,17 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) { @Override public void accept(int value) { - count += value; - if (count > limit) { - throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit - + "] but was [" + count + "]. This limit can be set by changing the [" + - MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit); + if (value != 0) { + count += value; + if (count > limit) { + throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit + + "] but was [" + count + "]. 
This limit can be set by changing the [" + + MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit); + } } - - // check parent circuit breaker every 1024 buckets - if (value > 0 && (count & 0x3FF) == 0) { + // check parent circuit breaker every 1024 calls + callCount++; + if ((callCount & 0x3FF) == 0) { breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 733101fb117c8..fb73f50384a70 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.search.aggregations.bucket; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.IntArray; @@ -45,19 +44,24 @@ import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.IntConsumer; import java.util.function.ToLongFunction; public abstract class BucketsAggregator extends AggregatorBase { private final BigArrays bigArrays; + private final IntConsumer multiBucketConsumer; private IntArray docCounts; - private final CircuitBreaker breaker; public BucketsAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, Map metadata) throws IOException { super(name, factories, context, parent, metadata); bigArrays = context.bigArrays(); - breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + if (context.aggregations() != null) { + multiBucketConsumer = context.aggregations().multiBucketConsumer(); + } else { + multiBucketConsumer = (count) -> {}; + } docCounts = 
bigArrays.newIntArray(1, true); } @@ -88,7 +92,7 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long */ public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException { if (docCounts.increment(bucketOrd, 1) == 1) { - breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets"); + multiBucketConsumer.accept(0); } subCollector.collect(doc, bucketOrd); } From 887092555b86b07df43f8c36d1c6b3f8af92ab17 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 1 Jun 2020 15:07:10 -0400 Subject: [PATCH 8/9] Fix HierarchyCircuitBreakerServiceTests --- .../indices/breaker/HierarchyCircuitBreakerServiceTests.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java index 02afb98cf9e7f..8a73b7d7aeaab 100644 --- a/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerServiceTests.java @@ -323,7 +323,10 @@ public void testAllocationBucketsBreaker() { // make sure used bytes is greater than the total circuit breaker limit breaker.addWithoutBreaking(200); - + // make sure that we check on the following call + for (int i = 0; i < 1023; i++) { + multiBucketConsumer.accept(0); + } CircuitBreakingException exception = expectThrows(CircuitBreakingException.class, () -> multiBucketConsumer.accept(1024)); assertThat(exception.getMessage(), containsString("[parent] Data too large, data for [allocated_buckets] would be")); From 6bddc558fd3eb1dc17289f6da7937bf1a224e0c9 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 3 Jun 2020 10:00:04 -0400 Subject: [PATCH 9/9] Add comment on collectExistingBucket --- .../search/aggregations/bucket/BucketsAggregator.java | 3 +++ 1 file 
changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index fb73f50384a70..2f088b98e1d04 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -92,6 +92,9 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long */ public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException { if (docCounts.increment(bucketOrd, 1) == 1) { + // We calculate the final number of buckets only during the reduce phase. But we still need to + // trigger bucket consumer from time to time in order to give it a chance to check available memory and break + // the execution if we are running out. To achieve that we are passing 0 as a bucket count. multiBucketConsumer.accept(0); } subCollector.collect(doc, bucketOrd);