Commit 42ea644

Remove single shard optimization when suggesting shard_size (#37041)
When executing terms aggregations, we set shard_size, the number of buckets to collect on each shard, to a value higher than the number of requested buckets in order to guarantee a basic level of precision. We have an optimization in place that leaves shard_size equal to size whenever we are searching against a single shard, in which case maximum precision is guaranteed by definition. This optimization requires access to the total number of shards that the search is executing against.

In the context of cross-cluster search, once we introduce multiple reduction steps (one per cluster), each cluster will only know the number of its local shards. That is problematic, as we should only optimize when searching against a single shard in a single cluster. We could be searching against one shard per cluster, in which case the current code would optimize the number of terms, causing a loss of precision.

While discussing how to address the CCS scenario, we decided that we do not want to introduce further complexity for the sake of this single-shard optimization, as it benefits only a minority of cases and the benefits are not great.

This commit removes the single-shard optimization, meaning that the heuristic for how many buckets to collect on the shards is always applied, even when searching against a single shard. This will cause more buckets to be collected when searching against a single shard than before. Users for whom that becomes a problem can work around it by setting shard_size equal to size.

Relates to #32125
1 parent e0a677b commit 42ea644
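
To make the one-shard-per-cluster concern in the commit message concrete, here is a minimal, self-contained simulation. It is not Elasticsearch code: the class `ShardSizeDemo`, its methods, and the term counts used with it are hypothetical, and it only models "each shard returns its top `shardSize` terms, then the coordinator merges them and keeps the top `size`".

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model of a two-phase terms aggregation; names are illustrative only.
final class ShardSizeDemo {

    // Top n entries by descending count, ties broken by term so the result is deterministic.
    static List<Map.Entry<String, Long>> topN(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(n)
            .collect(Collectors.toList());
    }

    // Each shard contributes only its top shardSize terms; the merged top `size` terms are returned.
    static List<String> mergedTopTerms(List<Map<String, Long>> shards, int size, int shardSize) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> shard : shards) {
            for (Map.Entry<String, Long> e : topN(shard, shardSize)) {
                merged.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return topN(merged, size).stream().map(Map.Entry::getKey).collect(Collectors.toList());
    }
}
```

With made-up counts `{a: 10, b: 9}` on one shard and `{c: 10, b: 9}` on the other, `mergedTopTerms(shards, 1, 1)` yields `[a]` even though `b` has the highest total (18), whereas `mergedTopTerms(shards, 1, 11)`, which uses the `size * 1.5 + 10` default, yields `[b]`.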

10 files changed, with 24 additions and 46 deletions.

docs/reference/aggregations/bucket/significantterms-aggregation.asciidoc

Lines changed: 1 addition & 1 deletion
@@ -448,7 +448,7 @@ If the number of unique terms is greater than `size`, the returned list can be s
 size buckets was not returned).
 
 To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
-using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
+(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter
 can be used to control the volumes of candidate terms produced by each shard.
 
 Low-frequency terms can turn out to be the most interesting ones once all results are combined so the

docs/reference/aggregations/bucket/significanttext-aggregation.asciidoc

Lines changed: 1 addition & 1 deletion
@@ -364,7 +364,7 @@ If the number of unique terms is greater than `size`, the returned list can be s
 size buckets was not returned).
 
 To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
-using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
+(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter
 can be used to control the volumes of candidate terms produced by each shard.
 
 Low-frequency terms can turn out to be the most interesting ones once all results are combined so the

docs/reference/aggregations/bucket/terms-aggregation.asciidoc

Lines changed: 1 addition & 2 deletions
@@ -220,8 +220,7 @@ NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sens
 override it and reset it to be equal to `size`.
 
 
-The default `shard_size` will be `size` if the search request needs to go to a single shard, and `(size * 1.5 + 10)`
-otherwise.
+The default `shard_size` is `(size * 1.5 + 10)`.
 
 ==== Calculating Document Count Error
 

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java

Lines changed: 1 addition & 7 deletions
@@ -31,18 +31,12 @@ private BucketUtils() {}
      *
      * @param finalSize
      *            The number of terms required in the final reduce phase.
-     * @param singleShard
-     *            whether a single shard is being queried, or multiple shards
      * @return A suggested default for the size of any shard-side PriorityQueues
      */
-    public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) {
+    public static int suggestShardSideQueueSize(int finalSize) {
         if (finalSize < 1) {
             throw new IllegalArgumentException("size must be positive, got " + finalSize);
         }
-        if (singleShard) {
-            // In the case of a single shard, we do not need to over-request
-            return finalSize;
-        }
         // Request 50% more buckets on the shards in order to improve accuracy
         // as well as a small constant that should help with small values of 'size'
         final long shardSampleSize = (long) (finalSize * 1.5 + 10);
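
The hunk above ends before the method returns. As a rough standalone sketch of the post-commit behavior, the following assumes the remainder of the method simply clamps the `long` result to `Integer.MAX_VALUE`; that return statement is an assumption, since it is not shown in this diff. The class name `ShardSizeSketch` is hypothetical.

```java
// Hypothetical standalone sketch; only the validation and the formula come from the diff.
final class ShardSizeSketch {

    static int suggestShardSideQueueSize(int finalSize) {
        if (finalSize < 1) {
            throw new IllegalArgumentException("size must be positive, got " + finalSize);
        }
        // Request 50% more buckets plus a small constant, computed as a long to avoid int overflow.
        final long shardSampleSize = (long) (finalSize * 1.5 + 10);
        // ASSUMPTION: clamp to the int range; the actual return statement is outside the hunk.
        return (int) Math.min(Integer.MAX_VALUE, shardSampleSize);
    }
}
```

Under that assumption, a request with `size` 10 now collects 25 buckets per shard regardless of shard count (previously 10 on a single shard), and the significant terms and significant text factories double that to 50, matching the `2 * (size * 1.5 + 10)` formula in the updated docs.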

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java

Lines changed: 1 addition & 1 deletion
@@ -157,7 +157,7 @@ public int shardSize() {
         if (shardSize < 0) {
             // Use default heuristic to avoid any wrong-ranking caused by
             // distributed counting
-            shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1);
+            shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize);
         }
 
         if (requiredSize <= 0 || shardSize <= 0) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java

Lines changed: 1 addition & 2 deletions
@@ -195,8 +195,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
             // each shard and as
             // such are impossible to differentiate from non-significant terms
             // at that early stage.
-            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
-                    context.numberOfShards() == 1));
+            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }
 
         if (valuesSource instanceof ValuesSource.Bytes) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java

Lines changed: 1 addition & 2 deletions
@@ -175,8 +175,7 @@ protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingl
             // we want to find have only one occurrence on each shard and as
             // such are impossible to differentiate from non-significant terms
             // at that early stage.
-            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
-                    context.numberOfShards() == 1));
+            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }
 
         // TODO - need to check with mapping that this is indeed a text field....

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

Lines changed: 1 addition & 2 deletions
@@ -121,8 +121,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
             // The user has not made a shardSize selection. Use default
             // heuristic to avoid any wrong-ranking caused by distributed
             // counting
-            bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
-                    context.numberOfShards() == 1));
+            bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }
         bucketCountThresholds.ensureValidity();
         if (valuesSource instanceof ValuesSource.Bytes) {

server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java

Lines changed: 3 additions & 12 deletions
@@ -27,31 +27,22 @@ public class BucketUtilsTests extends ESTestCase {
 
     public void testBadInput() {
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
-                () -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean()));
+                () -> BucketUtils.suggestShardSideQueueSize(0));
         assertEquals(e.getMessage(), "size must be positive, got 0");
     }
 
-    public void testOptimizesSingleShard() {
-        for (int iter = 0; iter < 10; ++iter) {
-            final int size = randomIntBetween(1, Integer.MAX_VALUE);
-            assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true));
-        }
-    }
-
     public void testOverFlow() {
         for (int iter = 0; iter < 10; ++iter) {
             final int size = Integer.MAX_VALUE - randomInt(10);
-            final int numberOfShards = randomIntBetween(1, 10);
-            final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
+            final int shardSize = BucketUtils.suggestShardSideQueueSize( size);
             assertThat(shardSize, greaterThanOrEqualTo(shardSize));
         }
     }
 
     public void testShardSizeIsGreaterThanGlobalSize() {
         for (int iter = 0; iter < 10; ++iter) {
             final int size = randomIntBetween(1, Integer.MAX_VALUE);
-            final int numberOfShards = randomIntBetween(1, 10);
-            final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
+            final int shardSize = BucketUtils.suggestShardSideQueueSize( size);
             assertThat(shardSize, greaterThanOrEqualTo(size));
         }
     }

server/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsShardMinDocCountIT.java

Lines changed: 13 additions & 16 deletions
@@ -23,29 +23,30 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorFactory;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.test.ESIntegTestCase;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
 import static org.hamcrest.Matchers.equalTo;
 
 public class TermsShardMinDocCountIT extends ESIntegTestCase {
     private static final String index = "someindex";
     private static final String type = "testtype";
-    public String randomExecutionHint() {
+
+    private static String randomExecutionHint() {
         return randomBoolean() ? null : randomFrom(SignificantTermsAggregatorFactory.ExecutionMode.values()).toString();
     }
 

@@ -74,7 +75,7 @@ public void testShardMinDocCountSignificantTermsTest() throws Exception {
         SearchResponse response = client().prepareSearch(index)
                 .addAggregation(
                         (filter("inclass", QueryBuilders.termQuery("class", true)))
-                                .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2)
+                                .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2).shardSize(2)
                                         .executionHint(randomExecutionHint()))
                 )
                 .get();
@@ -87,16 +88,14 @@ public void testShardMinDocCountSignificantTermsTest() throws Exception {
         response = client().prepareSearch(index)
                 .addAggregation(
                         (filter("inclass", QueryBuilders.termQuery("class", true)))
-                                .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2)
-                                        .shardMinDocCount(2).size(2)
-                                        .executionHint(randomExecutionHint()))
+                                .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).shardSize(2)
+                                        .shardMinDocCount(2).size(2).executionHint(randomExecutionHint()))
                 )
                 .get();
         assertSearchResponse(response);
         filteredBucket = response.getAggregations().get("inclass");
         sigterms = filteredBucket.getAggregations().get("mySignificantTerms");
         assertThat(sigterms.getBuckets().size(), equalTo(2));
-
     }
 
     private void addTermsDocs(String term, int numInClass, int numNotInClass, List<IndexRequestBuilder> builders) {
@@ -133,19 +132,18 @@ public void testShardMinDocCountTermsTest() throws Exception {
         // first, check that indeed when not setting the shardMinDocCount parameter 0 terms are returned
         SearchResponse response = client().prepareSearch(index)
                 .addAggregation(
-                        terms("myTerms").field("text").minDocCount(2).size(2).executionHint(randomExecutionHint())
-                                .order(BucketOrder.key(true))
+                        terms("myTerms").field("text").minDocCount(2).size(2).shardSize(2).executionHint(randomExecutionHint())
+                            .order(BucketOrder.key(true))
                 )
                 .get();
         assertSearchResponse(response);
         Terms sigterms = response.getAggregations().get("myTerms");
         assertThat(sigterms.getBuckets().size(), equalTo(0));
 
-
         response = client().prepareSearch(index)
                 .addAggregation(
-                        terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).executionHint(randomExecutionHint())
-                                .order(BucketOrder.key(true))
+                        terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).shardSize(2)
+                            .executionHint(randomExecutionHint()).order(BucketOrder.key(true))
                 )
                 .get();
         assertSearchResponse(response);
@@ -154,11 +152,10 @@ public void testShardMinDocCountTermsTest() throws Exception {
 
     }
 
-    private void addTermsDocs(String term, int numDocs, List<IndexRequestBuilder> builders) {
+    private static void addTermsDocs(String term, int numDocs, List<IndexRequestBuilder> builders) {
         String sourceClass = "{\"text\": \"" + term + "\"}";
         for (int i = 0; i < numDocs; i++) {
             builders.add(client().prepareIndex(index, type).setSource(sourceClass, XContentType.JSON));
         }
-
     }
 }
