From 7337104d4438a386c13d651c7f8af86e600e2e80 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 31 Dec 2018 13:58:54 +0100 Subject: [PATCH 1/3] Remove single shard optimization when suggesting shard_size When executing terms aggregations we set the shard_size, meaning the number of buckets to collect on each shard, to a value that's higher than the number of requested buckets, to guarantee some basic level of precision. We have an optimization in place so that we leave shard_size set to size whenever we are searching against a single shard, in which case maximum precision is guaranteed by definition. Such optimization requires us access to the total number of shards that the search is executing against. In the context of cross-cluster search, once we will introduce multiple reduction steps (one per cluster) each cluster will only know the number of local shards, which is problematic as we can only optimize if we are searching against a single shard in a single cluster. While discussing how to address the CCS scenario, we decided that we do not want to introduce further complexity caused by this single shard optimization that benefits only a minority of cases, especially when the benefits are not so huge. This commit removes the single shard optimization, meaning that we will always have heuristic enabled on how many number of buckets to collect on the shards, even when searching against a single shard. --- .../search/aggregations/bucket/BucketUtils.java | 8 +------- .../bucket/geogrid/GeoGridAggregationBuilder.java | 2 +- .../SignificantTermsAggregatorFactory.java | 3 +-- .../SignificantTextAggregatorFactory.java | 3 +-- .../bucket/terms/TermsAggregatorFactory.java | 3 +-- .../aggregations/bucket/BucketUtilsTests.java | 15 +++------------ 6 files changed, 8 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java index 17b50fa9bef5f..823a2b1e43422 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java @@ -31,18 +31,12 @@ private BucketUtils() {} * * @param finalSize * The number of terms required in the final reduce phase. - * @param singleShard - * whether a single shard is being queried, or multiple shards * @return A suggested default for the size of any shard-side PriorityQueues */ - public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) { + public static int suggestShardSideQueueSize(int finalSize) { if (finalSize < 1) { throw new IllegalArgumentException("size must be positive, got " + finalSize); } - if (singleShard) { - // In the case of a single shard, we do not need to over-request - return finalSize; - } // Request 50% more buckets on the shards in order to improve accuracy // as well as a small constant that should help with small values of 'size' final long shardSampleSize = (long) (finalSize * 1.5 + 10); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java index 353f391f213d6..38469ff875365 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java @@ -157,7 +157,7 @@ public int shardSize() { if (shardSize < 0) { // Use default heuristic to avoid any wrong-ranking caused by // distributed counting - shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1); + shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize); } if (requiredSize <= 0 || shardSize <= 0) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java index 7fe41407af4ca..09fd5877344f6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java @@ -195,8 +195,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare // each shard and as // such are impossible to differentiate from non-significant terms // at that early stage. - bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(), - context.numberOfShards() == 1)); + bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize())); } if (valuesSource instanceof ValuesSource.Bytes) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java index a51a33defdd00..92a136960395d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java @@ -175,8 +175,7 @@ protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingl // we want to find have only one occurrence on each shard and as // such are impossible to differentiate from non-significant terms // at that early stage. - bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(), - context.numberOfShards() == 1)); + bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize())); } // TODO - need to check with mapping that this is indeed a text field.... diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 25f552075dead..1ff0efd3e8307 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -121,8 +121,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare // The user has not made a shardSize selection. Use default // heuristic to avoid any wrong-ranking caused by distributed // counting - bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(), - context.numberOfShards() == 1)); + bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize())); } bucketCountThresholds.ensureValidity(); if (valuesSource instanceof ValuesSource.Bytes) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java index 35f3175f7cfe5..756bc14a498ed 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java @@ -27,22 +27,14 @@ public class BucketUtilsTests extends ESTestCase { public void testBadInput() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean())); + () -> BucketUtils.suggestShardSideQueueSize(0)); assertEquals(e.getMessage(), "size must be positive, got 0"); } - public void testOptimizesSingleShard() { - for (int iter = 0; iter < 10; ++iter) { - final int size = randomIntBetween(1, Integer.MAX_VALUE); - assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true)); - } - } - public void testOverFlow() { for (int iter = 0; iter < 10; ++iter) { final int size = Integer.MAX_VALUE - randomInt(10); - final int numberOfShards = randomIntBetween(1, 10); - final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1); + final int shardSize = BucketUtils.suggestShardSideQueueSize( size); assertThat(shardSize, greaterThanOrEqualTo(shardSize)); } } @@ -50,8 +42,7 @@ public void testOverFlow() { public void testShardSizeIsGreaterThanGlobalSize() { for (int iter = 0; iter < 10; ++iter) { final int size = randomIntBetween(1, Integer.MAX_VALUE); - final int numberOfShards = randomIntBetween(1, 10); - final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1); + final int shardSize = BucketUtils.suggestShardSideQueueSize( size); assertThat(shardSize, greaterThanOrEqualTo(size)); } } From a795b3845ea9f382f727c8e4e1c453b98e0f87dd Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 2 Jan 2019 12:15:14 +0100 Subject: [PATCH 2/3] fix TermsShardMinDocCountIT --- .../bucket/TermsShardMinDocCountIT.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java index 0f685ded62c1c..a3311db11350f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java @@ -23,11 +23,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms; import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorFactory; import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.test.ESIntegTestCase; import java.util.ArrayList; @@ -35,17 +35,18 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.elasticsearch.search.aggregations.AggregationBuilders.filter; import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; public class TermsShardMinDocCountIT extends ESIntegTestCase { private static final String index = "someindex"; private static final String type = "testtype"; - public String randomExecutionHint() { + + private static String randomExecutionHint() { return randomBoolean() ? null : randomFrom(SignificantTermsAggregatorFactory.ExecutionMode.values()).toString(); } @@ -74,7 +75,7 @@ public void testShardMinDocCountSignificantTermsTest() throws Exception { SearchResponse response = client().prepareSearch(index) .addAggregation( (filter("inclass", QueryBuilders.termQuery("class", true))) - .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2) + .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2).shardSize(2) .executionHint(randomExecutionHint())) ) .get(); @@ -87,16 +88,14 @@ public void testShardMinDocCountSignificantTermsTest() throws Exception { response = client().prepareSearch(index) .addAggregation( (filter("inclass", QueryBuilders.termQuery("class", true))) - .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2) - .shardMinDocCount(2).size(2) - .executionHint(randomExecutionHint())) + .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).shardSize(2) + .shardMinDocCount(2).size(2).executionHint(randomExecutionHint())) ) .get(); assertSearchResponse(response); filteredBucket = response.getAggregations().get("inclass"); sigterms = filteredBucket.getAggregations().get("mySignificantTerms"); assertThat(sigterms.getBuckets().size(), equalTo(2)); - } private void addTermsDocs(String term, int numInClass, int numNotInClass, List builders) { @@ -133,19 +132,18 @@ public void testShardMinDocCountTermsTest() throws Exception { // first, check that indeed when not setting the shardMinDocCount parameter 0 terms are returned SearchResponse response = client().prepareSearch(index) .addAggregation( - terms("myTerms").field("text").minDocCount(2).size(2).executionHint(randomExecutionHint()) - .order(BucketOrder.key(true)) + terms("myTerms").field("text").minDocCount(2).size(2).shardSize(2).executionHint(randomExecutionHint()) + .order(BucketOrder.key(true)) ) .get(); assertSearchResponse(response); Terms sigterms = response.getAggregations().get("myTerms"); assertThat(sigterms.getBuckets().size(), equalTo(0)); - response = client().prepareSearch(index) .addAggregation( - terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).executionHint(randomExecutionHint()) - .order(BucketOrder.key(true)) + terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).shardSize(2) + .executionHint(randomExecutionHint()).order(BucketOrder.key(true)) ) .get(); assertSearchResponse(response); @@ -154,11 +152,10 @@ public void testShardMinDocCountTermsTest() throws Exception { } - private void addTermsDocs(String term, int numDocs, List builders) { + private static void addTermsDocs(String term, int numDocs, List builders) { String sourceClass = "{\"text\": \"" + term + "\"}"; for (int i = 0; i < numDocs; i++) { builders.add(client().prepareIndex(index, type).setSource(sourceClass, XContentType.JSON)); } - } } From affffe897dfa89e7d2972ab4f1da3cb6977cda58 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 2 Jan 2019 14:00:21 +0100 Subject: [PATCH 3/3] update docs --- .../aggregations/bucket/significantterms-aggregation.asciidoc | 2 +- .../aggregations/bucket/significanttext-aggregation.asciidoc | 2 +- docs/reference/aggregations/bucket/terms-aggregation.asciidoc | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc b/docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc index 0a8a46a0b67b6..bfaeecc1f82d3 100644 --- a/docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc @@ -448,7 +448,7 @@ If the number of unique terms is greater than `size`, the returned list can be s size buckets was not returned). To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard -using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter +(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter can be used to control the volumes of candidate terms produced by each shard. Low-frequency terms can turn out to be the most interesting ones once all results are combined so the diff --git a/docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc b/docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc index a541eb0ac14db..429c822d3623d 100644 --- a/docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc @@ -364,7 +364,7 @@ If the number of unique terms is greater than `size`, the returned list can be s size buckets was not returned). To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard -using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter +(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter can be used to control the volumes of candidate terms produced by each shard. Low-frequency terms can turn out to be the most interesting ones once all results are combined so the diff --git a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc index 1562bf41074e0..188b2ed3774c0 100644 --- a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc @@ -220,8 +220,7 @@ NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sens override it and reset it to be equal to `size`. -The default `shard_size` will be `size` if the search request needs to go to a single shard, and `(size * 1.5 + 10)` -otherwise. +The default `shard_size` is `(size * 1.5 + 10)`. ==== Calculating Document Count Error