Skip to content

Commit ce7195d

Browse files
authored
Terms aggregation should remap global ordinal buckets when a sub-aggregator is used to sort the terms (#24941)
`terms` aggregations at the root level use the `global_ordinals` execution hint by default. When all sub-aggregators can be run in `breadth_first` mode, the collected buckets for these sub-aggs are dense (remapped after the initial pruning). But if a sub-aggregator is not deferrable and needs to collect all buckets before pruning, we don't remap global ords and the aggregator needs to deal with sparse buckets. Most (if not all) aggregators expect dense buckets and use this information to allocate memory. This change forces the remap of the global ordinals, but only when there is at least one sub-aggregator that cannot be deferred. Relates #24788
1 parent 2a6e686 commit ce7195d

File tree

8 files changed

+354
-286
lines changed

8 files changed

+354
-286
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java

Lines changed: 22 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -53,22 +53,28 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
5353
protected final SignificantTermsAggregatorFactory termsAggFactory;
5454
private final SignificanceHeuristic significanceHeuristic;
5555

56-
public GlobalOrdinalsSignificantTermsAggregator(String name, AggregatorFactories factories,
57-
ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, DocValueFormat format,
58-
BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude,
59-
SearchContext context, Aggregator parent,
60-
SignificanceHeuristic significanceHeuristic, SignificantTermsAggregatorFactory termsAggFactory,
61-
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
62-
56+
public GlobalOrdinalsSignificantTermsAggregator(String name,
57+
AggregatorFactories factories,
58+
ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
59+
DocValueFormat format,
60+
BucketCountThresholds bucketCountThresholds,
61+
IncludeExclude.OrdinalsFilter includeExclude,
62+
SearchContext context,
63+
Aggregator parent,
64+
boolean forceRemapGlobalOrds,
65+
SignificanceHeuristic significanceHeuristic,
66+
SignificantTermsAggregatorFactory termsAggFactory,
67+
List<PipelineAggregator> pipelineAggregators,
68+
Map<String, Object> metaData) throws IOException {
6369
super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent,
64-
SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
70+
forceRemapGlobalOrds, SubAggCollectionMode.DEPTH_FIRST, false, pipelineAggregators, metaData);
6571
this.significanceHeuristic = significanceHeuristic;
6672
this.termsAggFactory = termsAggFactory;
73+
this.numCollectedDocs = 0;
6774
}
6875

6976
@Override
70-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
71-
final LeafBucketCollector sub) throws IOException {
77+
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
7278
return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
7379
@Override
7480
public void collect(int doc, long bucket) throws IOException {
@@ -78,18 +84,17 @@ public void collect(int doc, long bucket) throws IOException {
7884
};
7985
}
8086

81-
8287
@Override
8388
public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
8489
assert owningBucketOrdinal == 0;
85-
if (globalOrds == null) { // no context in this reader
90+
if (valueCount == 0) { // no context in this reader
8691
return buildEmptyAggregation();
8792
}
8893

8994
final int size;
9095
if (bucketCountThresholds.getMinDocCount() == 0) {
9196
// if minDocCount == 0 then we can end up with more buckets than maxBucketOrd() returns
92-
size = (int) Math.min(globalOrds.getValueCount(), bucketCountThresholds.getShardSize());
97+
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
9398
} else {
9499
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
95100
}
@@ -98,7 +103,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
98103

99104
BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
100105
SignificantStringTerms.Bucket spare = null;
101-
for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) {
106+
for (long globalTermOrd = 0; globalTermOrd < valueCount; ++globalTermOrd) {
102107
if (includeExclude != null && !acceptedGlobalOrdinals.get(globalTermOrd)) {
103108
continue;
104109
}
@@ -115,7 +120,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
115120
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
116121
}
117122
spare.bucketOrd = bucketOrd;
118-
copy(globalOrds.lookupOrd(globalTermOrd), spare.termBytes);
123+
copy(lookupGlobalOrd.apply(globalTermOrd), spare.termBytes);
119124
spare.subsetDf = bucketDocCount;
120125
spare.subsetSize = subsetSize;
121126
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
@@ -148,63 +153,13 @@ public SignificantStringTerms buildEmptyAggregation() {
148153
IndexReader topReader = searcher.getIndexReader();
149154
int supersetSize = topReader.numDocs();
150155
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
151-
pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList());
156+
pipelineAggregators(), metaData(), format, numCollectedDocs, supersetSize, significanceHeuristic, emptyList());
152157
}
153158

