Remove single shard optimization when suggesting shard_size #37041

Merged

@@ -448,7 +448,7 @@ If the number of unique terms is greater than `size`, the returned list can be s
 size buckets was not returned).
 
 To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
-using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
+(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter
 can be used to control the volumes of candidate terms produced by each shard.
 
 Low-frequency terms can turn out to be the most interesting ones once all results are combined so the
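For a concrete reading of the new wording (a worked example of ours, not part of the diff): with `size = 10`, each shard is asked for `2 * (10 * 1.5 + 10) = 50` candidate terms, no matter how many shards the request touches.

    // Worked example of the documented significant-terms default (illustrative only).
    public class SigTermsShardSizeExample {
        public static void main(String[] args) {
            int size = 10;
            long shardSize = 2 * (long) (size * 1.5 + 10); // 2 * 25 = 50
            System.out.println(shardSize); // prints 50
        }
    }
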
@@ -364,7 +364,7 @@ If the number of unique terms is greater than `size`, the returned list can be s
 size buckets was not returned).
 
 To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
-using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
+(`2 * (size * 1.5 + 10)`). To take manual control of this setting the `shard_size` parameter
 can be used to control the volumes of candidate terms produced by each shard.
 
 Low-frequency terms can turn out to be the most interesting ones once all results are combined so the
@@ -220,8 +220,7 @@ NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sens
 override it and reset it to be equal to `size`.
 
 
-The default `shard_size` will be `size` if the search request needs to go to a single shard, and `(size * 1.5 + 10)`
-otherwise.
+The default `shard_size` is `(size * 1.5 + 10)`.
 
 ==== Calculating Document Count Error
 
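As a quick sanity check of the simplified rule (our arithmetic, not the diff's): with `size = 10` the default `shard_size` works out to `10 * 1.5 + 10 = 25`, which can never fall below `size`.

    // Worked example of the documented terms default shard_size (illustrative only).
    public class TermsShardSizeExample {
        public static void main(String[] args) {
            int size = 10;
            int shardSize = (int) (size * 1.5 + 10); // 25; always >= size for positive sizes
            System.out.println(shardSize);
        }
    }
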
@@ -31,18 +31,12 @@ private BucketUtils() {}
      *
      * @param finalSize
      *            The number of terms required in the final reduce phase.
-     * @param singleShard
-     *            whether a single shard is being queried, or multiple shards
      * @return A suggested default for the size of any shard-side PriorityQueues
      */
-    public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) {
+    public static int suggestShardSideQueueSize(int finalSize) {
         if (finalSize < 1) {
             throw new IllegalArgumentException("size must be positive, got " + finalSize);
         }
-        if (singleShard) {
-            // In the case of a single shard, we do not need to over-request
-            return finalSize;
-        }
         // Request 50% more buckets on the shards in order to improve accuracy
         // as well as a small constant that should help with small values of 'size'
         final long shardSampleSize = (long) (finalSize * 1.5 + 10);
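The hunk is truncated right after the `shardSampleSize` line, so the return statement is not visible; presumably it clamps the long back into integer range. A sketch of the post-change method under that assumption (the clamp is ours, not confirmed by the diff):

    // Sketch of suggestShardSideQueueSize after this PR; the final clamp is an
    // assumption, since the diff cuts off before the return statement.
    public static int suggestShardSideQueueSize(int finalSize) {
        if (finalSize < 1) {
            throw new IllegalArgumentException("size must be positive, got " + finalSize);
        }
        // Request 50% more buckets on the shards in order to improve accuracy
        // as well as a small constant that should help with small values of 'size'
        final long shardSampleSize = (long) (finalSize * 1.5 + 10);
        return (int) Math.min(shardSampleSize, Integer.MAX_VALUE); // assumed clamp
    }
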
@@ -157,7 +157,7 @@ public int shardSize() {
         if (shardSize < 0) {
             // Use default heuristic to avoid any wrong-ranking caused by
             // distributed counting
-            shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1);
+            shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize);
         }
 
         if (requiredSize <= 0 || shardSize <= 0) {
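This call site resolves the default lazily: `shard_size` sits at a negative sentinel until the factory fills it in with the heuristic. A minimal standalone sketch of that pattern (variable names and the inlined formula are ours):

    // Illustrative sentinel-default resolution, mirroring the call site above.
    public class ShardSizeDefaultExample {
        public static void main(String[] args) {
            int requiredSize = 10;
            int shardSize = -1; // negative means the user did not set shard_size
            if (shardSize < 0) {
                // Inlined equivalent of BucketUtils.suggestShardSideQueueSize(requiredSize)
                shardSize = (int) Math.min((long) (requiredSize * 1.5 + 10), Integer.MAX_VALUE);
            }
            System.out.println(shardSize); // prints 25
        }
    }
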
@@ -195,8 +195,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
             // each shard and as
             // such are impossible to differentiate from non-significant terms
             // at that early stage.
-            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
-                context.numberOfShards() == 1));
+            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }
 
         if (valuesSource instanceof ValuesSource.Bytes) {
@@ -175,8 +175,7 @@ protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingl
             // we want to find have only one occurrence on each shard and as
             // such are impossible to differentiate from non-significant terms
             // at that early stage.
-            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
-                context.numberOfShards() == 1));
+            bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }
 
         // TODO - need to check with mapping that this is indeed a text field....
@@ -121,8 +121,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
             // The user has not made a shardSize selection. Use default
             // heuristic to avoid any wrong-ranking caused by distributed
             // counting
-            bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
-                context.numberOfShards() == 1));
+            bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
         }
         bucketCountThresholds.ensureValidity();
         if (valuesSource instanceof ValuesSource.Bytes) {
@@ -27,31 +27,22 @@ public class BucketUtilsTests extends ESTestCase {
 
     public void testBadInput() {
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
-            () -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean()));
+            () -> BucketUtils.suggestShardSideQueueSize(0));
         assertEquals(e.getMessage(), "size must be positive, got 0");
     }
 
-    public void testOptimizesSingleShard() {
-        for (int iter = 0; iter < 10; ++iter) {
-            final int size = randomIntBetween(1, Integer.MAX_VALUE);
-            assertEquals(size, BucketUtils.suggestShardSideQueueSize(size, true));
-        }
-    }
-
     public void testOverFlow() {
         for (int iter = 0; iter < 10; ++iter) {
            final int size = Integer.MAX_VALUE - randomInt(10);
-            final int numberOfShards = randomIntBetween(1, 10);
-            final int shardSize = BucketUtils.suggestShardSideQueueSize(size, numberOfShards == 1);
+            final int shardSize = BucketUtils.suggestShardSideQueueSize(size);
             assertThat(shardSize, greaterThanOrEqualTo(size));
         }
     }
 
     public void testShardSizeIsGreaterThanGlobalSize() {
         for (int iter = 0; iter < 10; ++iter) {
             final int size = randomIntBetween(1, Integer.MAX_VALUE);
-            final int numberOfShards = randomIntBetween(1, 10);
-            final int shardSize = BucketUtils.suggestShardSideQueueSize(size, numberOfShards == 1);
+            final int shardSize = BucketUtils.suggestShardSideQueueSize(size);
             assertThat(shardSize, greaterThanOrEqualTo(size));
         }
     }
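The surviving `testOverFlow` drives sizes up to `Integer.MAX_VALUE`; the arithmetic below (our illustration, relying on the clamp assumed earlier) shows why the suggested size stays positive instead of wrapping around:

    // Why testOverFlow cannot observe a negative shard size (illustrative only).
    public class OverflowExample {
        public static void main(String[] args) {
            int size = Integer.MAX_VALUE;
            long sample = (long) (size * 1.5 + 10); // ~3.22e9, beyond int range
            int shardSize = (int) Math.min(sample, Integer.MAX_VALUE); // clamped
            System.out.println(shardSize == Integer.MAX_VALUE); // prints true
        }
    }
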
@@ -23,29 +23,30 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorFactory;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.test.ESIntegTestCase;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
 import static org.hamcrest.Matchers.equalTo;
 
 public class TermsShardMinDocCountIT extends ESIntegTestCase {
     private static final String index = "someindex";
     private static final String type = "testtype";
-    public String randomExecutionHint() {
+
+    private static String randomExecutionHint() {
         return randomBoolean() ? null : randomFrom(SignificantTermsAggregatorFactory.ExecutionMode.values()).toString();
     }
 
@@ -74,7 +75,7 @@ public void testShardMinDocCountSignificantTermsTest() throws Exception {
         SearchResponse response = client().prepareSearch(index)
                 .addAggregation(
                         (filter("inclass", QueryBuilders.termQuery("class", true)))
-                                .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2)
+                                .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).size(2).shardSize(2)
                                         .executionHint(randomExecutionHint()))
                 )
                 .get();
@@ -87,16 +88,14 @@ public void testShardMinDocCountSignificantTermsTest() throws Exception {
         response = client().prepareSearch(index)
                 .addAggregation(
                         (filter("inclass", QueryBuilders.termQuery("class", true)))
-                                .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2)
-                                        .shardMinDocCount(2).size(2)
-                                        .executionHint(randomExecutionHint()))
+                                .subAggregation(significantTerms("mySignificantTerms").field("text").minDocCount(2).shardSize(2)
+                                        .shardMinDocCount(2).size(2).executionHint(randomExecutionHint()))
                 )
                 .get();
         assertSearchResponse(response);
         filteredBucket = response.getAggregations().get("inclass");
         sigterms = filteredBucket.getAggregations().get("mySignificantTerms");
         assertThat(sigterms.getBuckets().size(), equalTo(2));
-
     }
 
     private void addTermsDocs(String term, int numInClass, int numNotInClass, List<IndexRequestBuilder> builders) {
@@ -133,19 +132,18 @@ public void testShardMinDocCountTermsTest() throws Exception {
         // first, check that indeed when not setting the shardMinDocCount parameter 0 terms are returned
         SearchResponse response = client().prepareSearch(index)
                 .addAggregation(
-                        terms("myTerms").field("text").minDocCount(2).size(2).executionHint(randomExecutionHint())
-                                .order(BucketOrder.key(true))
+                        terms("myTerms").field("text").minDocCount(2).size(2).shardSize(2).executionHint(randomExecutionHint())
+                                .order(BucketOrder.key(true))
                 )
                 .get();
         assertSearchResponse(response);
         Terms sigterms = response.getAggregations().get("myTerms");
         assertThat(sigterms.getBuckets().size(), equalTo(0));
 
-
         response = client().prepareSearch(index)
                 .addAggregation(
-                        terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).executionHint(randomExecutionHint())
-                                .order(BucketOrder.key(true))
+                        terms("myTerms").field("text").minDocCount(2).shardMinDocCount(2).size(2).shardSize(2)
+                                .executionHint(randomExecutionHint()).order(BucketOrder.key(true))
                 )
                 .get();
         assertSearchResponse(response);
@@ -154,11 +152,10 @@ public void testShardMinDocCountTermsTest() throws Exception {
 
     }
 
-    private void addTermsDocs(String term, int numDocs, List<IndexRequestBuilder> builders) {
+    private static void addTermsDocs(String term, int numDocs, List<IndexRequestBuilder> builders) {
         String sourceClass = "{\"text\": \"" + term + "\"}";
         for (int i = 0; i < numDocs; i++) {
             builders.add(client().prepareIndex(index, type).setSource(sourceClass, XContentType.JSON));
         }
-
     }
 }