Skip to content

Commit fa8e76a

Browse files
authored
Improve reduction of terms aggregations (#61779) (#62028)
Today, the terms aggregation reduces multiple aggregations at once using a map to group same buckets together. This operation can be costly since it requires to lookup every bucket in a global map with no particular order. This commit changes how term buckets are sorted by shards and partial reduces in order to be able to reduce results using a merge-sort strategy. For bwc, results are merged with the legacy code if any of the aggregations use a different sort (if it was returned by a node in prior versions). Relates #51857
1 parent b07b75c commit fa8e76a

24 files changed

+418
-242
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations.bucket;
21+
22+
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
23+
24+
import java.util.Iterator;
25+
26+
public class IteratorAndCurrent<B extends InternalMultiBucketAggregation.InternalBucket> implements Iterator<B> {
27+
private final Iterator<B> iterator;
28+
private B current;
29+
30+
public IteratorAndCurrent(Iterator<B> iterator) {
31+
this.iterator = iterator;
32+
this.current = iterator.next();
33+
}
34+
35+
public Iterator<B> getIterator() {
36+
return iterator;
37+
}
38+
39+
public B current() {
40+
return current;
41+
}
42+
43+
@Override
44+
public boolean hasNext() {
45+
return iterator.hasNext();
46+
}
47+
48+
@Override
49+
public B next() {
50+
return current = iterator.next();
51+
}
52+
}
53+

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

+12-24
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.search.aggregations.InternalAggregations;
3030
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
3131
import org.elasticsearch.search.aggregations.KeyComparable;
32+
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
3233
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
3334
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
3435

@@ -38,7 +39,6 @@
3839
import java.util.ArrayList;
3940
import java.util.Arrays;
4041
import java.util.Collections;
41-
import java.util.Iterator;
4242
import java.util.List;
4343
import java.util.ListIterator;
4444
import java.util.Map;
@@ -270,17 +270,6 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
270270
return new Bucket(prototype.key, prototype.docCount, prototype.format, aggregations);
271271
}
272272

273-
private static class IteratorAndCurrent {
274-
275-
private final Iterator<Bucket> iterator;
276-
private Bucket current;
277-
278-
IteratorAndCurrent(Iterator<Bucket> iterator) {
279-
this.iterator = iterator;
280-
current = iterator.next();
281-
}
282-
}
283-
284273
/**
285274
* This method works almost exactly the same as
286275
* InternalDateHistogram#reduceBuckets(List, ReduceContext), the different
@@ -305,10 +294,10 @@ private BucketReduceResult reduceBuckets(List<InternalAggregation> aggregations,
305294
}
306295
Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max);
307296

308-
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
297+
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
309298
@Override
310-
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
311-
return a.current.key < b.current.key;
299+
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
300+
return a.current().key < b.current().key;
312301
}
313302
};
314303
for (InternalAggregation aggregation : aggregations) {
@@ -322,25 +311,24 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
322311
if (pq.size() > 0) {
323312
// list of buckets coming from different shards that have the same key
324313
List<Bucket> currentBuckets = new ArrayList<>();
325-
long key = reduceRounding.round(pq.top().current.key);
314+
long key = reduceRounding.round(pq.top().current().key);
326315

327316
do {
328-
final IteratorAndCurrent top = pq.top();
317+
final IteratorAndCurrent<Bucket> top = pq.top();
329318

330-
if (reduceRounding.round(top.current.key) != key) {
319+
if (reduceRounding.round(top.current().key) != key) {
331320
// the key changes, reduce what we already buffered and reset the buffer for current buckets
332321
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
333322
reducedBuckets.add(reduced);
334323
currentBuckets.clear();
335-
key = reduceRounding.round(top.current.key);
324+
key = reduceRounding.round(top.current().key);
336325
}
337326

338-
currentBuckets.add(top.current);
327+
currentBuckets.add(top.current());
339328

340-
if (top.iterator.hasNext()) {
341-
final Bucket next = top.iterator.next();
342-
assert next.key > top.current.key : "shards must return data sorted by key";
343-
top.current = next;
329+
if (top.hasNext()) {
330+
top.next();
331+
assert top.current().key > key: "shards must return data sorted by key";
344332
pq.updateTop();
345333
} else {
346334
pq.pop();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

+12-26
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@
3232
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
3333
import org.elasticsearch.search.aggregations.InternalOrder;
3434
import org.elasticsearch.search.aggregations.KeyComparable;
35+
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
3536
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
3637

3738
import java.io.IOException;
3839
import java.time.Instant;
3940
import java.time.ZoneOffset;
4041
import java.util.ArrayList;
4142
import java.util.Collections;
42-
import java.util.Iterator;
4343
import java.util.List;
4444
import java.util.ListIterator;
4545
import java.util.Map;
@@ -289,24 +289,11 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
289289
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
290290
}
291291

292-
private static class IteratorAndCurrent {
293-
294-
private final Iterator<Bucket> iterator;
295-
private Bucket current;
296-
297-
IteratorAndCurrent(Iterator<Bucket> iterator) {
298-
this.iterator = iterator;
299-
current = iterator.next();
300-
}
301-
302-
}
303-
304292
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
305-
306-
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
293+
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
307294
@Override
308-
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
309-
return a.current.key < b.current.key;
295+
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
296+
return a.current().key < b.current().key;
310297
}
311298
};
312299
for (InternalAggregation aggregation : aggregations) {
@@ -320,27 +307,26 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
320307
if (pq.size() > 0) {
321308
// list of buckets coming from different shards that have the same key
322309
List<Bucket> currentBuckets = new ArrayList<>();
323-
double key = pq.top().current.key;
310+
double key = pq.top().current().key;
324311

325312
do {
326-
final IteratorAndCurrent top = pq.top();
313+
final IteratorAndCurrent<Bucket> top = pq.top();
327314

328-
if (top.current.key != key) {
315+
if (top.current().key != key) {
329316
// the key changes, reduce what we already buffered and reset the buffer for current buckets
330317
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
331318
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
332319
reducedBuckets.add(reduced);
333320
}
334321
currentBuckets.clear();
335-
key = top.current.key;
322+
key = top.current().key;
336323
}
337324

338-
currentBuckets.add(top.current);
325+
currentBuckets.add(top.current());
339326

340-
if (top.iterator.hasNext()) {
341-
final Bucket next = top.iterator.next();
342-
assert next.key > top.current.key : "shards must return data sorted by key";
343-
top.current = next;
327+
if (top.hasNext()) {
328+
top.next();
329+
assert top.current().key > key : "shards must return data sorted by key";
344330
pq.updateTop();
345331
} else {
346332
pq.pop();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

+12-25
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
3232
import org.elasticsearch.search.aggregations.InternalOrder;
3333
import org.elasticsearch.search.aggregations.KeyComparable;
34+
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
3435
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
3536

3637
import java.io.IOException;
3738
import java.util.ArrayList;
3839
import java.util.Collections;
39-
import java.util.Iterator;
4040
import java.util.List;
4141
import java.util.ListIterator;
4242
import java.util.Map;
@@ -279,24 +279,12 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
279279
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
280280
}
281281

282-
private static class IteratorAndCurrent {
283-
284-
private final Iterator<Bucket> iterator;
285-
private Bucket current;
286-
287-
IteratorAndCurrent(Iterator<Bucket> iterator) {
288-
this.iterator = iterator;
289-
current = iterator.next();
290-
}
291-
292-
}
293-
294282
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
295283

296-
final PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
284+
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
297285
@Override
298-
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
299-
return Double.compare(a.current.key, b.current.key) < 0;
286+
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
287+
return Double.compare(a.current().key, b.current().key) < 0;
300288
}
301289
};
302290
for (InternalAggregation aggregation : aggregations) {
@@ -310,28 +298,27 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
310298
if (pq.size() > 0) {
311299
// list of buckets coming from different shards that have the same key
312300
List<Bucket> currentBuckets = new ArrayList<>();
313-
double key = pq.top().current.key;
301+
double key = pq.top().current().key;
314302

315303
do {
316-
final IteratorAndCurrent top = pq.top();
304+
final IteratorAndCurrent<Bucket> top = pq.top();
317305

318-
if (Double.compare(top.current.key, key) != 0) {
306+
if (Double.compare(top.current().key, key) != 0) {
319307
// The key changes, reduce what we already buffered and reset the buffer for current buckets.
320308
// Using Double.compare instead of != to handle NaN correctly.
321309
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
322310
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
323311
reducedBuckets.add(reduced);
324312
}
325313
currentBuckets.clear();
326-
key = top.current.key;
314+
key = top.current().key;
327315
}
328316

329-
currentBuckets.add(top.current);
317+
currentBuckets.add(top.current());
330318

331-
if (top.iterator.hasNext()) {
332-
final Bucket next = top.iterator.next();
333-
assert Double.compare(next.key, top.current.key) > 0 : "shards must return data sorted by key";
334-
top.current = next;
319+
if (top.hasNext()) {
320+
top.next();
321+
assert Double.compare(top.current().key, key) > 0 : "shards must return data sorted by key";
335322
pq.updateTop();
336323
} else {
337324
pq.pop();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java

+13-25
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@
2929
import org.elasticsearch.search.aggregations.InternalAggregations;
3030
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
3131
import org.elasticsearch.search.aggregations.KeyComparable;
32+
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
3233
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
3334

3435
import java.io.IOException;
3536
import java.util.ArrayList;
3637
import java.util.Collections;
3738
import java.util.Comparator;
38-
import java.util.Iterator;
3939
import java.util.List;
4040
import java.util.Map;
4141
import java.util.Objects;
@@ -317,18 +317,6 @@ public Number nextKey(Number key) {
317317
= */
318318
private double nextKey(double key){ return key + 1; }
319319

