Skip to content

Commit 0a31d51

Browse files
authored
Refactor: Aggs use NOOP leaf collector (backport of #70320) (#70419)
Before this commit, if an aggregator didn't have anything to do in `AggregatorBase#getLeafCollector` it was obligated to throw `CollectionTerminatedException` if there wasn't a parent aggregator, otherwise it was obligated to return `LeafBucketCollector.NOOP`. This seems like something aggregators shouldn't have to do. So this commit changes `getLeafCollector` so aggregators are obligated to return `LeafBucketCollector.NOOP` if they have no work to do. The aggregation framework will throw the exception if its appropriate. Otherwise it'll use the `NOOP` collector. If they have work to do the `LeafBucketCollector`s that they do return may still throw `CollectionTerminatedException` to signal that they are done with the leaf.
1 parent 90a245d commit 0a31d51

File tree

20 files changed

+161
-95
lines changed

20 files changed

+161
-95
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.action.search;
1010

1111
import org.apache.lucene.index.LeafReaderContext;
12-
import org.apache.lucene.search.CollectionTerminatedException;
1312
import org.apache.lucene.search.ScoreMode;
1413
import org.elasticsearch.ExceptionsHelper;
1514
import org.elasticsearch.action.ActionListener;
@@ -578,7 +577,7 @@ public void close() {}
578577

579578
@Override
580579
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
581-
throw new CollectionTerminatedException();
580+
return LeafBucketCollector.NO_OP_COLLECTOR;
582581
}
583582

584583
@Override

