
Commit 8e6d478

Stop terms agg from losing buckets (#70493)
When the `terms` agg is at the top level it can run as a `filters` agg instead, because that is typically faster. This was added in #68871, and we mistakenly made it so that a bucket without any hits could take up a slot on the way back to the coordinating node. You could trigger this by having a fairly precise `size` on the terms agg and a top-level filter. This fixes the issue by properly mimicking the regular terms aggregator in the "as filters" version: only send back buckets without any matching documents if `min_doc_count` is 0.

Closes #70449
1 parent 7807811 commit 8e6d478
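
The failure mode is easiest to see in miniature. Below is a small, self-contained Java sketch of the slot-stealing behavior described above; it is not Elasticsearch code and all names are illustrative. A shard tracks one bucket per term, the top-level filter leaves one bucket empty, and with a precise size and key ordering the empty bucket displaces a real one unless it is pruned first:

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class LostBucketSketch {
    record Bucket(String key, long docCount) {}

    // Keep the first `size` buckets in key order, dropping any bucket below
    // minDocCount first. This mirrors the rule the fix applies on each shard.
    static List<Bucket> topByKey(List<Bucket> shardBuckets, int size, long minDocCount) {
        return shardBuckets.stream()
            .filter(b -> b.docCount() >= minDocCount)
            .sorted(Comparator.comparing(Bucket::key))
            .limit(size)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The shard has seen terms a, b, and c, but the top-level query only
        // matches documents for b and c, so the "a" bucket has zero hits.
        List<Bucket> shard = List.of(new Bucket("a", 0), new Bucket("b", 2), new Bucket("c", 3));

        // Before the fix: the empty "a" bucket takes a slot and "c" is lost.
        System.out.println(topByKey(shard, 2, 0)); // buckets a (0 docs) and b (2 docs)

        // After the fix: empty buckets are pruned when min_doc_count > 0.
        System.out.println(topByKey(shard, 2, 1)); // buckets b (2 docs) and c (3 docs)
    }
}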

File tree

4 files changed: +119 −2 lines changed


rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/20_terms.yml

Lines changed: 47 additions & 0 deletions
@@ -1281,3 +1281,50 @@ setup:
   - match: { aggregations.str_terms.buckets.0.key: cow }
   - match: { aggregations.str_terms.buckets.0.doc_count: 1 }
   - match: { aggregations.str_terms.buckets.0.filter.max_number.value: 7.0 }
+
+---
+precise size:
+  - skip:
+      version: " - 7.99.99"
+      reason: "Fixed in 8.0.0, to be backported to 7.13. Once backported this should *always* pass without a need for a skip"
+
+  - do:
+      bulk:
+        index: test_1
+        refresh: true
+        body: |
+          { "index": {} }
+          { "str": "a" }
+          { "index": {} }
+          { "str": "b" }
+          { "index": {} }
+          { "str": "c" }
+          { "index": {} }
+          { "str": "b" }
+          { "index": {} }
+          { "str": "c" }
+          { "index": {} }
+          { "str": "c" }
+
+  - do:
+      search:
+        index: test_1
+        body:
+          size: 0
+          query:
+            terms:
+              str:
+                - b
+                - c
+          aggs:
+            str_terms:
+              terms:
+                size: 2
+                field: str
+                order:
+                  - _key : asc
+  - length: { aggregations.str_terms.buckets: 2 }
+  - match: { aggregations.str_terms.buckets.0.key: b }
+  - match: { aggregations.str_terms.buckets.0.doc_count: 2 }
+  - match: { aggregations.str_terms.buckets.1.key: c }
+  - match: { aggregations.str_terms.buckets.1.doc_count: 3 }

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java

Lines changed: 17 additions & 2 deletions
@@ -154,6 +154,21 @@ protected InternalAggregation adapt(InternalAggregation delegateResult) throws I
         List<StringTerms.Bucket> buckets;
         long otherDocsCount = 0;
         BucketOrder reduceOrder = isKeyOrder(order) ? order : InternalOrder.key(true);
+        /*
+         * We default to a shardMinDocCount of 0 which means we'd keep all
+         * hits, even those that don't have live documents or those that
+         * don't match any documents in the top level query. This is correct
+         * if the minDocCount is also 0, but if it is larger than 0 then we
+         * don't need to send those buckets back to the coordinating node.
+         * GlobalOrdinalsStringTermsAggregator doesn't collect those
+         * buckets either. It's a good thing, too, because if you take them
+         * into account when you sort by, say, key, you might throw away
+         * buckets with actual docs in them.
+         */
+        long minDocCount = bucketCountThresholds.getShardMinDocCount();
+        if (minDocCount == 0 && bucketCountThresholds.getMinDocCount() > 0) {
+            minDocCount = 1;
+        }
         if (filters.getBuckets().size() > bucketCountThresholds.getShardSize()) {
             PriorityQueue<OrdBucket> queue = new PriorityQueue<OrdBucket>(bucketCountThresholds.getShardSize()) {
                 private final Comparator<Bucket> comparator = order.comparator();
@@ -165,7 +180,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) {
             };
             OrdBucket spare = null;
             for (InternalFilters.InternalBucket b : filters.getBuckets()) {
-                if (b.getDocCount() < bucketCountThresholds.getShardMinDocCount()) {
+                if (b.getDocCount() < minDocCount) {
                     continue;
                 }
                 if (spare == null) {
@@ -203,7 +218,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) {
         } else {
            buckets = new ArrayList<>(filters.getBuckets().size());
            for (InternalFilters.InternalBucket b : filters.getBuckets()) {
-                if (b.getDocCount() < bucketCountThresholds.getShardMinDocCount()) {
+                if (b.getDocCount() < minDocCount) {
                     continue;
                 }
                 buckets.add(buildBucket(b));
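
The core of the change is the effective per-shard threshold computed at the top of that hunk. Here is a restatement of just that rule, pulled out as a standalone helper for readability; the method name is mine, not from the codebase:

/**
 * shard_min_doc_count defaults to 0, which would keep buckets that match no
 * documents. Those are only worth shipping to the coordinating node when
 * min_doc_count is also 0, so raise the shard-level floor to 1 otherwise.
 */
static long effectiveShardMinDocCount(long shardMinDocCount, long minDocCount) {
    if (shardMinDocCount == 0 && minDocCount > 0) {
        return 1;
    }
    return shardMinDocCount;
}

So effectiveShardMinDocCount(0, 1) is 1 and empty buckets are skipped, while effectiveShardMinDocCount(0, 0) stays 0 and a min_doc_count: 0 request still sees its empty buckets.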

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java

Lines changed: 11 additions & 0 deletions
@@ -94,6 +94,13 @@ public void ensureValidity() {
             }
         }

+        /**
+         * The minimum number of documents a bucket must have in order to
+         * be returned from a shard.
+         * <p>
+         * Important: The default for this is 0, but we should only return
+         * 0 document buckets if {@link #getMinDocCount()} is *also* 0.
+         */
         public long getShardMinDocCount() {
             return shardMinDocCount;
         }
@@ -102,6 +109,10 @@ public void setShardMinDocCount(long shardMinDocCount) {
             this.shardMinDocCount = shardMinDocCount;
         }

+        /**
+         * The minimum number of documents a bucket must have in order to
+         * survive the final reduction.
+         */
         public long getMinDocCount() {
             return minDocCount;
         }
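
The two javadoc comments draw a distinction worth spelling out: shardMinDocCount filters buckets on each shard before they are sent anywhere, while minDocCount filters the merged totals during the final reduction. A rough sketch of how the two thresholds divide the work, with illustrative names rather than the actual Elasticsearch reduction code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Merge per-shard term counts: shardMinDocCount gates what each shard returns,
// minDocCount gates what survives after the coordinating node has summed the
// per-shard responses.
static Map<String, Long> reduceTerms(List<Map<String, Long>> perShardCounts,
                                     long shardMinDocCount, long minDocCount) {
    Map<String, Long> reduced = new HashMap<>();
    for (Map<String, Long> shard : perShardCounts) {
        shard.forEach((term, count) -> {
            if (count >= shardMinDocCount) { // applied on the shard
                reduced.merge(term, count, Long::sum);
            }
        });
    }
    reduced.values().removeIf(total -> total < minDocCount); // final reduction
    return reduced;
}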

server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java

Lines changed: 44 additions & 0 deletions
@@ -25,6 +25,8 @@
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermInSetQuery;
 import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
@@ -1687,6 +1689,48 @@ public void testAsSubAgg() throws IOException {
         }, dft, kft);
     }

+    public void testWithFilterAndPreciseSize() throws IOException {
+        KeywordFieldType kft = new KeywordFieldType("k", true, true, Collections.emptyMap());
+        CheckedConsumer<RandomIndexWriter, IOException> buildIndex = iw -> {
+            iw.addDocument(
+                List.of(
+                    new Field("k", new BytesRef("a"), KeywordFieldMapper.Defaults.FIELD_TYPE),
+                    new SortedSetDocValuesField("k", new BytesRef("a"))
+                )
+            );
+            iw.addDocument(
+                List.of(
+                    new Field("k", new BytesRef("b"), KeywordFieldMapper.Defaults.FIELD_TYPE),
+                    new SortedSetDocValuesField("k", new BytesRef("b"))
+                )
+            );
+            iw.addDocument(
+                List.of(
+                    new Field("k", new BytesRef("c"), KeywordFieldMapper.Defaults.FIELD_TYPE),
+                    new SortedSetDocValuesField("k", new BytesRef("c"))
+                )
+            );
+        };
+        TermsAggregationBuilder builder = new TermsAggregationBuilder("k").field("k");
+        /*
+         * There was a bug where we would accidentally send buckets with 0
+         * docs in them back to the coordinating node which would take up a
+         * slot that a bucket with docs in it deserves. Combination of
+         * ordering by bucket, the precise size, and the top level query
+         * would trigger that bug.
+         */
+        builder.size(2).order(BucketOrder.key(true));
+        Query topLevel = new TermInSetQuery("k", new BytesRef[] {new BytesRef("b"), new BytesRef("c")});
+        testCase(builder, topLevel, buildIndex, (StringTerms terms) -> {
+            assertThat(terms.getBuckets().stream().map(StringTerms.Bucket::getKey).collect(toList()), equalTo(List.of("b", "c")));
+        }, kft);
+        withAggregator(builder, topLevel, buildIndex, (searcher, terms) -> {
+            Map<String, Object> info = new HashMap<>();
+            terms.collectDebugInfo(info::put);
+            assertThat(info, hasEntry("delegate", "FiltersAggregator.FilterByFilter"));
+        }, kft);
+    }
+
     private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();

     private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
         List<Document> documents = new ArrayList<>();
