Fix wrong error upper bound when performing incremental reductions #43874

Merged: 10 commits, Jul 22, 2021
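
In short, as read from the diff below: docCountError on terms aggregations changes from a primitive long to a nullable Long, so that "no error bound computed yet" can be told apart from "error bound is exactly 0" while partial (incremental) reductions are merged. The wire format is version-gated: newer streams carry an optional long, while older streams keep the zig-zag-encoded long with 0 mapped back to null on read. New single-shard fixed-doc indices (idx_fixed_docs_3, idx_fixed_docs_4, idx_fixed_docs_5) back a new testIncrementalReduction case.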
@@ -154,7 +154,7 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
true,
0,
buckets,
0
0L
);
}

@@ -70,7 +70,7 @@ private StringTerms newTerms(boolean withNested) {
false,
100000,
resultBuckets,
0
0L
);
}

@@ -569,7 +569,7 @@ public void testSearchWithParentJoin() throws IOException {
assertEquals(Float.NaN, searchResponse.getHits().getMaxScore(), 0f);
assertEquals(1, searchResponse.getAggregations().asList().size());
Terms terms = searchResponse.getAggregations().get("top-tags");
assertEquals(0, terms.getDocCountError());
assertEquals(0, terms.getDocCountError().longValue());
assertEquals(0, terms.getSumOfOtherDocCounts());
assertEquals(3, terms.getBuckets().size());
for (Terms.Bucket bucket : terms.getBuckets()) {
@@ -581,7 +581,7 @@
assertEquals(2, children.getDocCount());
assertEquals(1, children.getAggregations().asList().size());
Terms leafTerms = children.getAggregations().get("top-names");
assertEquals(0, leafTerms.getDocCountError());
assertEquals(0, leafTerms.getDocCountError().longValue());
assertEquals(0, leafTerms.getSumOfOtherDocCounts());
assertEquals(2, leafTerms.getBuckets().size());
assertEquals(2, leafTerms.getBuckets().size());
@@ -19,6 +19,7 @@
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -88,8 +89,7 @@ public void setupSuiteScopeCluster() throws Exception {
.field(DOUBLE_FIELD_NAME, 1.0 * randomInt(numUniqueTerms))
.endObject()));
}
assertAcked(prepareCreate("idx_fixed_docs_0").setMapping(STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));

Map<String, Integer> shard0DocsPerTerm = new HashMap<>();
shard0DocsPerTerm.put("A", 25);
shard0DocsPerTerm.put("B", 18);
@@ -101,16 +101,8 @@ public void setupSuiteScopeCluster() throws Exception {
shard0DocsPerTerm.put("H", 2);
shard0DocsPerTerm.put("I", 1);
shard0DocsPerTerm.put("J", 1);
for (Map.Entry<String, Integer> entry : shard0DocsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_0").setId(term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).endObject()));
}
}
buildIndex(shard0DocsPerTerm, "idx_fixed_docs_0", 0, builders);

assertAcked(prepareCreate("idx_fixed_docs_1").setMapping(STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
Map<String, Integer> shard1DocsPerTerm = new HashMap<>();
shard1DocsPerTerm.put("A", 30);
shard1DocsPerTerm.put("B", 25);
@@ -122,17 +114,8 @@ public void setupSuiteScopeCluster() throws Exception {
shard1DocsPerTerm.put("Q", 6);
shard1DocsPerTerm.put("J", 8);
shard1DocsPerTerm.put("C", 4);
for (Map.Entry<String, Integer> entry : shard1DocsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_1").setId(term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 1).endObject()));
}
}
buildIndex(shard1DocsPerTerm, "idx_fixed_docs_1", 1, builders);

assertAcked(prepareCreate("idx_fixed_docs_2")
.setMapping(STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
Map<String, Integer> shard2DocsPerTerm = new HashMap<>();
shard2DocsPerTerm.put("A", 45);
shard2DocsPerTerm.put("C", 44);
@@ -142,16 +125,49 @@ public void setupSuiteScopeCluster() throws Exception {
shard2DocsPerTerm.put("H", 28);
shard2DocsPerTerm.put("Q", 2);
shard2DocsPerTerm.put("D", 1);
for (Map.Entry<String, Integer> entry : shard2DocsPerTerm.entrySet()) {
buildIndex(shard2DocsPerTerm, "idx_fixed_docs_2", 2, builders);

Map<String, Integer> shard3DocsPerTerm = Map.of(
"A", 1,
"B", 1,
"C", 1
);
buildIndex(shard3DocsPerTerm, "idx_fixed_docs_3", 3, builders);

Map<String, Integer> shard4DocsPerTerm = Map.of(
"K", 1,
"L", 1,
"M", 1
);
buildIndex(shard4DocsPerTerm, "idx_fixed_docs_4", 4, builders);

Map<String, Integer> shard5DocsPerTerm = Map.of(
"X", 1,
"Y", 1,
"Z", 1
);
buildIndex(shard5DocsPerTerm, "idx_fixed_docs_5", 5, builders);

indexRandom(true, builders);
ensureSearchable();
}

private void buildIndex(Map<String, Integer> docsPerTerm, String index, int shard, List<IndexRequestBuilder> builders)
throws IOException {
assertAcked(
prepareCreate(index).setMapping(STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
);
for (Map.Entry<String, Integer> entry : docsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_2").setId(term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 2).endObject()));
builders.add(
client().prepareIndex(index)
.setId(term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", shard).endObject())
);
}
}

indexRandom(true, builders);
ensureSearchable();
}

private void assertDocCountErrorWithinBounds(int size, SearchResponse accurateResponse, SearchResponse testResponse) {
@@ -1014,4 +1030,21 @@ public void testFixedDocs() throws Exception {
assertThat(bucket.getDocCountError(), equalTo(29L));
}

/**
 * Tests that the upper bounds are correct when performing incremental reductions.
 * See https://github.com/elastic/elasticsearch/issues/40005 for more details.
 */
public void testIncrementalReduction() {
SearchResponse response = client().prepareSearch("idx_fixed_docs_3", "idx_fixed_docs_4", "idx_fixed_docs_5")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.field(STRING_FIELD_NAME)
.showTermDocCountError(true)
.size(5).shardSize(5)
.collectMode(randomFrom(SubAggCollectionMode.values())))
.get();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms.getDocCountError(), equalTo(0L));
}
}
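
A note on the expected value in testIncrementalReduction: each of idx_fixed_docs_3, idx_fixed_docs_4, and idx_fixed_docs_5 is a single-shard index holding only three distinct terms, fewer than both size(5) and shardSize(5), so every shard returns its complete term list with a per-shard error of 0. Any order of incremental reduction must therefore yield a combined docCountError of exactly 0; the bug this PR fixes made the partial-reduction path report a spurious non-zero upper bound.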
@@ -76,7 +76,7 @@ public abstract static class AbstractTermsBucket extends InternalMultiBucketAggr

protected abstract long getSumOfOtherDocCounts();

protected abstract long getDocCountError();
protected abstract Long getDocCountError();

protected abstract void setDocCountError(long docCountError);

@@ -133,7 +133,7 @@ private long getDocCountError(A terms) {
if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.getOrder())) {
return 0;
} else if (InternalOrder.isCountDesc(terms.getOrder())) {
if (terms.getDocCountError() > 0) {
if (terms.getDocCountError() != null && terms.getDocCountError() > 0) {
// If there is an existing docCountError for this agg then
// use this as the error for this aggregation
return terms.getDocCountError();
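
The null guard added above matters because getDocCountError() now returns a boxed Long, and during partial reductions no bound may have been computed yet. A minimal sketch of the intended semantics, assuming null means "unknown" (this combine rule is an illustration, not the actual Elasticsearch reduce code):

final class DocCountErrorMath {
    // Illustration only: fold one shard-level bound into a running total.
    // Assumption: a null bound means "not computed yet", and an unknown
    // contribution must keep the total unknown instead of collapsing to 0.
    static Long combine(Long runningTotal, Long shardError) {
        if (runningTotal == null || shardError == null) {
            return null; // unknown stays unknown across incremental reductions
        }
        return runningTotal + shardError;
    }
}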
@@ -33,7 +33,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {

protected StringTerms buildEmptyTermsAggregation() {
return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0L);
}

protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subsetSize, SignificanceHeuristic significanceHeuristic) {
@@ -91,7 +91,7 @@ public int hashCode() {

public DoubleTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
List<Bucket> buckets, Long docCountError) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
otherDocCount, buckets, docCountError);
}
@@ -765,7 +765,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
}
return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
otherDocCount, Arrays.asList(topBuckets), 0);
otherDocCount, Arrays.asList(topBuckets), 0L);
}

@Override
@@ -8,6 +8,7 @@

package org.elasticsearch.search.aggregations.bucket.terms;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -32,11 +33,11 @@ public abstract class InternalMappedTerms<A extends InternalTerms<A, B>, B exten
protected final List<B> buckets;
protected Map<String, B> bucketMap;

protected long docCountError;
protected Long docCountError;

protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize,
boolean showTermDocCountError, long otherDocCount, List<B> buckets, long docCountError) {
boolean showTermDocCountError, long otherDocCount, List<B> buckets, Long docCountError) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
this.format = format;
this.shardSize = shardSize;
@@ -51,7 +52,14 @@ protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder
*/
protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) throws IOException {
super(in);
docCountError = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo fix after backport
docCountError = in.readOptionalLong();

Member: Should this stay a zlong?

Contributor: @nik9000 why was it a zlong in the first place? Can errors be negative?

} else {
docCountError = in.readZLong();
if (docCountError == 0) {
docCountError = null;
}
}
format = in.readNamedWriteable(DocValueFormat.class);
shardSize = readSize(in);
showTermDocCountError = in.readBoolean();
@@ -61,7 +69,11 @@ protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) thr

@Override
protected final void writeTermTypeInfoTo(StreamOutput out) throws IOException {
out.writeZLong(docCountError);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo fix after backport
out.writeOptionalLong(docCountError);
} else {
out.writeZLong(docCountError == null ? 0 : docCountError);
}
out.writeNamedWriteable(format);
writeSize(shardSize, out);
out.writeBoolean(showTermDocCountError);
@@ -80,7 +92,7 @@ protected int getShardSize() {
}

@Override
public long getDocCountError() {
public Long getDocCountError() {
return docCountError;
}

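
For readers following the zlong question in the review thread above, here is a minimal sketch of the two encodings; neither snippet is the real StreamOutput implementation, and the boolean-then-value shape of the optional long is an assumption:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class WireFormatSketch {
    // Zig-zag keeps small values of either sign small (0 -> 0, -1 -> 1, 1 -> 2),
    // which is why a zlong would be a sensible choice if errors could be negative.
    static long zigZag(long n) {
        return (n << 1) ^ (n >> 63);
    }

    // An optional long needs an explicit presence flag; that flag is what lets
    // a null docCountError survive a round trip, which a plain zlong cannot do.
    static void writeOptionalLong(DataOutputStream out, Long value) throws IOException {
        out.writeBoolean(value != null);
        if (value != null) {
            out.writeLong(value);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(zigZag(0));  // 0
        System.out.println(zigZag(-1)); // 1
        System.out.println(zigZag(1));  // 2
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeOptionalLong(new DataOutputStream(bytes), null);
        System.out.println(bytes.size()); // 1: only the "absent" flag byte
    }
}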
@@ -103,7 +103,7 @@ public int hashCode() {

public LongTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
List<Bucket> buckets, Long docCountError) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
otherDocCount, buckets, docCountError);
}
@@ -446,7 +446,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
}
return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
otherDocCount, Arrays.asList(topBuckets), 0);
otherDocCount, Arrays.asList(topBuckets), 0L);
}

@Override
@@ -372,7 +372,7 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
0
0L
);
}

@@ -390,7 +390,7 @@ LongTerms buildEmptyResult() {
showTermDocCountError,
0,
emptyList(),
0
0L
);
}
}
@@ -454,7 +454,7 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
0
0L
);
}

@@ -472,7 +472,7 @@ DoubleTerms buildEmptyResult() {
showTermDocCountError,
0,
emptyList(),
0
0L
);
}
}
@@ -32,7 +32,7 @@ public abstract class ParsedTerms extends ParsedMultiBucketAggregation<ParsedTer
protected long sumOtherDocCount;

@Override
public long getDocCountError() {
public Long getDocCountError() {
return docCountErrorUpperBound;
}

@@ -94,7 +94,7 @@ public int hashCode() {

public StringTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
List<Bucket> buckets, Long docCountError) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format,
shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
}
@@ -227,7 +227,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) {
showTermDocCountError,
otherDocsCount,
buckets,
0
0L
);
}

@@ -41,7 +41,7 @@ interface Bucket extends MultiBucketsAggregation.Bucket {
/**
* Get an upper bound of the error on document counts in this aggregation.
*/
long getDocCountError();
Long getDocCountError();

/**
* Return the sum of the document counts of all buckets that did not make
Expand Down
@@ -110,8 +110,8 @@ protected int getShardSize() {
}

@Override
public long getDocCountError() {
return 0;
public Long getDocCountError() {
return 0L;
}

@Override
@@ -49,15 +49,15 @@ public void testReduceEmptyAggs() {

public void testNonFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0L);
List<InternalAggregations> aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms)));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction());
assertEquals(1, reducedAggs.aggregations.size());
}

public void testFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0L);

InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(terms));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs),