Skip to content

Commit 5648dfa

Browse files
committed
Terms aggregation should remap global ordinal buckets when a sub-aggregator is used to sort the terms (#24941)
`terms` aggregations at the root level use the `global_ordinals` execution hint by default. When all sub-aggregators can be run in `breadth_first` mode the collected buckets for these sub-aggs are dense (remapped after the initial pruning). But if a sub-aggregator is not deferrable and needs to collect all buckets before pruning we don't remap global ords and the aggregator needs to deal with sparse buckets. Most (if not all) aggregators expect dense buckets and uses this information to allocate memories. This change forces the remap of the global ordinals but only when there is at least one sub-aggregator that cannot be deferred. Relates #24788
1 parent a9df691 commit 5648dfa

File tree

8 files changed

+362
-277
lines changed

8 files changed

+362
-277
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java

Lines changed: 22 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.lucene.index.LeafReaderContext;
2323
import org.apache.lucene.util.BytesRef;
2424
import org.elasticsearch.common.lease.Releasables;
25-
import org.elasticsearch.common.util.LongHash;
2625
import org.elasticsearch.search.DocValueFormat;
2726
import org.elasticsearch.search.aggregations.Aggregator;
2827
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -52,22 +51,28 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
5251
protected final SignificantTermsAggregatorFactory termsAggFactory;
5352
private final SignificanceHeuristic significanceHeuristic;
5453

55-
public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories factories,
56-
ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, DocValueFormat format,
57-
BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude,
58-
SearchContext context, Aggregator parent,
59-
SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory termsAggFactory,
60-
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
61-
54+
public GlobalOrdinalsSignificantTermsAggregator(String name,
55+
AggregatorFactories factories,
56+
ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
57+
DocValueFormat format,
58+
BucketCountThresholds bucketCountThresholds,
59+
IncludeExclude.OrdinalsFilter includeExclude,
60+
SearchContext context,
61+
Aggregator parent,
62+
boolean forceRemapGlobalOrds,
63+
SignificanceHeuristic significanceHeuristic,
64+
SignificantTermsAggregatorFactory termsAggFactory,
65+
List<PipelineAggregator> pipelineAggregators,
66+
Map<String, Object> metaData) throws IOException {
6267
super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent,
63-
SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
68+
forceRemapGlobalOrds, SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
6469
this.significanceHeuristic = significanceHeuristic;
6570
this.termsAggFactory = termsAggFactory;
71+
this.numCollectedDocs = 0;
6672
}
6773

6874
@Override
69-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
70-
final LeafBucketCollector sub) throws IOException {
75+
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
7176
return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
7277
@Override
7378
public void collect(int doc, long bucket) throws IOException {
@@ -77,18 +82,17 @@ public void collect(int doc, long bucket) throws IOException {
7782
};
7883
}
7984

80-
8185
@Override
8286
public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
8387
assert owningBucketOrdinal == 0;
84-
if (globalOrds == null) { // no context in this reader
88+
if (valueCount == 0) { // no context in this reader
8589
return buildEmptyAggregation();
8690
}
8791

8892
final int size;
8993
if (bucketCountThresholds.getMinDocCount() == 0) {
9094
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
91-
size = (int) Math.min(globalOrds.getValueCount(), bucketCountThresholds.getShardSize());
95+
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
9296
} else {
9397
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
9498
}
@@ -97,7 +101,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
97101

98102
BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
99103
SignificantStringTerms.Bucket spare = null;
100-
for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) {
104+
for (long globalTermOrd = 0; globalTermOrd < valueCount; ++globalTermOrd) {
101105
if (includeExclude != null && !acceptedGlobalOrdinals.get(globalTermOrd)) {
102106
continue;
103107
}
@@ -114,7 +118,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
114118
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
115119
}
116120
spare.bucketOrd = bucketOrd;
117-
copy(globalOrds.lookupOrd(globalTermOrd), spare.termBytes);
121+
copy(lookupGlobalOrd.apply(globalTermOrd), spare.termBytes);
118122
spare.subsetDf = bucketDocCount;
119123
spare.subsetSize = subsetSize;
120124
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
@@ -147,62 +151,13 @@ public SignificantStringTerms buildEmptyAggregation() {
147151
IndexReader topReader = searcher.getIndexReader();
148152
int supersetSize = topReader.numDocs();
149153
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
150-
pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList());
154+
pipelineAggregators(), metaData(), format, numCollectedDocs, supersetSize, significanceHeuristic, emptyList());
151155
}
152156

153157
@Override
154158
protected void doClose() {
159+
super.doClose();
155160
Releasables.close(termsAggFactory);
156161
}
157-
158-
public static class WithHash extends GlobalOrdinalsSignificantTermsAggregator {
159-
160-
private final LongHash bucketOrds;
161-
162-
public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
163-
DocValueFormat format, BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude,
164-
SearchContext context, Aggregator parent, SignificanceHeuristic significanceHeuristic,
165-
SignificantTermsAggregatorFactory termsAggFactory, List<PipelineAggregator> pipelineAggregators,
166-
Map<String, Object> metaData) throws IOException {
167-
super(name, factories, valuesSource, format, bucketCountThresholds, includeExclude, context, parent, significanceHeuristic,
168-
termsAggFactory, pipelineAggregators, metaData);
169-
bucketOrds = new LongHash(1, context.bigArrays());
170-
}
171-
172-
@Override
173-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
174-
final LeafBucketCollector sub) throws IOException {
175-
return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
176-
@Override
177-
public void collect(int doc, long bucket) throws IOException {
178-
assert bucket == 0;
179-
numCollectedDocs++;
180-
globalOrds.setDocument(doc);
181-
final int numOrds = globalOrds.cardinality();
182-
for (int i = 0; i < numOrds; i++) {
183-
final long globalOrd = globalOrds.ordAt(i);
184-
long bucketOrd = bucketOrds.add(globalOrd);
185-
if (bucketOrd < 0) {
186-
bucketOrd = -1 - bucketOrd;
187-
collectExistingBucket(sub, doc, bucketOrd);
188-
} else {
189-
collectBucket(sub, doc, bucketOrd);
190-
}
191-
}
192-
}
193-
};
194-
}
195-
196-
@Override
197-
protected long getBucketOrd(long termOrd) {
198-
return bucketOrds.find(termOrd);
199-
}
200-
201-
@Override
202-
protected void doClose() {
203-
Releasables.close(termsAggFactory, bucketOrds);
204-
}
205-
}
206-
207162
}
208163

