Skip to content

Commit 902a3e1

Browse files
committed
Histogram aggs: add empty buckets only in the final reduce step (#35921)
Empty buckets don't need to be added when performing an incremental reduction step, they can be added later in the final reduction step. This will allow us to later remove the max buckets limit when performing non final reduction.
1 parent 4428e9b commit 902a3e1

File tree

3 files changed

+48
-44
lines changed

3 files changed

+48
-44
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -444,26 +444,22 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
444444
@Override
445445
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
446446
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
447-
448-
// adding empty buckets if needed
449-
if (minDocCount == 0) {
450-
addEmptyBuckets(reducedBuckets, reduceContext);
451-
}
452-
453-
if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) {
454-
// nothing to do, data are already sorted since shards return
455-
// sorted buckets and the merge-sort performed by reduceBuckets
456-
// maintains order
457-
} else if (InternalOrder.isKeyDesc(order)) {
458-
// we just need to reverse here...
459-
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
460-
Collections.reverse(reverse);
461-
reducedBuckets = reverse;
462-
} else {
463-
// sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort
464-
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
447+
if (reduceContext.isFinalReduce()) {
448+
if (minDocCount == 0) {
449+
addEmptyBuckets(reducedBuckets, reduceContext);
450+
}
451+
if (InternalOrder.isKeyDesc(order)) {
452+
// we just need to reverse here...
453+
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
454+
Collections.reverse(reverse);
455+
reducedBuckets = reverse;
456+
} else if (InternalOrder.isKeyAsc(order) == false){
457+
// nothing to do when sorting by key ascending, as data is already sorted since shards return
458+
// sorted buckets and the merge-sort performed by reduceBuckets maintains order.
459+
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
460+
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
461+
}
465462
}
466-
467463
return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
468464
format, keyed, pipelineAggregators(), getMetaData());
469465
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -421,26 +421,22 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
421421
@Override
422422
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
423423
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
424-
425-
// adding empty buckets if needed
426-
if (minDocCount == 0) {
427-
addEmptyBuckets(reducedBuckets, reduceContext);
428-
}
429-
430-
if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) {
431-
// nothing to do, data are already sorted since shards return
432-
// sorted buckets and the merge-sort performed by reduceBuckets
433-
// maintains order
434-
} else if (InternalOrder.isKeyDesc(order)) {
435-
// we just need to reverse here...
436-
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
437-
Collections.reverse(reverse);
438-
reducedBuckets = reverse;
439-
} else {
440-
// sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort
441-
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
424+
if (reduceContext.isFinalReduce()) {
425+
if (minDocCount == 0) {
426+
addEmptyBuckets(reducedBuckets, reduceContext);
427+
}
428+
if (InternalOrder.isKeyDesc(order)) {
429+
// we just need to reverse here...
430+
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
431+
Collections.reverse(reverse);
432+
reducedBuckets = reverse;
433+
} else if (InternalOrder.isKeyAsc(order) == false){
434+
// nothing to do when sorting by key ascending, as data is already sorted since shards return
435+
// sorted buckets and the merge-sort performed by reduceBuckets maintains order.
436+
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
437+
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
438+
}
442439
}
443-
444440
return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, pipelineAggregators(),
445441
getMetaData());
446442
}

test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@
151151
import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;
152152
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
153153
import static org.hamcrest.Matchers.equalTo;
154+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
154155

155156
public abstract class InternalAggregationTestCase<T extends InternalAggregation> extends AbstractWireSerializingTestCase<T> {
156157
public static final int DEFAULT_MAX_BUCKETS = 100000;
@@ -267,7 +268,14 @@ public void testReduceRandom() {
267268
new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false);
268269
@SuppressWarnings("unchecked")
269270
T reduced = (T) inputs.get(0).reduce(internalAggregations, context);
270-
assertMultiBucketConsumer(reduced, bucketConsumer);
271+
int initialBucketCount = 0;
272+
for (InternalAggregation internalAggregation : internalAggregations) {
273+
initialBucketCount += countInnerBucket(internalAggregation);
274+
}
275+
int reducedBucketCount = countInnerBucket(reduced);
276+
//check that non final reduction never adds buckets
277+
assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount));
278+
assertMultiBucketConsumer(reducedBucketCount, bucketConsumer);
271279
toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize));
272280
toReduce.add(reduced);
273281
}
@@ -332,14 +340,14 @@ protected NamedXContentRegistry xContentRegistry() {
332340

333341
public final void testFromXContent() throws IOException {
334342
final T aggregation = createTestInstance();
335-
final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
336-
assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation);
343+
final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
344+
assertFromXContent(aggregation, parsedAggregation);
337345
}
338346

339347
public final void testFromXContentWithRandomFields() throws IOException {
340348
final T aggregation = createTestInstance();
341-
final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
342-
assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation);
349+
final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
350+
assertFromXContent(aggregation, parsedAggregation);
343351
}
344352

345353
protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException;
@@ -423,6 +431,10 @@ protected static DocValueFormat randomNumericDocValueFormat() {
423431
}
424432

425433
public static void assertMultiBucketConsumer(Aggregation agg, MultiBucketConsumer bucketConsumer) {
426-
assertThat(bucketConsumer.getCount(), equalTo(countInnerBucket(agg)));
434+
assertMultiBucketConsumer(countInnerBucket(agg), bucketConsumer);
435+
}
436+
437+
private static void assertMultiBucketConsumer(int innerBucketCount, MultiBucketConsumer bucketConsumer) {
438+
assertThat(bucketConsumer.getCount(), equalTo(innerBucketCount));
427439
}
428440
}

0 commit comments

Comments
 (0)