server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void preProcess(SearchContext context) {
4444
}
4545
context.aggregations().aggregators(aggregators);
4646
if (collectors.isEmpty() == false) {
47-
Collector collector = MultiBucketCollector.wrap(collectors);
47+
Collector collector = MultiBucketCollector.wrap(true, collectors);
4848
((BucketCollector)collector).preCollection();
4949
if (context.getProfilers() != null) {
5050
collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
@@ -80,7 +80,7 @@ public void execute(SearchContext context) {
8080

8181
// optimize the global collector based execution
8282
if (globals.isEmpty() == false) {
83-
BucketCollector globalsCollector = MultiBucketCollector.wrap(globals);
83+
BucketCollector globalsCollector = MultiBucketCollector.wrap(false, globals);
8484
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());
8585

8686
try {

server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,12 @@ public Map<String, Object> metadata() {
156156
/**
157157
* Get a {@link LeafBucketCollector} for the given ctx, which should
158158
* delegate to the given collector.
159+
* <p>
160+
* {@linkplain Aggregator}s that perform collection independent of the main
161+
* search should collect the provided leaf in their implementation of this
162+
* method and return {@link LeafBucketCollector#NO_OP_COLLECTOR} to signal
163+
* that they don't need to be collected with the main search. We'll remove
164+
* them from the list of collectors.
159165
*/
160166
protected abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException;
161167

@@ -181,8 +187,7 @@ protected void doPreCollection() throws IOException {
181187

182188
@Override
183189
public final void preCollection() throws IOException {
184-
List<BucketCollector> collectors = Arrays.asList(subAggregators);
185-
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
190+
collectableSubAggregators = MultiBucketCollector.wrap(false, Arrays.asList(subAggregators));
186191
doPreCollection();
187192
collectableSubAggregators.preCollection();
188193
}

server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketCollector.java

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,26 @@
2929
* the null ones.
3030
*/
3131
public class MultiBucketCollector extends BucketCollector {
32-
33-
/** See {@link #wrap(Iterable)}. */
34-
public static BucketCollector wrap(BucketCollector... collectors) {
35-
return wrap(Arrays.asList(collectors));
36-
}
37-
3832
/**
3933
* Wraps a list of {@link BucketCollector}s with a {@link MultiBucketCollector}. This
4034
* method works as follows:
4135
* <ul>
4236
* <li>Filters out the {@link BucketCollector#NO_OP_COLLECTOR}s collectors, so they are not used
4337
* during search time.
44-
* <li>If the input contains 1 real collector, it is returned.
38+
* <li>If the input contains 1 real collector we wrap it in a collector that takes
39+
* {@code terminateIfNoop} into account.
4540
* <li>Otherwise the method returns a {@link MultiBucketCollector} which wraps the
4641
* non-{@link BucketCollector#NO_OP_COLLECTOR} collectors.
4742
* </ul>
43+
* @param terminateIfNoop Pass true if {@link #getLeafCollector} should throw
44+
* {@link CollectionTerminatedException} if all leaf collectors are noop. Pass
45+
* false if terminating would break stuff. The top level collection for
46+
* aggregations should pass true here because we want to skip collections if
47+
* all aggregations return NOOP. But when aggregtors themselves call this
48+
* method they chould *generally* pass false here because they have collection
49+
* actions to perform even if their sub-aggregators are NOOPs.
4850
*/
49-
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectors) {
51+
public static BucketCollector wrap(boolean terminateIfNoop, Iterable<? extends BucketCollector> collectors) {
5052
// For the user's convenience, we allow NO_OP collectors to be passed.
5153
// However, to improve performance, these null collectors are found
5254
// and dropped from the array we save for actual collection time.
@@ -68,7 +70,43 @@ public static BucketCollector wrap(Iterable<? extends BucketCollector> collector
6870
break;
6971
}
7072
}
71-
return col;
73+
final BucketCollector collector = col;
74+
// Wrap the collector in one that takes terminateIfNoop into account.
75+
return new BucketCollector() {
76+
@Override
77+
public ScoreMode scoreMode() {
78+
return collector.scoreMode();
79+
}
80+
81+
@Override
82+
public void preCollection() throws IOException {
83+
collector.preCollection();
84+
}
85+
86+
@Override
87+
public void postCollection() throws IOException {
88+
collector.postCollection();
89+
}
90+
91+
@Override
92+
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
93+
try {
94+
LeafBucketCollector leafCollector = collector.getLeafCollector(ctx);
95+
if (false == leafCollector.isNoop()) {
96+
return leafCollector;
97+
}
98+
} catch (CollectionTerminatedException e) {
99+
throw new IllegalStateException(
100+
"getLeafCollector should return a noop collector instead of throw "
101+
+ CollectionTerminatedException.class.getSimpleName(), e
102+
);
103+
}
104+
if (terminateIfNoop) {
105+
throw new CollectionTerminatedException();
106+
}
107+
return LeafBucketCollector.NO_OP_COLLECTOR;
108+
}
109+
};
72110
} else {
73111
BucketCollector[] colls = new BucketCollector[n];
74112
n = 0;
@@ -77,14 +115,16 @@ public static BucketCollector wrap(Iterable<? extends BucketCollector> collector
77115
colls[n++] = c;
78116
}
79117
}
80-
return new MultiBucketCollector(colls);
118+
return new MultiBucketCollector(terminateIfNoop, colls);
81119
}
82120
}
83121

122+
private final boolean terminateIfNoop;
84123
private final boolean cacheScores;
85124
private final BucketCollector[] collectors;
86125

87-
private MultiBucketCollector(BucketCollector... collectors) {
126+
private MultiBucketCollector(boolean terminateIfNoop, BucketCollector... collectors) {
127+
this.terminateIfNoop = terminateIfNoop;
88128
this.collectors = collectors;
89129
int numNeedsScores = 0;
90130
for (Collector collector : collectors) {
@@ -129,26 +169,27 @@ public String toString() {
129169

130170
@Override
131171
public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
132-
final List<LeafBucketCollector> leafCollectors = new ArrayList<>();
172+
final List<LeafBucketCollector> leafCollectors = new ArrayList<>(collectors.length);
133173
for (BucketCollector collector : collectors) {
134-
final LeafBucketCollector leafCollector;
135174
try {
136-
leafCollector = collector.getLeafCollector(context);
175+
LeafBucketCollector leafCollector = collector.getLeafCollector(context);
176+
if (false == leafCollector.isNoop()) {
177+
leafCollectors.add(leafCollector);
178+
}
137179
} catch (CollectionTerminatedException e) {
138-
// this leaf collector does not need this segment
139-
continue;
180+
throw new IllegalStateException(
181+
"getLeafCollector should return a noop collector instead of throw "
182+
+ CollectionTerminatedException.class.getSimpleName(),
183+
e
184+
);
140185
}
141-
leafCollectors.add(leafCollector);
142186
}
143187
switch (leafCollectors.size()) {
144188
case 0:
145-
// TODO it's probably safer to return noop and let the caller throw if it wants to
146-
/*
147-
* See MinAggregator which only throws if it has a parent.
148-
* That is because it doesn't want there to ever drop
149-
* to this case and throw, thus skipping calculating the parent.
150-
*/
151-
throw new CollectionTerminatedException();
189+
if (terminateIfNoop) {
190+
throw new CollectionTerminatedException();
191+
}
192+
return LeafBucketCollector.NO_OP_COLLECTOR;
152193
case 1:
153194
return leafCollectors.get(0);
154195
default:

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public ScoreMode scoreMode() {
8585
/** Set the deferred collectors. */
8686
@Override
8787
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
88-
this.collector = MultiBucketCollector.wrap(deferredCollectors);
88+
this.collector = MultiBucketCollector.wrap(true, deferredCollectors);
8989
}
9090

9191
/**

server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ protected void doPreCollection() throws IOException {
5757
deferringCollector.setDeferredCollector(deferredAggregations);
5858
collectors.add(deferringCollector);
5959
}
60-
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
60+
collectableSubAggregators = MultiBucketCollector.wrap(false, collectors);
6161
}
6262

6363
/**

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,7 @@ protected void doClose() {
118118

119119
@Override
120120
protected void doPreCollection() throws IOException {
121-
List<BucketCollector> collectors = Arrays.asList(subAggregators);
122-
deferredCollectors = MultiBucketCollector.wrap(collectors);
121+
deferredCollectors = MultiBucketCollector.wrap(false, Arrays.asList(subAggregators));
123122
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
124123
}
125124

@@ -388,7 +387,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
388387
// Throwing this exception will terminate the execution of the search for this root aggregation,
389388
// see {@link MultiCollector} for more details on how we handle early termination in aggregations.
390389
earlyTerminated = true;
391-
throw new CollectionTerminatedException();
390+
return LeafBucketCollector.NO_OP_COLLECTOR;
392391
} else {
393392
if (fillDocIdSet) {
394393
currentLeaf = ctx;
@@ -399,7 +398,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
399398
// that is after the index sort prefix using the rawAfterKey and we start collecting
400399
// document from there.
401400
processLeafFromQuery(ctx, indexSortPrefix);
402-
throw new CollectionTerminatedException();
401+
return LeafBucketCollector.NO_OP_COLLECTOR;
403402
} else {
404403
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
405404
return new LeafBucketCollector() {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.search.aggregations.bucket.filter;
1010

1111
import org.apache.lucene.index.LeafReaderContext;
12-
import org.apache.lucene.search.CollectionTerminatedException;
1312
import org.apache.lucene.search.LeafCollector;
1413
import org.apache.lucene.search.Scorable;
1514
import org.apache.lucene.util.Bits;
@@ -394,8 +393,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
394393
} else {
395394
collectSubs(ctx, live, sub);
396395
}
397-
// Throwing this exception is how we communicate to the collection mechanism that we don't need the segment.
398-
throw new CollectionTerminatedException();
396+
return LeafBucketCollector.NO_OP_COLLECTOR;
399397
}
400398

401399
/**

server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public ScoreMode scoreMode() {
7373
/** Set the deferred collectors. */
7474
@Override
7575
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
76-
this.deferred = MultiBucketCollector.wrap(deferredCollectors);
76+
this.deferred = MultiBucketCollector.wrap(true, deferredCollectors);
7777
}
7878

7979
@Override

server/src/main/java/org/elasticsearch/search/aggregations/metrics/MaxAggregator.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.apache.lucene.index.LeafReader;
1111
import org.apache.lucene.index.LeafReaderContext;
1212
import org.apache.lucene.index.PointValues;
13-
import org.apache.lucene.search.CollectionTerminatedException;
1413
import org.apache.lucene.search.ScoreMode;
1514
import org.apache.lucene.util.Bits;
1615
import org.apache.lucene.util.FutureArrays;
@@ -71,12 +70,7 @@ public ScoreMode scoreMode() {
7170
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
7271
final LeafBucketCollector sub) throws IOException {
7372
if (valuesSource == null) {
74-
if (parent != null) {
75-
return LeafBucketCollector.NO_OP_COLLECTOR;
76-
} else {
77-
// we have no parent and the values source is empty so we can skip collecting hits.
78-
throw new CollectionTerminatedException();
79-
}
73+
return LeafBucketCollector.NO_OP_COLLECTOR;
8074
}
8175
if (pointConverter != null) {
8276
Number segMax = findLeafMaxValue(ctx.reader(), pointField, pointConverter);
@@ -90,7 +84,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
9084
max = Math.max(max, segMax.doubleValue());
9185
maxes.set(0, max);
9286
// the maximum value has been extracted, we don't need to collect hits on this segment.
93-
throw new CollectionTerminatedException();
87+
return LeafBucketCollector.NO_OP_COLLECTOR;
9488
}
9589
}
9690
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);

server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,7 @@ public ScoreMode scoreMode() {
7272
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
7373
final LeafBucketCollector sub) throws IOException {
7474
if (valuesSource == null) {
75-
if (parent == null) {
76-
return LeafBucketCollector.NO_OP_COLLECTOR;
77-
} else {
78-
// we have no parent and the values source is empty so we can skip collecting hits.
79-
throw new CollectionTerminatedException();
80-
}
75+
return LeafBucketCollector.NO_OP_COLLECTOR;
8176
}
8277
if (pointConverter != null) {
8378
Number segMin = findLeafMinValue(ctx.reader(), pointField, pointConverter);
@@ -90,13 +85,12 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
9085
min = Math.min(min, segMin.doubleValue());
9186
mins.set(0, min);
9287
// the minimum value has been extracted, we don't need to collect hits on this segment.
93-
throw new CollectionTerminatedException();
88+
return LeafBucketCollector.NO_OP_COLLECTOR;
9489
}
9590
}
9691
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
9792
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);
9893
return new LeafBucketCollectorBase(sub, allValues) {
99-
10094
@Override
10195
public void collect(int doc, long bucket) throws IOException {
10296
if (bucket >= mins.size()) {

server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingLeafBucketCollector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,9 @@ public void setScorer(Scorable scorer) throws IOException {
3939
delegate.setScorer(scorer);
4040
}
4141

42+
@Override
43+
public boolean isNoop() {
44+
return delegate.isNoop();
45+
}
46+
4247
}

0 commit comments

Comments
 (0)