320-
private static class IteratorAndCurrent {
321-
322-
private final Iterator<Bucket> iterator;
323-
private Bucket current;
324-
325-
IteratorAndCurrent(Iterator<Bucket> iterator) {
326-
this.iterator = iterator;
327-
current = iterator.next();
328-
}
329-
330-
}
331-
332320
@Override
333321
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
334322
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
@@ -350,10 +338,10 @@ protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
350338
}
351339

352340
public List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
353-
PriorityQueue<IteratorAndCurrent> pq = new PriorityQueue<IteratorAndCurrent>(aggregations.size()) {
341+
PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<IteratorAndCurrent<Bucket>>(aggregations.size()) {
354342
@Override
355-
protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
356-
return Double.compare(a.current.centroid, b.current.centroid) < 0;
343+
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
344+
return Double.compare(a.current().centroid, b.current().centroid) < 0;
357345
}
358346
};
359347
for (InternalAggregation aggregation : aggregations) {
@@ -365,27 +353,27 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
365353

366354
List<Bucket> reducedBuckets = new ArrayList<>();
367355
if(pq.size() > 0) {
368-
double key = pq.top().current.centroid();
356+
double key = pq.top().current().centroid();
369357
// list of buckets coming from different shards that have the same key
370358
List<Bucket> currentBuckets = new ArrayList<>();
371359
do {
372-
IteratorAndCurrent top = pq.top();
360+
IteratorAndCurrent<Bucket> top = pq.top();
373361

374-
if (Double.compare(top.current.centroid(), key) != 0) {
362+
if (Double.compare(top.current().centroid(), key) != 0) {
375363
// The key changes, reduce what we already buffered and reset the buffer for current buckets.
376364
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
377365
reduceContext.consumeBucketsAndMaybeBreak(1);
378366
reducedBuckets.add(reduced);
379367
currentBuckets.clear();
380-
key = top.current.centroid();
368+
key = top.current().centroid();
381369
}
382370

383-
currentBuckets.add(top.current);
371+
currentBuckets.add(top.current());
384372

385-
if (top.iterator.hasNext()) {
386-
Bucket next = top.iterator.next();
387-
assert next.compareKey(top.current) >= 0 : "shards must return data sorted by centroid";
388-
top.current = next;
373+
if (top.hasNext()) {
374+
Bucket prev = top.current();
375+
top.next();
376+
assert top.current().compareKey(prev) >= 0 : "shards must return data sorted by centroid";
389377
pq.updateTop();
390378
} else {
391379
pq.pop();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
4545
}
4646

4747
protected StringTerms buildEmptyTermsAggregation() {
48-
return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
48+
return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
4949
metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
5050
}
5151

0 commit comments

Comments
 (0)