Commit 4d97d2c

Revert "Only execute one final reduction in InternalAutoDateHistogram (#45359)"
This reverts commit c0ea8a8.
1 parent: 2a1f0c7

File tree: 3 files changed, +6 -89 lines

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java (+3 -34)

```diff
@@ -500,24 +500,15 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
         BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);
 
         if (reduceContext.isFinalReduce()) {
-            // Because auto-date-histo can perform multiple reductions while merging buckets, we need to pretend this is
-            // not the final reduction to prevent pipeline aggs from creating their result early. However we want
-            // to reuse the multiBucketConsumer so that max_buckets breaker is correctly accounted for
-            ReduceContext penultimateReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(),
-                reduceContext::consumeBucketsAndMaybeBreak, false);
-
             // adding empty buckets if needed
-            reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, penultimateReduceContext);
+            reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
 
             // Adding empty buckets may have tipped us over the target so merge the buckets again if needed
             reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
-                reducedBucketsResult.roundingInfo, penultimateReduceContext);
+                reducedBucketsResult.roundingInfo, reduceContext);
 
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
-            reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, penultimateReduceContext);
-
-            // Perform the final reduction which will mostly be a no-op, except for pipeline aggs
-            reducedBucketsResult = performFinalReduce(reducedBucketsResult, penultimateReduceContext);
+            reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
         }
 
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
@@ -570,28 +561,6 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
     }
 
-    /**
-     * Execute a final reduction on `reducedBuckets`. This should be called after all the buckets have been
-     * merged into the appropriate roundings. After the buckets are stable, this method will perform one last
-     * reduction with finalReduce: true so that Pipeline aggs can generate their output.
-     */
-    private BucketReduceResult performFinalReduce(BucketReduceResult reducedBuckets, ReduceContext reduceContext) {
-        // We need to create another reduce context, this time setting finalReduce: true. Unlike the prior
-        // reduce context, we _do not_ want to reuse the multiBucketConsumer from the reduce context.
-        // We've already generated (and accounted for) all the buckets we will return, this method just triggers
-        // a final reduction on un-reduced items like pipelines. If we re-use the multiBucketConsumer we would
-        // over-count the buckets
-        ReduceContext finalReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true);
-
-        List<Bucket> finalBuckets = new ArrayList<>();
-        for (int i = 0; i < reducedBuckets.buckets.size(); i++) {
-            finalBuckets.add(reducedBuckets.buckets.get(i).reduce(Collections.singletonList(reducedBuckets.buckets.get(i)),
-                reducedBuckets.roundingInfo.rounding, finalReduceContext));
-        }
-        assert reducedBuckets.buckets.size() == finalBuckets.size();
-        return new BucketReduceResult(finalBuckets, reducedBuckets.roundingInfo, reducedBuckets.roundingIdx, reducedBuckets.innerInterval);
-    }
-
     @Override
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
         builder.startArray(CommonFields.BUCKETS.getPreferredName());
```
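
To make the revert easier to follow: the deleted code ran every bucket-merging pass under a context that pretended not to be the final reduction (while still reusing the caller's bucket consumer, so the max_buckets breaker counted every bucket), and only then ran one extra pass with finalReduce: true so pipeline aggregations produced their output exactly once. The sketch below restates that control flow with simplified stand-in types; ReduceContext, the long-valued buckets, and the two helper stubs are placeholders, not the real Elasticsearch classes.

```java
import java.util.List;
import java.util.function.IntConsumer;

class TwoPhaseReduceSketch {

    /** Stand-in for the real ReduceContext: a bucket consumer plus the finalReduce flag. */
    static class ReduceContext {
        final IntConsumer bucketConsumer; // feeds the max_buckets breaker
        final boolean isFinalReduce;

        ReduceContext(IntConsumer bucketConsumer, boolean isFinalReduce) {
            this.bucketConsumer = bucketConsumer;
            this.isFinalReduce = isFinalReduce;
        }
    }

    List<Long> doReduce(List<Long> buckets, ReduceContext reduceContext) {
        if (reduceContext.isFinalReduce) {
            // Phase 1: bucket-merging passes can run several times, so they get a
            // "penultimate" context. It reuses the caller's bucket consumer (the
            // breaker still counts every bucket) but reports isFinalReduce == false
            // so pipeline-style logic does not produce results early.
            ReduceContext penultimate = new ReduceContext(reduceContext.bucketConsumer, false);
            buckets = mergeBucketsIfNeeded(buckets, penultimate);

            // Phase 2: exactly one pass with isFinalReduce == true. The buckets were
            // already counted in phase 1, so this context uses a no-op consumer to
            // avoid charging the breaker twice.
            ReduceContext finalContext = new ReduceContext(count -> {}, true);
            buckets = performFinalReduce(buckets, finalContext);
        }
        return buckets;
    }

    private List<Long> mergeBucketsIfNeeded(List<Long> buckets, ReduceContext context) {
        context.bucketConsumer.accept(buckets.size()); // account for the buckets
        return buckets;                                // actual merging elided
    }

    private List<Long> performFinalReduce(List<Long> buckets, ReduceContext context) {
        assert context.isFinalReduce;
        return buckets; // pipeline aggregations would emit their results here
    }
}
```

The revert drops phase 2 entirely and threads the original reduceContext through the merge steps, as shown on the + side of the first hunk.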

server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java (+2 -50)

```diff
@@ -36,15 +36,10 @@
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.DateFieldMapper;
-import org.elasticsearch.index.mapper.MappedFieldType;
-import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
-import org.elasticsearch.search.aggregations.metrics.InternalMax;
 import org.elasticsearch.search.aggregations.metrics.InternalStats;
-import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
@@ -63,12 +58,9 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.Matchers.equalTo;
-
 public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
     private static final String DATE_FIELD = "date";
     private static final String INSTANT_FIELD = "instant";
-    private static final String NUMERIC_FIELD = "numeric";
 
     private static final List<ZonedDateTime> DATES_WITH_TIME = Arrays.asList(
         ZonedDateTime.of(2010, 3, 12, 1, 7, 45, 0, ZoneOffset.UTC),
@@ -726,35 +718,6 @@ public void testIntervalSecond() throws IOException {
         );
     }
 
-    public void testWithPipelineReductions() throws IOException {
-        testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME,
-            aggregation -> aggregation.setNumBuckets(1).field(DATE_FIELD)
-                .subAggregation(AggregationBuilders.histogram("histo").field(NUMERIC_FIELD).interval(1)
-                    .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD))
-                    .subAggregation(new DerivativePipelineAggregationBuilder("deriv", "max"))),
-            histogram -> {
-                assertTrue(AggregationInspectionHelper.hasValue(histogram));
-                final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
-                assertEquals(1, buckets.size());
-
-                Histogram.Bucket bucket = buckets.get(0);
-                assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString());
-                assertEquals(10, bucket.getDocCount());
-                assertThat(bucket.getAggregations().asList().size(), equalTo(1));
-                InternalHistogram histo = (InternalHistogram) bucket.getAggregations().asList().get(0);
-                assertThat(histo.getBuckets().size(), equalTo(10));
-                for (int i = 0; i < 10; i++) {
-                    assertThat(histo.getBuckets().get(i).key, equalTo((double)i));
-                    assertThat(((InternalMax)histo.getBuckets().get(i).aggregations.get("max")).getValue(), equalTo((double)i));
-                    if (i > 0) {
-                        assertThat(((InternalSimpleValue)histo.getBuckets().get(i).aggregations.get("deriv")).getValue(), equalTo(1.0));
-                    }
-                }
-
-
-            });
-    }
-
     private void testSearchCase(final Query query, final List<ZonedDateTime> dataset,
                                 final Consumer<AutoDateHistogramAggregationBuilder> configure,
                                 final Consumer<InternalAutoDateHistogram> verify) throws IOException {
@@ -794,7 +757,6 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
         try (Directory directory = newDirectory()) {
             try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
                 final Document document = new Document();
-                int i = 0;
                 for (final ZonedDateTime date : dataset) {
                     if (frequently()) {
                         indexWriter.commit();
@@ -803,10 +765,8 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
                     final long instant = date.toInstant().toEpochMilli();
                     document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
                     document.add(new LongPoint(INSTANT_FIELD, instant));
-                    document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i));
                     indexWriter.addDocument(document);
                     document.clear();
-                    i += 1;
                 }
             }
 
@@ -823,19 +783,11 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
                 fieldType.setHasDocValues(true);
                 fieldType.setName(aggregationBuilder.field());
 
-                MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
-                instantFieldType.setName(INSTANT_FIELD);
-                instantFieldType.setHasDocValues(true);
-
-                MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
-                numericFieldType.setName(NUMERIC_FIELD);
-                numericFieldType.setHasDocValues(true);
-
                 final InternalAutoDateHistogram histogram;
                 if (reduced) {
-                    histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType);
+                    histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
                 } else {
-                    histogram = search(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType);
+                    histogram = search(indexSearcher, query, aggregationBuilder, fieldType);
                 }
                 verify.accept(histogram);
             }
```

server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java (+1 -5)

```diff
@@ -50,15 +50,11 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
 
     private DocValueFormat format;
     private RoundingInfo[] roundingInfos;
-    private int nbBuckets;
 
     @Override
    public void setUp() throws Exception {
         super.setUp();
-        // these need to be the same for each new instance created so that {@link #testReduceRandom()}
-        // has mergeable instances to work with
         format = randomNumericDocValueFormat();
-        nbBuckets = randomNumberOfBuckets();
     }
 
     @Override
@@ -68,7 +64,7 @@ protected InternalAutoDateHistogram createTestInstance(String name,
                                                        InternalAggregations aggregations) {
 
         roundingInfos = AutoDateHistogramAggregationBuilder.buildRoundings(null, null);
-
+        int nbBuckets = randomNumberOfBuckets();
         int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
         List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);
```
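
The comment deleted in the first hunk captures the pattern these randomized reduce tests rely on: anything randomized in setUp() is fixed for the whole test and therefore agrees across every instance that testReduceRandom() merges, while values randomized inside createTestInstance() may differ per instance, which is where nbBuckets now lives. A minimal sketch of that split, using hypothetical names throughout (ReduceTestSketch and Instance are illustrative stand-ins, not the real test classes):

```java
import java.util.Random;

class ReduceTestSketch {
    private final Random random = new Random();
    private String format; // shared state: must agree across merged instances

    void setUp() {
        // Randomized once, so every instance created afterwards carries the same
        // format and the reduce test gets mergeable inputs.
        format = "format-" + random.nextInt(5);
    }

    Instance createTestInstance() {
        // Randomized per call: after this commit, each instance may carry a
        // different bucket count.
        int nbBuckets = 1 + random.nextInt(10);
        return new Instance(format, nbBuckets);
    }

    static final class Instance {
        final String format;
        final int nbBuckets;

        Instance(String format, int nbBuckets) {
            this.format = format;
            this.nbBuckets = nbBuckets;
        }
    }
}
```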
