Commit e8e20c9

Only execute one final reduction in InternalAutoDateHistogram (#45359)
Because auto-date-histo can perform multiple reductions while merging buckets, we need to ensure that the intermediate reductions are done with `finalReduce` set to false to prevent pipeline aggs from generating their output prematurely. Once all the buckets have been merged and the output is stable, a mostly-noop reduction can be performed, which allows pipelines to generate their output.
1 parent 18c5376 commit e8e20c9
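
For context, the `finalReduce` flag on ReduceContext is what gates pipeline output: reductions run with the flag set to false may merge and re-merge buckets freely, and only the last pass lets pipeline aggs materialize their values. Below is a minimal, self-contained sketch of the two-phase pattern this commit applies; ReduceContext, Bucket, and mergeBuckets here are simplified stand-ins for the real Elasticsearch classes, not the actual API.

    import java.util.ArrayList;
    import java.util.List;

    class TwoPhaseReduceSketch {
        // Simplified stand-in for the real ReduceContext
        record ReduceContext(boolean finalReduce) {}

        record Bucket(long docCount) {
            Bucket reduce(List<Bucket> siblings, ReduceContext ctx) {
                long total = siblings.stream().mapToLong(Bucket::docCount).sum();
                if (ctx.finalReduce()) {
                    // Only on the final pass may pipeline aggs emit their output.
                }
                return new Bucket(total);
            }
        }

        static List<Bucket> doReduce(List<Bucket> buckets) {
            // Phase 1: merge with finalReduce == false, possibly several times.
            ReduceContext penultimate = new ReduceContext(false);
            List<Bucket> merged = mergeBuckets(buckets, penultimate);

            // Phase 2: a mostly-noop singleton reduce with finalReduce == true,
            // whose only real effect is to let pipelines generate their output.
            ReduceContext finalCtx = new ReduceContext(true);
            List<Bucket> out = new ArrayList<>(merged.size());
            for (Bucket b : merged) {
                out.add(b.reduce(List.of(b), finalCtx));
            }
            return out;
        }

        static List<Bucket> mergeBuckets(List<Bucket> buckets, ReduceContext ctx) {
            // Stand-in for addEmptyBuckets / mergeBucketsIfNeeded / maybeMergeConsecutiveBuckets.
            return buckets;
        }
    }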

File tree

3 files changed, +89 -6 lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

Lines changed: 34 additions & 3 deletions
@@ -500,15 +500,24 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
         BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);
 
         if (reduceContext.isFinalReduce()) {
+            // Because auto-date-histo can perform multiple reductions while merging buckets, we need to pretend this is
+            // not the final reduction to prevent pipeline aggs from creating their result early. However we want
+            // to reuse the multiBucketConsumer so that max_buckets breaker is correctly accounted for
+            ReduceContext penultimateReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(),
+                reduceContext::consumeBucketsAndMaybeBreak, false);
+
             // adding empty buckets if needed
-            reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
+            reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, penultimateReduceContext);
 
             // Adding empty buckets may have tipped us over the target so merge the buckets again if needed
             reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
-                reducedBucketsResult.roundingInfo, reduceContext);
+                reducedBucketsResult.roundingInfo, penultimateReduceContext);
 
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
-            reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
+            reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, penultimateReduceContext);
+
+            // Perform the final reduction which will mostly be a no-op, except for pipeline aggs
+            reducedBucketsResult = performFinalReduce(reducedBucketsResult, penultimateReduceContext);
         }
 
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
@@ -561,6 +570,28 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
     }
 
+    /**
+     * Execute a final reduction on `reducedBuckets`. This should be called after all the buckets have been
+     * merged into the appropriate roundings. After the buckets are stable, this method will perform one last
+     * reduction with finalReduce: true so that Pipeline aggs can generate their output.
+     */
+    private BucketReduceResult performFinalReduce(BucketReduceResult reducedBuckets, ReduceContext reduceContext) {
+        // We need to create another reduce context, this time setting finalReduce: true. Unlike the prior
+        // reduce context, we _do not_ want to reuse the multiBucketConsumer from the reduce context.
+        // We've already generated (and accounted for) all the buckets we will return, this method just triggers
+        // a final reduction on un-reduced items like pipelines. If we re-use the multiBucketConsumer we would
+        // over-count the buckets
+        ReduceContext finalReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true);
+
+        List<Bucket> finalBuckets = new ArrayList<>();
+        for (int i = 0; i < reducedBuckets.buckets.size(); i++) {
+            finalBuckets.add(reducedBuckets.buckets.get(i).reduce(Collections.singletonList(reducedBuckets.buckets.get(i)),
+                reducedBuckets.roundingInfo.rounding, finalReduceContext));
+        }
+        assert reducedBuckets.buckets.size() == finalBuckets.size();
+        return new BucketReduceResult(finalBuckets, reducedBuckets.roundingInfo, reducedBuckets.roundingIdx, reducedBuckets.innerInterval);
+    }
+
     @Override
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
         builder.startArray(CommonFields.BUCKETS.getPreferredName());
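
Note the two different ReduceContext constructors above: the penultimate context reuses the caller's consumeBucketsAndMaybeBreak so the max_buckets breaker sees every returned bucket exactly once, while the final context deliberately does not, because those buckets have already been accounted for. A self-contained sketch of the over-counting hazard the comment warns about (the breaker below is a simplified stand-in, not the real MultiBucketConsumer):

    import java.util.function.IntConsumer;

    class BucketBreakerSketch {
        private int count = 0;
        private final int maxBuckets;

        BucketBreakerSketch(int maxBuckets) {
            this.maxBuckets = maxBuckets;
        }

        // Simplified consumer that tracks a running total against a limit.
        void consumeBucketsAndMaybeBreak(int added) {
            count += added;
            if (count > maxBuckets) {
                throw new IllegalStateException("too many buckets: " + count);
            }
        }

        public static void main(String[] args) {
            BucketBreakerSketch breaker = new BucketBreakerSketch(100);
            IntConsumer consumer = breaker::consumeBucketsAndMaybeBreak;
            consumer.accept(80); // counted once during the merge phase: fine
            consumer.accept(80); // counted again by a reused final reduce: trips at 160
        }
    }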

server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,15 @@
3636
import org.elasticsearch.common.time.DateFormatter;
3737
import org.elasticsearch.index.IndexSettings;
3838
import org.elasticsearch.index.mapper.DateFieldMapper;
39+
import org.elasticsearch.index.mapper.MappedFieldType;
40+
import org.elasticsearch.index.mapper.NumberFieldMapper;
3941
import org.elasticsearch.search.aggregations.AggregationBuilders;
4042
import org.elasticsearch.search.aggregations.AggregatorTestCase;
4143
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
44+
import org.elasticsearch.search.aggregations.metrics.InternalMax;
4245
import org.elasticsearch.search.aggregations.metrics.InternalStats;
46+
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
47+
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
4348
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
4449
import org.hamcrest.Matchers;
4550
import org.junit.Assert;
@@ -58,9 +63,12 @@
5863
import java.util.function.Consumer;
5964
import java.util.stream.Collectors;
6065

66+
import static org.hamcrest.Matchers.equalTo;
67+
6168
public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
6269
private static final String DATE_FIELD = "date";
6370
private static final String INSTANT_FIELD = "instant";
71+
private static final String NUMERIC_FIELD = "numeric";
6472

6573
private static final List<ZonedDateTime> DATES_WITH_TIME = Arrays.asList(
6674
ZonedDateTime.of(2010, 3, 12, 1, 7, 45, 0, ZoneOffset.UTC),
@@ -718,6 +726,35 @@ public void testIntervalSecond() throws IOException {
718726
);
719727
}
720728

729+
public void testWithPipelineReductions() throws IOException {
730+
testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME,
731+
aggregation -> aggregation.setNumBuckets(1).field(DATE_FIELD)
732+
.subAggregation(AggregationBuilders.histogram("histo").field(NUMERIC_FIELD).interval(1)
733+
.subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD))
734+
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "max"))),
735+
histogram -> {
736+
assertTrue(AggregationInspectionHelper.hasValue(histogram));
737+
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
738+
assertEquals(1, buckets.size());
739+
740+
Histogram.Bucket bucket = buckets.get(0);
741+
assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString());
742+
assertEquals(10, bucket.getDocCount());
743+
assertThat(bucket.getAggregations().asList().size(), equalTo(1));
744+
InternalHistogram histo = (InternalHistogram) bucket.getAggregations().asList().get(0);
745+
assertThat(histo.getBuckets().size(), equalTo(10));
746+
for (int i = 0; i < 10; i++) {
747+
assertThat(histo.getBuckets().get(i).key, equalTo((double)i));
748+
assertThat(((InternalMax)histo.getBuckets().get(i).aggregations.get("max")).getValue(), equalTo((double)i));
749+
if (i > 0) {
750+
assertThat(((InternalSimpleValue)histo.getBuckets().get(i).aggregations.get("deriv")).getValue(), equalTo(1.0));
751+
}
752+
}
753+
754+
755+
});
756+
}
757+
721758
private void testSearchCase(final Query query, final List<ZonedDateTime> dataset,
722759
final Consumer<AutoDateHistogramAggregationBuilder> configure,
723760
final Consumer<InternalAutoDateHistogram> verify) throws IOException {
@@ -757,6 +794,7 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
757794
try (Directory directory = newDirectory()) {
758795
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
759796
final Document document = new Document();
797+
int i = 0;
760798
for (final ZonedDateTime date : dataset) {
761799
if (frequently()) {
762800
indexWriter.commit();
@@ -765,8 +803,10 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
765803
final long instant = date.toInstant().toEpochMilli();
766804
document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
767805
document.add(new LongPoint(INSTANT_FIELD, instant));
806+
document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i));
768807
indexWriter.addDocument(document);
769808
document.clear();
809+
i += 1;
770810
}
771811
}
772812

@@ -783,11 +823,19 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
783823
fieldType.setHasDocValues(true);
784824
fieldType.setName(aggregationBuilder.field());
785825

826+
MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
827+
instantFieldType.setName(INSTANT_FIELD);
828+
instantFieldType.setHasDocValues(true);
829+
830+
MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
831+
numericFieldType.setName(NUMERIC_FIELD);
832+
numericFieldType.setHasDocValues(true);
833+
786834
final InternalAutoDateHistogram histogram;
787835
if (reduced) {
788-
histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
836+
histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType);
789837
} else {
790-
histogram = search(indexSearcher, query, aggregationBuilder, fieldType);
838+
histogram = search(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType);
791839
}
792840
verify.accept(histogram);
793841
}
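
The new test indexes NUMERIC_FIELD values 0 through 9, one per document, so the inner histogram gets ten unit-wide buckets whose max is i, and the derivative pipeline should report a constant 1.0 between consecutive buckets. A standalone check of that arithmetic (plain Java, independent of the test harness):

    class DerivativeArithmeticSketch {
        public static void main(String[] args) {
            double[] max = new double[10];
            for (int i = 0; i < 10; i++) {
                max[i] = i; // one document per bucket, with value i
            }
            for (int i = 1; i < 10; i++) {
                double deriv = max[i] - max[i - 1]; // what the "deriv" pipeline computes
                if (deriv != 1.0) {
                    throw new AssertionError("unexpected derivative " + deriv);
                }
            }
        }
    }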

server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java

Lines changed: 5 additions & 1 deletion
@@ -50,11 +50,15 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
 
     private DocValueFormat format;
     private RoundingInfo[] roundingInfos;
+    private int nbBuckets;
 
     @Override
     public void setUp() throws Exception {
         super.setUp();
+        // these need to be the same for each new instance created so that {@link #testReduceRandom()}
+        // has mergeable instances to work with
        format = randomNumericDocValueFormat();
+        nbBuckets = randomNumberOfBuckets();
     }
 
     @Override
@@ -64,7 +68,7 @@ protected InternalAutoDateHistogram createTestInstance(String name,
                                                            InternalAggregations aggregations) {
 
         roundingInfos = AutoDateHistogramAggregationBuilder.buildRoundings(null, null);
-        int nbBuckets = randomNumberOfBuckets();
+
         int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
         List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);
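
The reason nbBuckets moves into setUp() is that testReduceRandom() merges several randomly created instances, and reduction assumes they agree on shared parameters. A generic sketch of the pattern (the harness below is a simplification, not the actual InternalMultiBucketAggregationTestCase):

    import java.util.Random;

    class MergeableInstancesSketch {
        private final Random random = new Random();
        private int nbBuckets; // pinned once per test, shared by all instances

        void setUp() {
            nbBuckets = 1 + random.nextInt(10); // randomized here, not per instance
        }

        int[] createTestInstance() {
            // Every instance built after setUp() has the same bucket count,
            // so a reduce test can merge them without shape mismatches.
            return new int[nbBuckets];
        }
    }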

0 commit comments
