Skip to content

Commit 9c34ff9

Browse files
authored
Decouple pipeline reductions from final agg reduction (elastic#45796)
Historically only two things happened in the final reduction: empty buckets were filled, and pipeline aggs were reduced (since it was the final reduction, this was safe). Usage of the final reduction is growing however. Auto-date-histo might need to perform many reductions on final-reduce to merge down buckets, CCS may need to side-step the final reduction if sending to a different cluster, etc Having pipelines generate their output in the final reduce was convenient, but is becoming increasingly difficult to manage as the rest of the agg framework advances. This commit decouples pipeline aggs from the final reduction by introducing a new "top level" reduce, which should be called at the beginning of the reduce cycle (e.g. from the SearchPhaseController). This will only reduce pipeline aggs on the final reduce after the non-pipeline agg tree has been fully reduced. By separating pipeline reduction into their own set of methods, aggregations are free to use the final reduction for whatever purpose without worrying about generating pipeline results which are non-reducible
1 parent 1a349af commit 9c34ff9

File tree

72 files changed

+223
-122
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+223
-122
lines changed

modules/aggs-matrix-stats/src/main/java/org/elasticsearch/search/aggregations/matrix/stats/InternalMatrixStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public Object getProperty(List<String> path) {
233233
}
234234

235235
@Override
236-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
236+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
237237
// merge stats across all shards
238238
List<InternalAggregation> aggs = new ArrayList<>(aggregations);
239239
aggs.removeIf(p -> ((InternalMatrixStats)p).stats == null);

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
487487
}
488488
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
489489
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
490-
InternalAggregations.reduce(aggregationsList, reduceContext);
490+
InternalAggregations.topLevelReduce(aggregationsList, reduceContext);
491491
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
492492
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
493493
reducedCompletionSuggestions);
@@ -625,7 +625,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
625625
if (index == bufferSize) {
626626
if (hasAggs) {
627627
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
628-
InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
628+
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
629629
Arrays.fill(aggsBuffer, null);
630630
aggsBuffer[0] = reducedAggs;
631631
}

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
196196
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats);
197197
setSuggestShardIndex(shards, groupedSuggestions);
198198
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
199-
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
199+
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContextFunction.apply(true));
200200
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
201201
SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
202202
//make failures ordering consistent between ordinary search and CCS by looking at the shard they come from

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,23 +126,25 @@ public String getName() {
126126
return name;
127127
}
128128

129+
/**
130+
* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
131+
* be called after all aggregations have been fully reduced
132+
*/
133+
public InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
134+
assert reduceContext.isFinalReduce();
135+
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
136+
reducedAggs = pipelineAggregator.reduce(reducedAggs, reduceContext);
137+
}
138+
return reducedAggs;
139+
}
140+
129141
/**
130142
* Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be the all given
131143
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
132144
* try reusing an existing instance (typically the first in the given list) to save on redundant object
133145
* construction.
134146
*/
135-
public final InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
136-
InternalAggregation aggResult = doReduce(aggregations, reduceContext);
137-
if (reduceContext.isFinalReduce()) {
138-
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
139-
aggResult = pipelineAggregator.reduce(aggResult, reduceContext);
140-
}
141-
}
142-
return aggResult;
143-
}
144-
145-
public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
147+
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
146148

