Skip to content

Commit 1db17ad

Browse files
Hohol, imotov, elasticmachine
authored
Fix wrong error upper bound when performing incremental reductions (#43874)
When performing incremental reductions, a docCountError value of 0 may mean either that the error was not previously calculated, or that the error was indeed calculated and its value was 0. Because these two cases are indistinguishable, genuine values of 0 end up being rejected, which may lead to a wrong upper bound of the error in the result. To fix this, this PR makes docCountError nullable: a null value means that the error has not been calculated yet. Fixes #40005 Co-authored-by: Igor Motov <[email protected]> Co-authored-by: Elastic Machine <[email protected]>
1 parent 0232675 commit 1db17ad

File tree

21 files changed

+104
-59
lines changed

21 files changed

+104
-59
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
154154
true,
155155
0,
156156
buckets,
157-
0
157+
0L
158158
);
159159
}
160160

benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private StringTerms newTerms(boolean withNested) {
7070
false,
7171
100000,
7272
resultBuckets,
73-
0
73+
0L
7474
);
7575
}
7676

client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ public void testSearchWithParentJoin() throws IOException {
569569
assertEquals(Float.NaN, searchResponse.getHits().getMaxScore(), 0f);
570570
assertEquals(1, searchResponse.getAggregations().asList().size());
571571
Terms terms = searchResponse.getAggregations().get("top-tags");
572-
assertEquals(0, terms.getDocCountError());
572+
assertEquals(0, terms.getDocCountError().longValue());
573573
assertEquals(0, terms.getSumOfOtherDocCounts());
574574
assertEquals(3, terms.getBuckets().size());
575575
for (Terms.Bucket bucket : terms.getBuckets()) {
@@ -581,7 +581,7 @@ public void testSearchWithParentJoin() throws IOException {
581581
assertEquals(2, children.getDocCount());
582582
assertEquals(1, children.getAggregations().asList().size());
583583
Terms leafTerms = children.getAggregations().get("top-names");
584-
assertEquals(0, leafTerms.getDocCountError());
584+
assertEquals(0, leafTerms.getDocCountError().longValue());
585585
assertEquals(0, leafTerms.getSumOfOtherDocCounts());
586586
assertEquals(2, leafTerms.getBuckets().size());
587587
assertEquals(2, leafTerms.getBuckets().size());

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java

Lines changed: 60 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
2020
import org.elasticsearch.test.ESIntegTestCase;
2121

22+
import java.io.IOException;
2223
import java.util.ArrayList;
2324
import java.util.HashMap;
2425
import java.util.List;
@@ -88,8 +89,7 @@ public void setupSuiteScopeCluster() throws Exception {
8889
.field(DOUBLE_FIELD_NAME, 1.0 * randomInt(numUniqueTerms))
8990
.endObject()));
9091
}
91-
assertAcked(prepareCreate("idx_fixed_docs_0").setMapping(STRING_FIELD_NAME, "type=keyword")
92-
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
92+
9393
Map<String, Integer> shard0DocsPerTerm = new HashMap<>();
9494
shard0DocsPerTerm.put("A", 25);
9595
shard0DocsPerTerm.put("B", 18);
@@ -101,16 +101,8 @@ public void setupSuiteScopeCluster() throws Exception {
101101
shard0DocsPerTerm.put("H", 2);
102102
shard0DocsPerTerm.put("I", 1);
103103
shard0DocsPerTerm.put("J", 1);
104-
for (Map.Entry<String, Integer> entry : shard0DocsPerTerm.entrySet()) {
105-
for (int i = 0; i < entry.getValue(); i++) {
106-
String term = entry.getKey();
107-
builders.add(client().prepareIndex("idx_fixed_docs_0").setId(term + "-" + i)
108-
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).endObject()));
109-
}
110-
}
104+
buildIndex(shard0DocsPerTerm, "idx_fixed_docs_0", 0, builders);
111105

112-
assertAcked(prepareCreate("idx_fixed_docs_1").setMapping(STRING_FIELD_NAME, "type=keyword")
113-
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
114106
Map<String, Integer> shard1DocsPerTerm = new HashMap<>();
115107
shard1DocsPerTerm.put("A", 30);
116108
shard1DocsPerTerm.put("B", 25);
@@ -122,17 +114,8 @@ public void setupSuiteScopeCluster() throws Exception {
122114
shard1DocsPerTerm.put("Q", 6);
123115
shard1DocsPerTerm.put("J", 8);
124116
shard1DocsPerTerm.put("C", 4);
125-
for (Map.Entry<String, Integer> entry : shard1DocsPerTerm.entrySet()) {
126-
for (int i = 0; i < entry.getValue(); i++) {
127-
String term = entry.getKey();
128-
builders.add(client().prepareIndex("idx_fixed_docs_1").setId(term + "-" + i)
129-
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 1).endObject()));
130-
}
131-
}
117+
buildIndex(shard1DocsPerTerm, "idx_fixed_docs_1", 1, builders);
132118

133-
assertAcked(prepareCreate("idx_fixed_docs_2")
134-
.setMapping(STRING_FIELD_NAME, "type=keyword")
135-
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
136119
Map<String, Integer> shard2DocsPerTerm = new HashMap<>();
137120
shard2DocsPerTerm.put("A", 45);
138121
shard2DocsPerTerm.put("C", 44);
@@ -142,16 +125,49 @@ public void setupSuiteScopeCluster() throws Exception {
142125
shard2DocsPerTerm.put("H", 28);
143126
shard2DocsPerTerm.put("Q", 2);
144127
shard2DocsPerTerm.put("D", 1);
145-
for (Map.Entry<String, Integer> entry : shard2DocsPerTerm.entrySet()) {
128+
buildIndex(shard2DocsPerTerm, "idx_fixed_docs_2", 2, builders);
129+
130+
Map<String, Integer> shard3DocsPerTerm = Map.of(
131+
"A", 1,
132+
"B", 1,
133+
"C", 1
134+
);
135+
buildIndex(shard3DocsPerTerm, "idx_fixed_docs_3", 3, builders);
136+
137+
Map<String, Integer> shard4DocsPerTerm = Map.of(
138+
"K", 1,
139+
"L", 1,
140+
"M", 1
141+
);
142+
buildIndex(shard4DocsPerTerm, "idx_fixed_docs_4", 4, builders);
143+
144+
Map<String, Integer> shard5DocsPerTerm = Map.of(
145+
"X", 1,
146+
"Y", 1,
147+
"Z", 1
148+
);
149+
buildIndex(shard5DocsPerTerm, "idx_fixed_docs_5", 5, builders);
150+
151+
indexRandom(true, builders);
152+
ensureSearchable();
153+
}
154+
155+
private void buildIndex(Map<String, Integer> docsPerTerm, String index, int shard, List<IndexRequestBuilder> builders)
156+
throws IOException {
157+
assertAcked(
158+
prepareCreate(index).setMapping(STRING_FIELD_NAME, "type=keyword")
159+
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
160+
);
161+
for (Map.Entry<String, Integer> entry : docsPerTerm.entrySet()) {
146162
for (int i = 0; i < entry.getValue(); i++) {
147163
String term = entry.getKey();
148-
builders.add(client().prepareIndex("idx_fixed_docs_2").setId(term + "-" + i)
149-
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 2).endObject()));
164+
builders.add(
165+
client().prepareIndex(index)
166+
.setId(term + "-" + i)
167+
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", shard).endObject())
168+
);
150169
}
151170
}
152-
153-
indexRandom(true, builders);
154-
ensureSearchable();
155171
}
156172

157173
private void assertDocCountErrorWithinBounds(int size, SearchResponse accurateResponse, SearchResponse testResponse) {
@@ -1014,4 +1030,21 @@ public void testFixedDocs() throws Exception {
10141030
assertThat(bucket.getDocCountError(), equalTo(29L));
10151031
}
10161032

1033+
/**
1034+
* Tests the upper bounds are correct when performing incremental reductions
1035+
* See https://github.com/elastic/elasticsearch/issues/40005 for more details
1036+
*/
1037+
public void testIncrementalReduction() {
1038+
SearchResponse response = client().prepareSearch("idx_fixed_docs_3", "idx_fixed_docs_4", "idx_fixed_docs_5")
1039+
.addAggregation(terms("terms")
1040+
.executionHint(randomExecutionHint())
1041+
.field(STRING_FIELD_NAME)
1042+
.showTermDocCountError(true)
1043+
.size(5).shardSize(5)
1044+
.collectMode(randomFrom(SubAggCollectionMode.values())))
1045+
.get();
1046+
assertSearchResponse(response);
1047+
Terms terms = response.getAggregations().get("terms");
1048+
assertThat(terms.getDocCountError(), equalTo(0L));
1049+
}
10171050
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public abstract static class AbstractTermsBucket extends InternalMultiBucketAggr
7676

7777
protected abstract long getSumOfOtherDocCounts();
7878

79-
protected abstract long getDocCountError();
79+
protected abstract Long getDocCountError();
8080

8181
protected abstract void setDocCountError(long docCountError);
8282

@@ -133,7 +133,7 @@ private long getDocCountError(A terms) {
133133
if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.getOrder())) {
134134
return 0;
135135
} else if (InternalOrder.isCountDesc(terms.getOrder())) {
136-
if (terms.getDocCountError() > 0) {
136+
if (terms.getDocCountError() != null && terms.getDocCountError() > 0) {
137137
// If there is an existing docCountError for this agg then
138138
// use this as the error for this aggregation
139139
return terms.getDocCountError();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
3333

3434
protected StringTerms buildEmptyTermsAggregation() {
3535
return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
36-
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
36+
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0L);
3737
}
3838

3939
protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subsetSize, SignificanceHeuristic significanceHeuristic) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public int hashCode() {
9191

9292
public DoubleTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
9393
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
94-
List<Bucket> buckets, long docCountError) {
94+
List<Bucket> buckets, Long docCountError) {
9595
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
9696
otherDocCount, buckets, docCountError);
9797
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
765765
}
766766
return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
767767
bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
768-
otherDocCount, Arrays.asList(topBuckets), 0);
768+
otherDocCount, Arrays.asList(topBuckets), 0L);
769769
}
770770