154159
@Override
155160
protected void doClose() {
161+
super.doClose();
156162
Releasables.close(termsAggFactory);
157163
}
158-
159-
public static class WithHash extends GlobalOrdinalsSignificantTermsAggregator {
160-
161-
private final LongHash bucketOrds;
162-
163-
public WithHash(String name, AggregatorFactories factories, ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
164-
DocValueFormat format, BucketCountThresholds bucketCountThresholds, IncludeExclude.OrdinalsFilter includeExclude,
165-
SearchContext context, Aggregator parent, SignificanceHeuristic significanceHeuristic,
166-
SignificantTermsAggregatorFactory termsAggFactory, List<PipelineAggregator> pipelineAggregators,
167-
Map<String, Object> metaData) throws IOException {
168-
super(name, factories, valuesSource, format, bucketCountThresholds, includeExclude, context, parent, significanceHeuristic,
169-
termsAggFactory, pipelineAggregators, metaData);
170-
bucketOrds = new LongHash(1, context.bigArrays());
171-
}
172-
173-
@Override
174-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
175-
final LeafBucketCollector sub) throws IOException {
176-
return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
177-
@Override
178-
public void collect(int doc, long bucket) throws IOException {
179-
assert bucket == 0;
180-
numCollectedDocs++;
181-
if (globalOrds.advanceExact(doc)) {
182-
for (long globalOrd = globalOrds.nextOrd();
183-
globalOrd != SortedSetDocValues.NO_MORE_ORDS;
184-
globalOrd = globalOrds.nextOrd()) {
185-
long bucketOrd = bucketOrds.add(globalOrd);
186-
if (bucketOrd < 0) {
187-
bucketOrd = -1 - bucketOrd;
188-
collectExistingBucket(sub, doc, bucketOrd);
189-
} else {
190-
collectBucket(sub, doc, bucketOrd);
191-
}
192-
}
193-
}
194-
}
195-
};
196-
}
197-
198-
@Override
199-
protected long getBucketOrd(long termOrd) {
200-
return bucketOrds.find(termOrd);
201-
}
202-
203-
@Override
204-
protected void doClose() {
205-
Releasables.close(termsAggFactory, bucketOrds);
206-
}
207-
}
208-
209164
}
210165

core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java

Lines changed: 69 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,17 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
7070
private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
7171
private final SignificanceHeuristic significanceHeuristic;
7272