147149
/**
148150
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.List;
3434
import java.util.Map;
3535
import java.util.Objects;
36+
import java.util.stream.Collectors;
3637

3738
/**
3839
* An internal implementation of {@link Aggregations}.
@@ -91,10 +92,47 @@ public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
9192
return topLevelPipelineAggregators;
9293
}
9394

95+
@SuppressWarnings("unchecked")
96+
private List<InternalAggregation> getInternalAggregations() {
97+
return (List<InternalAggregation>) aggregations;
98+
}
99+
100+
/**
101+
* Begin the reduction process. This should be the entry point for the "first" reduction, e.g. called by
102+
* SearchPhaseController or anywhere else that wants to initiate a reduction. It _should not_ be called
103+
* as an intermediate reduction step (e.g. in the middle of an aggregation tree).
104+
*
105+
* This method first reduces the aggregations, and if it is the final reduce, then reduce the pipeline
106+
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
107+
*/
108+
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
109+
InternalAggregations reduced = reduce(aggregationsList, context);
110+
if (reduced == null) {
111+
return null;
112+
}
113+
114+
if (context.isFinalReduce()) {
115+
List<InternalAggregation> reducedInternalAggs = reduced.getInternalAggregations();
116+
reducedInternalAggs = reducedInternalAggs.stream()
117+
.map(agg -> agg.reducePipelines(agg, context))
118+
.collect(Collectors.toList());
119+
120+
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
121+
for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
122+
InternalAggregation newAgg
123+
= pipelineAggregator.doReduce(new InternalAggregations(reducedInternalAggs), context);
124+
reducedInternalAggs.add(newAgg);
125+
}
126+
return new InternalAggregations(reducedInternalAggs);
127+
}
128+
return reduced;
129+
}
130+
94131
/**
95132
* Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first
96133
* {@link InternalAggregations} object found in the list.
97-
* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
134+
* Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
135+
* separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}
98136
*/
99137
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
100138
if (aggregationsList.isEmpty()) {
@@ -123,13 +161,6 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
123161
reducedAggregations.add(first.reduce(aggregations, context));
124162
}
125163

126-
if (context.isFinalReduce()) {
127-
for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
128-
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
129-
reducedAggregations.add(newAgg);
130-
}
131-
return new InternalAggregations(reducedAggregations);
132-
}
133164
return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
134165
}
135166
}

server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2727

2828
import java.io.IOException;
29+
import java.util.ArrayList;
2930
import java.util.List;
3031
import java.util.Map;
3132

@@ -73,7 +74,7 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
7374
protected abstract B reduceBucket(List<B> buckets, ReduceContext context);
7475

7576
@Override
76-
public abstract List<? extends InternalBucket> getBuckets();
77+
public abstract List<B> getBuckets();
7778

7879
@Override
7980
public Object getProperty(List<String> path) {
@@ -141,6 +142,30 @@ public static int countInnerBucket(Aggregation agg) {
141142
return size;
142143
}
143144

145+
/**
146+
* Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a multi-bucket
147+
* agg needs to first reduce the buckets (and their parent pipelines) before allowing sibling pipelines
148+
* to materialize
149+
*/
150+
@Override
151+
public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
152+
assert reduceContext.isFinalReduce();
153+
List<B> materializedBuckets = reducePipelineBuckets(reduceContext);
154+
return super.reducePipelines(create(materializedBuckets), reduceContext);
155+
}
156+
157+
private List<B> reducePipelineBuckets(ReduceContext reduceContext) {
158+
List<B> reducedBuckets = new ArrayList<>();
159+
for (B bucket : getBuckets()) {
160+
List<InternalAggregation> aggs = new ArrayList<>();
161+
for (Aggregation agg : bucket.getAggregations()) {
162+
aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext));
163+
}
164+
reducedBuckets.add(createBucket(new InternalAggregations(aggs), bucket));
165+
}
166+
return reducedBuckets;
167+
}
168+
144169
public abstract static class InternalBucket implements Bucket, Writeable {
145170

146171
public Object getProperty(String containingAggName, List<String> path) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public InternalSingleBucketAggregation create(InternalAggregations subAggregatio
9797
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
9898

9999
@Override
100-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
100+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
101101
long docCount = 0L;
102102
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
103103
for (InternalAggregation aggregation : aggregations) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public InternalBucket getBucketByKey(String key) {
181181
}
182182

183183
@Override
184-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
184+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
185185
Map<String, List<InternalBucket>> bucketsMap = new HashMap<>();
186186
for (InternalAggregation aggregation : aggregations) {
187187
InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ int[] getReverseMuls() {
143143
}
144144

145145
@Override
146-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
146+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
147147
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
148148
for (InternalAggregation agg : aggregations) {
149149
InternalComposite sortedAgg = (InternalComposite) agg;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public InternalBucket getBucketByKey(String key) {
189189
}
190190

191191
@Override
192-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
192+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
193193
List<List<InternalBucket>> bucketsList = null;
194194
for (InternalAggregation aggregation : aggregations) {
195195
InternalFilters filters = (InternalFilters) aggregation;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public List<InternalGeoGridBucket> getBuckets() {
8181
}
8282

8383
@Override
84-
public InternalGeoGrid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
84+
public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
8585
LongObjectPagedHashMap<List<InternalGeoGridBucket>> buckets = null;
8686
for (InternalAggregation aggregation : aggregations) {
8787
InternalGeoGrid grid = (InternalGeoGrid) aggregation;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ static int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
498498
}
499499

500500
@Override
501-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
501+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
502502
BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);
503503

504504
if (reduceContext.isFinalReduce()) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
448448
}
449449

450450
@Override
451-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
451+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
452452
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
453453
if (reduceContext.isFinalReduce()) {
454454
if (minDocCount == 0) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
421421
}
422422

423423
@Override
424-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
424+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
425425
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
426426
if (reduceContext.isFinalReduce()) {
427427
if (minDocCount == 0) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
230230
}
231231

232232
@Override
233-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
233+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
234234
reduceContext.consumeBucketsAndMaybeBreak(buckets.size());
235235
long[] docCounts = new long[buckets.size()];
236236
InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][];

server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public B createBucket(InternalAggregations aggregations, B prototype) {
288288

289289
@SuppressWarnings("unchecked")
290290
@Override
291-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
291+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
292292
reduceContext.consumeBucketsAndMaybeBreak(ranges.size());
293293
List<B>[] rangeList = new List[ranges.size()];
294294
for (int i = 0; i < rangeList.length; ++i) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public String getWriteableName() {
4949
}
5050

5151
@Override
52-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
52+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
5353
return new UnmappedSampler(name, pipelineAggregators(), metaData);
5454
}
5555

server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/GlobalOrdinalsSignificantTermsAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws
126126
}
127127

128128
if (spare == null) {
129-
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
129+
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
130130
}
131131
spare.bucketOrd = bucketOrd;
132132
copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);

