Only execute one final reduction in InternalAutoDateHistogram #45359

Merged (4 commits) on Aug 12, 2019

@@ -500,15 +500,24 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);

if (reduceContext.isFinalReduce()) {
// Because auto-date-histo can perform multiple reductions while merging buckets, we need to pretend this is
// not the final reduction to prevent pipeline aggs from creating their result early. However, we want
// to reuse the multiBucketConsumer so that the max_buckets breaker is correctly accounted for.
ReduceContext penultimateReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(),
reduceContext::consumeBucketsAndMaybeBreak, false);

// adding empty buckets if needed
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, penultimateReduceContext);

// Adding empty buckets may have tipped us over the target so merge the buckets again if needed
reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
reducedBucketsResult.roundingInfo, reduceContext);
reducedBucketsResult.roundingInfo, penultimateReduceContext);

// Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, penultimateReduceContext);

// Perform the final reduction which will mostly be a no-op, except for pipeline aggs
reducedBucketsResult = performFinalReduce(reducedBucketsResult, penultimateReduceContext);
}

BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
@@ -561,6 +570,28 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
}

/**
* Execute a final reduction on `reducedBuckets`. This should be called after all the buckets have been
* merged into the appropriate roundings. After the buckets are stable, this method will perform one last
* reduction with finalReduce: true so that Pipeline aggs can generate their output.
*/
private BucketReduceResult performFinalReduce(BucketReduceResult reducedBuckets, ReduceContext reduceContext) {
// We need to create another reduce context, this time setting finalReduce: true. Unlike the prior
// reduce context, we _do not_ want to reuse the multiBucketConsumer from the reduce context.
// We've already generated (and accounted for) all the buckets we will return; this method just triggers
// a final reduction on un-reduced items like pipelines. If we re-used the multiBucketConsumer, we would
// over-count the buckets.
ReduceContext finalReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true);

List<Bucket> finalBuckets = new ArrayList<>();
for (int i = 0; i < reducedBuckets.buckets.size(); i++) {
finalBuckets.add(reducedBuckets.buckets.get(i).reduce(Collections.singletonList(reducedBuckets.buckets.get(i)),
reducedBuckets.roundingInfo.rounding, finalReduceContext));
}
assert reducedBuckets.buckets.size() == finalBuckets.size();
return new BucketReduceResult(finalBuckets, reducedBuckets.roundingInfo, reducedBuckets.roundingIdx, reducedBuckets.innerInterval);
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS.getPreferredName());
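
In rough outline, the change above splits the final reduction into two passes: intermediate merge passes share the real bucket consumer so the max_buckets breaker counts each bucket exactly once, and a single last pass flips finalReduce to true with a no-op consumer so nothing is counted twice. A minimal, self-contained sketch of that split (plain Java, not Elasticsearch code; ReduceContextSplitSketch and its fields are made-up names standing in for ReduceContext, consumeBucketsAndMaybeBreak and isFinalReduce):

import java.util.function.IntConsumer;

final class ReduceContextSplitSketch {
    final IntConsumer bucketConsumer; // stands in for consumeBucketsAndMaybeBreak
    final boolean finalReduce;        // pipeline aggs only build their output when this is true

    ReduceContextSplitSketch(IntConsumer bucketConsumer, boolean finalReduce) {
        this.bucketConsumer = bucketConsumer;
        this.finalReduce = finalReduce;
    }

    public static void main(String[] args) {
        int[] accounted = new int[1];
        IntConsumer shared = n -> accounted[0] += n; // shared max_buckets accounting

        // Intermediate merges: buckets are counted against the breaker, but finalReduce is
        // false so pipeline aggregations do not produce their results early.
        ReduceContextSplitSketch penultimate = new ReduceContextSplitSketch(shared, false);
        penultimate.bucketConsumer.accept(42); // buckets created while merging roundings

        // Single last pass: finalReduce is true, but the consumer is a no-op because every
        // bucket being returned has already been accounted for by the penultimate passes.
        ReduceContextSplitSketch last = new ReduceContextSplitSketch(n -> { }, true);

        System.out.println("buckets accounted once: " + accounted[0]
            + ", finalReduce on last pass: " + last.finalReduce);
    }
}
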
@@ -36,10 +36,15 @@
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.InternalStats;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.hamcrest.Matchers;
import org.junit.Assert;
@@ -58,9 +63,12 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;

public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
private static final String DATE_FIELD = "date";
private static final String INSTANT_FIELD = "instant";
private static final String NUMERIC_FIELD = "numeric";

private static final List<ZonedDateTime> DATES_WITH_TIME = Arrays.asList(
ZonedDateTime.of(2010, 3, 12, 1, 7, 45, 0, ZoneOffset.UTC),
@@ -718,6 +726,35 @@ public void testIntervalSecond() throws IOException {
);
}

public void testWithPipelineReductions() throws IOException {
testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME,
aggregation -> aggregation.setNumBuckets(1).field(DATE_FIELD)
.subAggregation(AggregationBuilders.histogram("histo").field(NUMERIC_FIELD).interval(1)
.subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD))
.subAggregation(new DerivativePipelineAggregationBuilder("deriv", "max"))),
histogram -> {
assertTrue(AggregationInspectionHelper.hasValue(histogram));
final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(1, buckets.size());

Histogram.Bucket bucket = buckets.get(0);
assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString());
assertEquals(10, bucket.getDocCount());
assertThat(bucket.getAggregations().asList().size(), equalTo(1));
InternalHistogram histo = (InternalHistogram) bucket.getAggregations().asList().get(0);
assertThat(histo.getBuckets().size(), equalTo(10));
for (int i = 0; i < 10; i++) {
assertThat(histo.getBuckets().get(i).key, equalTo((double)i));
assertThat(((InternalMax)histo.getBuckets().get(i).aggregations.get("max")).getValue(), equalTo((double)i));
if (i > 0) {
assertThat(((InternalSimpleValue)histo.getBuckets().get(i).aggregations.get("deriv")).getValue(), equalTo(1.0));
}
}
});
}

private void testSearchCase(final Query query, final List<ZonedDateTime> dataset,
final Consumer<AutoDateHistogramAggregationBuilder> configure,
final Consumer<InternalAutoDateHistogram> verify) throws IOException {
@@ -757,6 +794,7 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
final Document document = new Document();
int i = 0;
for (final ZonedDateTime date : dataset) {
if (frequently()) {
indexWriter.commit();
@@ -765,8 +803,10 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
final long instant = date.toInstant().toEpochMilli();
document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
document.add(new LongPoint(INSTANT_FIELD, instant));
document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i));
indexWriter.addDocument(document);
document.clear();
i += 1;
}
}

@@ -783,11 +823,19 @@ private void executeTestCase(final boolean reduced, final Query query, final Lis
fieldType.setHasDocValues(true);
fieldType.setName(aggregationBuilder.field());

MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
instantFieldType.setName(INSTANT_FIELD);
instantFieldType.setHasDocValues(true);

MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
numericFieldType.setName(NUMERIC_FIELD);
numericFieldType.setHasDocValues(true);

final InternalAutoDateHistogram histogram;
if (reduced) {
histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType);
} else {
histogram = search(indexSearcher, query, aggregationBuilder, fieldType);
histogram = search(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType);
}
verify.accept(histogram);
}
@@ -50,11 +50,15 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati

private DocValueFormat format;
private RoundingInfo[] roundingInfos;
private int nbBuckets;

@Override
public void setUp() throws Exception {
super.setUp();
// these need to be the same for each new instance created so that {@link #testReduceRandom()}
// has mergeable instances to work with
format = randomNumericDocValueFormat();
nbBuckets = randomNumberOfBuckets();
}

@Override
@@ -64,7 +68,7 @@ protected InternalAutoDateHistogram createTestInstance(String name,
InternalAggregations aggregations) {

roundingInfos = AutoDateHistogramAggregationBuilder.buildRoundings(null, null);
int nbBuckets = randomNumberOfBuckets();

int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);

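
The last hunk moves the randomized bucket count into setUp() so that, like format, it stays constant across the instances built for a single testReduceRandom() run, keeping those instances mergeable. A minimal sketch of that fixture pattern, assuming JUnit 4 (the class, method, and field names below are illustrative, not the Elasticsearch test code):

import java.util.Random;

import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class MergeableFixtureSketchTests {
    private final Random random = new Random();
    private int nbBuckets; // randomized once per test so every instance agrees

    @Before
    public void setUpFixture() {
        // Pick the value here rather than inside createTestInstance(): every instance
        // created during one test run then shares the same target shape and can be merged.
        nbBuckets = 1 + random.nextInt(10);
    }

    private int[] createTestInstance() {
        return new int[nbBuckets]; // all instances built in a test have the same length
    }

    @Test
    public void testInstancesAreMergeable() {
        assertEquals(createTestInstance().length, createTestInstance().length);
    }
}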