73-
public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig<ValuesSource> config, IncludeExclude includeExclude,
74-
String executionHint, QueryBuilder filterBuilder, TermsAggregator.BucketCountThresholds bucketCountThresholds,
75-
SignificanceHeuristic significanceHeuristic, SearchContext context, AggregatorFactory<?> parent,
76-
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
73+
public SignificantTermsAggregatorFactory(String name,
74+
ValuesSourceConfig<ValuesSource> config,
75+
IncludeExclude includeExclude,
76+
String executionHint,
77+
QueryBuilder filterBuilder,
78+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
79+
SignificanceHeuristic significanceHeuristic,
80+
SearchContext context,
81+
AggregatorFactory<?> parent,
82+
AggregatorFactories.Builder subFactoriesBuilder,
83+
Map<String, Object> metaData) throws IOException {
7784
super(name, config, context, parent, subFactoriesBuilder, metaData);
7885
this.includeExclude = includeExclude;
7986
this.executionHint = executionHint;
@@ -246,44 +253,71 @@ public enum ExecutionMode {
246253
MAP(new ParseField("map")) {
247254

248255
@Override
249-
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
250-
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
251-
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
252-
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
253-
Map<String, Object> metaData) throws IOException {
256+
Aggregator create(String name,
257+
AggregatorFactories factories,
258+
ValuesSource valuesSource,
259+
DocValueFormat format,
260+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
261+
IncludeExclude includeExclude,
262+
SearchContext aggregationContext,
263+
Aggregator parent,
264+
SignificanceHeuristic significanceHeuristic,
265+
SignificantTermsAggregatorFactory termsAggregatorFactory,
266+
List<PipelineAggregator> pipelineAggregators,
267+
Map<String, Object> metaData) throws IOException {
268+
254269
final IncludeExclude.StringFilter filter = includeExclude == null ? null : includeExclude.convertToStringFilter(format);
255270
return new SignificantStringTermsAggregator(name, factories, valuesSource, format, bucketCountThresholds, filter,
256271
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
272+
257273
}
258274

259275
},
260276
GLOBAL_ORDINALS(new ParseField("global_ordinals")) {
261277

262278
@Override
263-
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
264-
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
265-
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
266-
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
267-
Map<String, Object> metaData) throws IOException {
279+
Aggregator create(String name,
280+
AggregatorFactories factories,
281+
ValuesSource valuesSource,
282+
DocValueFormat format,
283+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
284+
IncludeExclude includeExclude,
285+
SearchContext aggregationContext,
286+
Aggregator parent,
287+
SignificanceHeuristic significanceHeuristic,
288+
SignificantTermsAggregatorFactory termsAggregatorFactory,
289+
List<PipelineAggregator> pipelineAggregators,
290+
Map<String, Object> metaData) throws IOException {
291+
268292
final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
269293
return new GlobalOrdinalsSignificantTermsAggregator(name, factories,
270294
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter,
271-
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
295+
aggregationContext, parent, false, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
296+
272297
}
273298

274299
},
275300
GLOBAL_ORDINALS_HASH(new ParseField("global_ordinals_hash")) {
276301

277302
@Override
278-
Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
279-
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
280-
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
281-
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
282-
Map<String, Object> metaData) throws IOException {
303+
Aggregator create(String name,
304+
AggregatorFactories factories,
305+
ValuesSource valuesSource,
306+
DocValueFormat format,
307+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
308+
IncludeExclude includeExclude,
309+
SearchContext aggregationContext,
310+
Aggregator parent,
311+
SignificanceHeuristic significanceHeuristic,
312+
SignificantTermsAggregatorFactory termsAggregatorFactory,
313+
List<PipelineAggregator> pipelineAggregators,
314+
Map<String, Object> metaData) throws IOException {
315+
283316
final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
284-
return new GlobalOrdinalsSignificantTermsAggregator.WithHash(name, factories,
285-
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter,
286-
aggregationContext, parent, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
317+
return new GlobalOrdinalsSignificantTermsAggregator(name, factories,
318+
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter, aggregationContext, parent,
319+
true, significanceHeuristic, termsAggregatorFactory, pipelineAggregators, metaData);
320+
287321
}
288322
};
289323

@@ -302,11 +336,18 @@ public static ExecutionMode fromString(String value) {
302336
this.parseField = parseField;
303337
}
304338

305-
abstract Aggregator create(String name, AggregatorFactories factories, ValuesSource valuesSource, DocValueFormat format,
306-
TermsAggregator.BucketCountThresholds bucketCountThresholds, IncludeExclude includeExclude,
307-
SearchContext aggregationContext, Aggregator parent, SignificanceHeuristic significanceHeuristic,
308-
SignificantTermsAggregatorFactory termsAggregatorFactory, List<PipelineAggregator> pipelineAggregators,
309-
Map<String, Object> metaData) throws IOException;
339+
abstract Aggregator create(String name,
340+
AggregatorFactories factories,
341+
ValuesSource valuesSource,
342+
DocValueFormat format,
343+
TermsAggregator.BucketCountThresholds bucketCountThresholds,
344+
IncludeExclude includeExclude,
345+
SearchContext aggregationContext,
346+
Aggregator parent,
347+
SignificanceHeuristic significanceHeuristic,
348+
SignificantTermsAggregatorFactory termsAggregatorFactory,
349+
List<PipelineAggregator> pipelineAggregators,
350+
Map<String, Object> metaData) throws IOException;
310351

311352
@Override
312353
public String toString() {

0 commit comments

Comments
 (0)