771771
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.search.aggregations.bucket.terms;
1010

11+
import org.elasticsearch.Version;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -32,11 +33,11 @@ public abstract class InternalMappedTerms<A extends InternalTerms<A, B>, B exten
3233
protected final List<B> buckets;
3334
protected Map<String, B> bucketMap;
3435

35-
protected long docCountError;
36+
protected Long docCountError;
3637

3738
protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
3839
Map<String, Object> metadata, DocValueFormat format, int shardSize,
39-
boolean showTermDocCountError, long otherDocCount, List<B> buckets, long docCountError) {
40+
boolean showTermDocCountError, long otherDocCount, List<B> buckets, Long docCountError) {
4041
super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
4142
this.format = format;
4243
this.shardSize = shardSize;
@@ -51,7 +52,14 @@ protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder
5152
*/
5253
protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) throws IOException {
5354
super(in);
54-
docCountError = in.readZLong();
55+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo fix after backport
56+
docCountError = in.readOptionalLong();
57+
} else {
58+
docCountError = in.readZLong();
59+
if (docCountError == 0) {
60+
docCountError = null;
61+
}
62+
}
5563
format = in.readNamedWriteable(DocValueFormat.class);
5664
shardSize = readSize(in);
5765
showTermDocCountError = in.readBoolean();
@@ -61,7 +69,11 @@ protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) thr
6169