server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ public long getSubsetSize() {
105105
return subsetSize;
106106
}
107107

108+
// TODO we should refactor to remove this, since buckets should be immutable after they are generated.
109+
// This can lead to confusing bugs if the bucket is re-created (via createBucket() or similar) without
110+
// the score
108111
void updateScore(SignificanceHeuristic significanceHeuristic) {
109112
score = significanceHeuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize);
110113
}
@@ -191,7 +194,7 @@ protected final void doWriteTo(StreamOutput out) throws IOException {
191194
public abstract List<B> getBuckets();
192195

193196
@Override
194-
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
197+
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
195198
long globalSubsetSize = 0;
196199
long globalSupersetSize = 0;
197200
// Compute the overall result set size and the corpus size using the

server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,9 @@ static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
4242
long term;
4343

4444
Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
45-
DocValueFormat format) {
45+
DocValueFormat format, double score) {
4646
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
4747
this.term = term;
48-
}
49-
50-
Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
51-
double score) {
52-
this(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, null);
5348
this.score = score;
5449
}
5550

@@ -134,7 +129,7 @@ public SignificantLongTerms create(List<SignificantLongTerms.Bucket> buckets) {
134129
@Override
135130
public Bucket createBucket(InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) {
136131
return new Bucket(prototype.subsetDf, prototype.subsetSize, prototype.supersetDf, prototype.supersetSize, prototype.term,
137-
aggregations, prototype.format);
132+
aggregations, prototype.format, prototype.score);
138133
}
139134

140135
@Override
@@ -151,6 +146,6 @@ protected Bucket[] createBucketsArray(int size) {
151146
@Override
152147
Bucket createBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize,
153148
InternalAggregations aggregations, SignificantLongTerms.Bucket prototype) {
154-
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format);
149+
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, prototype.term, aggregations, format, prototype.score);
155150
}
156151
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTermsAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public SignificantLongTerms buildAggregation(long owningBucketOrdinal) throws IO
8888
continue;
8989
}
9090
if (spare == null) {
91-
spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format);
91+
spare = new SignificantLongTerms.Bucket(0, 0, 0, 0, 0, null, format, 0);
9292
}
9393
spare.term = bucketOrds.get(i);
9494
spare.subsetDf = docCount;

0 commit comments

Comments
 (0)