core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java

Lines changed: 69 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,17 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
7070
private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
7171
private final SignificanceHeuristic significanceHeuristic;
7272

73-
public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig<ValuesSource> config, IncludeExclude includeExclude,
74-
String executionHint, QueryBuilder filterBuilder, TermsAggregator.BucketCountThresholds bucketCountThresholds,
75-
SignificanceHeuristic significanceHeuristic, SearchContext context, AggregatorFactory<?> parent,
76-
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
73+
public SignificantTermsAggregatorFactory(String name,
74+
ValuesSourceConfig<ValuesSource> config,
75+
IncludeExclude includeExclude,
76+
String executionHint,
77+
QueryBuilder filterBuilder,
78+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
79+
SignificanceHeuristic significanceHeuristic,
80+
SearchContext context,
81+
AggregatorFactory<?> parent,
82+
AggregatorFactories.Builder subFactoriesBuilder,
83+
Map<String, Object> metaData) throws IOException {
7784
super(name, config, context, parent, subFactoriesBuilder, metaData);
7885
this.includeExclude = includeExclude;
7986
this.executionHint = executionHint;
@@ -246,44 +253,71 @@ public enum ExecutionMode {
246253
MAP(new ParseField("map")) {
247254

248255
@Override
249-
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
250-
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
251-
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
252-
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
253-
Map<String, Object> metaData) throws IOException {
256+
Aggregator create(String name,
257+
AggregatorFactories factories,
258+
ValuesSource valuesSource,
259+
DocValueFormat format,
260+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
261+
IncludeExclude includeExclude,
262+
SearchContext aggregationContext,
263+
Aggregator parent,
264+
SignificanceHeuristic significanceHeuristic,
265+
SignificantTermsAggregatorFactory termsAggregatorFactory,
266+
List<PipelineAggregator> pipelineAggregators,
267+
Map<String, Object> metaData) throws IOException {
268+
254269
final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
255270
return new SignificantStringTermsAggregator(name, factories, valuesSource, format, bucketCountThresholds, filter,
256271
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
272+
257273
}
258274

259275
},
260276
GLOBAL_ORDINALS(new ParseField("global_ordinals")) {
261277

262278
@Override
263-
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
264-
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
265-
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
266-
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
267-
Map<String, Object> metaData) throws IOException {
279+
Aggregator create(String name,
280+
AggregatorFactories factories,
281+
ValuesSource valuesSource,
282+
DocValueFormat format,
283+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
284+
IncludeExclude includeExclude,
285+
SearchContext aggregationContext,
286+
Aggregator parent,
287+
SignificanceHeuristic significanceHeuristic,
288+
SignificantTermsAggregatorFactory termsAggregatorFactory,
289+
List<PipelineAggregator> pipelineAggregators,
290+
Map<String, Object> metaData) throws IOException {
291+
268292
final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
269293
return new GlobalOrdinalsSignificantTermsAggregator(name, factories,
270294
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter,
271-
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
295+
aggregationContext, parent, false, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
296+
272297
}
273298

274299
},
275300
GLOBAL_ORDINALS_HASH(new ParseField("global_ordinals_hash")) {
276301

277302
@Override
278-
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
279-
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
280-
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
281-
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
282-
Map<String, Object> metaData) throws IOException {
303+
Aggregator create(String name,
304+
AggregatorFactories factories,
305+
ValuesSource valuesSource,
306+
DocValueFormat format,
307+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
308+
IncludeExclude includeExclude,
309+
SearchContext aggregationContext,
310+
Aggregator parent,
311+
SignificanceHeuristic significanceHeuristic,
312+
SignificantTermsAggregatorFactory termsAggregatorFactory,
313+
List<PipelineAggregator> pipelineAggregators,
314+
Map<String, Object> metaData) throws IOException {
315+
283316
final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
284-
return new GlobalOrdinalsSignificantTermsAggregator.WithHash(name, factories,
285-
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter,
286-
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
317+
return new GlobalOrdinalsSignificantTermsAggregator(name, factories,
318+
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter, aggregationContext, parent,
319+
true, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
320+
287321
}
288322
};
289323

@@ -302,11 +336,18 @@ public static ExecutionMode fromString(String value) {
302336
this.parseField = parseField;
303337
}
304338

305-
abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
306-
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
307-
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
308-
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
309-
Map<String, Object> metaData) throws IOException;
339+
abstract Aggregator create(String name,
340+
AggregatorFactories factories,
341+
ValuesSource valuesSource,
342+
DocValueFormat format,
343+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
344+
IncludeExclude includeExclude,
345+
SearchContext aggregationContext,
346+
Aggregator parent,
347+
SignificanceHeuristic significanceHeuristic,
348+
SignificantTermsAggregatorFactory termsAggregatorFactory,
349+
List<PipelineAggregator> pipelineAggregators,
350+
Map<String, Object> metaData) throws IOException;
310351

311352
@Override
312353
public String toString() {

0 commit comments

Comments
 (0)