6270
@Override
6371
protected final void writeTermTypeInfoTo(StreamOutput out) throws IOException {
64-
out.writeZLong(docCountError);
72+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo fix after backport
73+
out.writeOptionalLong(docCountError);
74+
} else {
75+
out.writeZLong(docCountError == null ? 0 : docCountError);
76+
}
6577
out.writeNamedWriteable(format);
6678
writeSize(shardSize, out);
6779
out.writeBoolean(showTermDocCountError);
@@ -80,7 +92,7 @@ protected int getShardSize() {
8092
}
8193

8294
@Override
83-
public long getDocCountError() {
95+
public Long getDocCountError() {
8496
return docCountError;
8597
}
8698

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public int hashCode() {
103103

104104
public LongTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
105105
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
106-
List<Bucket> buckets, long docCountError) {
106+
List<Bucket> buckets, Long docCountError) {
107107
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
108108
otherDocCount, buckets, docCountError);
109109
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
446446
}
447447
return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
448448
bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
449-
otherDocCount, Arrays.asList(topBuckets), 0);
449+
otherDocCount, Arrays.asList(topBuckets), 0L);
450450
}
451451

452452
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket
372372
showTermDocCountError,
373373
otherDocCount,
374374
List.of(topBuckets),
375-
0
375+
0L
376376
);
377377
}
378378

@@ -390,7 +390,7 @@ LongTerms buildEmptyResult() {
390390
showTermDocCountError,
391391
0,
392392
emptyList(),
393-
0
393+
0L
394394
);
395395
}
396396
}
@@ -454,7 +454,7 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu
454454
showTermDocCountError,
455455
otherDocCount,
456456
List.of(topBuckets),
457-
0
457+
0L
458458
);
459459
}
460460

@@ -472,7 +472,7 @@ DoubleTerms buildEmptyResult() {
472472
showTermDocCountError,
473473
0,
474474
emptyList(),
475-
0
475+
0L
476476
);
477477
}
478478
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/ParsedTerms.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public abstract class ParsedTerms extends ParsedMultiBucketAggregation<ParsedTer
3232
protected long sumOtherDocCount;
3333

3434
@Override
35-
public long getDocCountError() {
35+
public Long getDocCountError() {
3636
return docCountErrorUpperBound;
3737
}
3838

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public int hashCode() {
9494

9595
public StringTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
9696
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
97-
List<Bucket> buckets, long docCountError) {
97+
List<Bucket> buckets, Long docCountError) {
9898
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format,
9999
shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
100100
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) {
227227
showTermDocCountError,
228228
otherDocsCount,
229229
buckets,
230-
0
230+
0L
231231
);
232232
}
233233

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/Terms.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ interface Bucket extends MultiBucketsAggregation.Bucket {
4141
/**
4242
* Get an upper bound of the error on document counts in this aggregation.
4343
*/
44-
long getDocCountError();
44+
Long getDocCountError();
4545

4646
/**
4747
* Return the sum of the document counts of all buckets that did not make

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ protected int getShardSize() {
110110
}
111111

112112
@Override
113-
public long getDocCountError() {
114-
return 0;
113+
public Long getDocCountError() {
114+
return 0L;
115115
}
116116

117117
@Override

server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ public void testReduceEmptyAggs() {
4949

5050
public void testNonFinalReduceTopLevelPipelineAggs() {
5151
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
52-
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
52+
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0L);
5353
List<InternalAggregations> aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms)));
5454
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction());
5555
assertEquals(1, reducedAggs.aggregations.size());
5656
}
5757

5858
public void testFinalReduceTopLevelPipelineAggs() {
5959
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
60-
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
60+
10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0L);
6161

6262
InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(terms));
6363
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs),

0 commit comments

Comments
 (0)