Skip to content

Commit b5e02b5

Browse files
authored
Stop terms agg from losing buckets (backport of #70493) (#70670)
When the `terms` agg is at the top level it can run as a `filters` agg instead because that is typically faster. This was added in #68871 and we mistakenly made it so that a bucket without any hits could take up a slot on the way back to the coordinating node. You could trigger this by having a fairly precise `size` on the terms agg and a top level filter. This fixes the issue by properly mimicking the regular terms aggregator in the "as filters" version: only send back buckets without any matching documents if the min_doc_count is 0. Closes #70449
1 parent f8f8cbf commit b5e02b5

File tree

4 files changed

+118
-2
lines changed

4 files changed

+118
-2
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,3 +1338,46 @@ setup:
13381338
- match: { aggregations.str_terms.buckets.0.key: cow }
13391339
- match: { aggregations.str_terms.buckets.0.doc_count: 1 }
13401340
- match: { aggregations.str_terms.buckets.0.filter.max_number.value: 7.0 }
1341+
1342+
---
1343+
precise size:
1344+
- do:
1345+
bulk:
1346+
index: test_1
1347+
refresh: true
1348+
body: |
1349+
{ "index": {} }
1350+
{ "str": "a" }
1351+
{ "index": {} }
1352+
{ "str": "b" }
1353+
{ "index": {} }
1354+
{ "str": "c" }
1355+
{ "index": {} }
1356+
{ "str": "b" }
1357+
{ "index": {} }
1358+
{ "str": "c" }
1359+
{ "index": {} }
1360+
{ "str": "c" }
1361+
1362+
- do:
1363+
search:
1364+
index: test_1
1365+
body:
1366+
size: 0
1367+
query:
1368+
terms:
1369+
str:
1370+
- b
1371+
- c
1372+
aggs:
1373+
str_terms:
1374+
terms:
1375+
size: 2
1376+
field: str
1377+
order:
1378+
- _key : asc
1379+
- length: { aggregations.str_terms.buckets: 2 }
1380+
- match: { aggregations.str_terms.buckets.0.key: b }
1381+
- match: { aggregations.str_terms.buckets.0.doc_count: 2 }
1382+
- match: { aggregations.str_terms.buckets.1.key: c }
1383+
- match: { aggregations.str_terms.buckets.1.doc_count: 3 }

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,21 @@ protected InternalAggregation adapt(InternalAggregation delegateResult) throws I
154154
List<StringTerms.Bucket> buckets;
155155
long otherDocsCount = 0;
156156
BucketOrder reduceOrder = isKeyOrder(order) ? order : InternalOrder.key(true);
157+
/*
158+
* We default to a shardMinDocCount of 0 which means we'd keep all
159+
* hits, even those that don't have live documents or those that
160+
* don't match any documents in the top level query. This is correct
161+
* if the minDocCount is also 0, but if it is larger than 0 then we
162+
* don't need to send those buckets back to the coordinating node.
163+
* GlobalOrdinalsStringTermsAggregator doesn't collect those
164+
* buckets either. It's a good thing, too, because if you take them
165+
* into account when you sort by, say, key, you might throw away
166+
* buckets with actual docs in them.
167+
*/
168+
long minDocCount = bucketCountThresholds.getShardMinDocCount();
169+
if (minDocCount == 0 && bucketCountThresholds.getMinDocCount() > 0) {
170+
minDocCount = 1;
171+
}
157172
if (filters.getBuckets().size() > bucketCountThresholds.getShardSize()) {
158173
PriorityQueue<OrdBucket> queue = new PriorityQueue<OrdBucket>(bucketCountThresholds.getShardSize()) {
159174
private final Comparator<Bucket> comparator = order.comparator();
@@ -165,7 +180,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) {
165180
};
166181
OrdBucket spare = null;
167182
for (InternalFilters.InternalBucket b : filters.getBuckets()) {
168-
if (b.getDocCount() < bucketCountThresholds.getShardMinDocCount()) {
183+
if (b.getDocCount() < minDocCount) {
169184
continue;
170185
}
171186
if (spare == null) {
@@ -203,7 +218,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) {
203218
} else {
204219
buckets = new ArrayList<>(filters.getBuckets().size());
205220
for (InternalFilters.InternalBucket b : filters.getBuckets()) {
206-
if (b.getDocCount() < bucketCountThresholds.getShardMinDocCount()) {
221+
if (b.getDocCount() < minDocCount) {
207222
continue;
208223
}
209224
buckets.add(buildBucket(b));

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ public void ensureValidity() {
9494
}
9595
}
9696

97+
/**
98+
* The minimum number of documents a bucket must have in order to
99+
* be returned from a shard.
100+
* <p>
101+
* Important: The default for this is 0, but we should only return
102+
* 0 document buckets if {@link #getMinDocCount()} is *also* 0.
103+
*/
97104
public long getShardMinDocCount() {
98105
return shardMinDocCount;
99106
}
@@ -102,6 +109,10 @@ public void setShardMinDocCount(long shardMinDocCount) {
102109
this.shardMinDocCount = shardMinDocCount;
103110
}
104111

112+
/**
113+
* The minimum number of documents a bucket must have in order to
114+
* survive the final reduction.
115+
*/
105116
public long getMinDocCount() {
106117
return minDocCount;
107118
}

server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.lucene.search.DocValuesFieldExistsQuery;
2626
import org.apache.lucene.search.IndexSearcher;
2727
import org.apache.lucene.search.MatchAllDocsQuery;
28+
import org.apache.lucene.search.Query;
29+
import org.apache.lucene.search.TermInSetQuery;
2830
import org.apache.lucene.search.TotalHits;
2931
import org.apache.lucene.store.Directory;
3032
import org.apache.lucene.util.BytesRef;
@@ -1699,6 +1701,51 @@ public void testAsSubAgg() throws IOException {
16991701
}, dft, kft);
17001702
}
17011703

1704+
public void testWithFilterAndPreciseSize() throws IOException {
1705+
KeywordFieldType kft = new KeywordFieldType("k", true, true, Collections.emptyMap());
1706+
CheckedConsumer<RandomIndexWriter, IOException> buildIndex = iw -> {
1707+
iw.addDocument(
1708+
org.elasticsearch.common.collect.List.of(
1709+
new Field("k", new BytesRef("a"), KeywordFieldMapper.Defaults.FIELD_TYPE),
1710+
new SortedSetDocValuesField("k", new BytesRef("a"))
1711+
)
1712+
);
1713+
iw.addDocument(
1714+
org.elasticsearch.common.collect.List.of(
1715+
new Field("k", new BytesRef("b"), KeywordFieldMapper.Defaults.FIELD_TYPE),
1716+
new SortedSetDocValuesField("k", new BytesRef("b"))
1717+
)
1718+
);
1719+
iw.addDocument(
1720+
org.elasticsearch.common.collect.List.of(
1721+
new Field("k", new BytesRef("c"), KeywordFieldMapper.Defaults.FIELD_TYPE),
1722+
new SortedSetDocValuesField("k", new BytesRef("c"))
1723+
)
1724+
);
1725+
};
1726+
TermsAggregationBuilder builder = new TermsAggregationBuilder("k").field("k");
1727+
/*
1728+
* There was a bug where we would accidentally send buckets with 0
1729+
* docs in them back to the coordinating node which would take up a
1730+
* slot that a bucket with docs in it deserves. Combination of
1731+
* ordering by bucket, the precise size, and the top level query
1732+
* would trigger that bug.
1733+
*/
1734+
builder.size(2).order(BucketOrder.key(true));
1735+
Query topLevel = new TermInSetQuery("k", new BytesRef[] {new BytesRef("b"), new BytesRef("c")});
1736+
testCase(builder, topLevel, buildIndex, (StringTerms terms) -> {
1737+
assertThat(
1738+
terms.getBuckets().stream().map(StringTerms.Bucket::getKey).collect(toList()),
1739+
equalTo(org.elasticsearch.common.collect.List.of("b", "c"))
1740+
);
1741+
}, kft);
1742+
withAggregator(builder, topLevel, buildIndex, (searcher, terms) -> {
1743+
Map<String, Object> info = new HashMap<>();
1744+
terms.collectDebugInfo(info::put);
1745+
assertThat(info, hasEntry("delegate", "FiltersAggregator.FilterByFilter"));
1746+
}, kft);
1747+
}
1748+
17021749
private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
17031750
private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
17041751
List<Document> documents = new ArrayList<>();

0 commit comments

Comments
 (0)