Speed up date_histogram without children (backport of #63643) #64823

Merged
merged 1 commit on Nov 9, 2020
@@ -567,6 +567,58 @@ setup:
date:
type: date

- do:
bulk:
index: test_2
refresh: true
body:
- '{"index": {}}'
- '{"date": "2000-01-01"}' # This date is intenationally very far in the past so we end up not being able to use the date_histo -> range -> filters optimization
- '{"index": {}}'
- '{"date": "2000-01-02"}'
- '{"index": {}}'
- '{"date": "2016-02-01"}'
- '{"index": {}}'
- '{"date": "2016-03-01"}'

- do:
search:
index: test_2
body:
size: 0
profile: true
aggs:
histo:
date_histogram:
field: date
calendar_interval: month
- match: { hits.total.value: 4 }
- length: { aggregations.histo.buckets: 195 }
- match: { aggregations.histo.buckets.0.key_as_string: "2000-01-01T00:00:00.000Z" }
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
- match: { profile.shards.0.aggregations.0.description: histo }
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }

---
"date_histogram run as filters profiler":
- skip:
version: " - 7.99.99"
reason: optimization added in 7.11.0, backport pending

- do:
indices.create:
index: test_2
body:
settings:
number_of_replicas: 0
number_of_shards: 1
mappings:
properties:
date:
type: date

- do:
bulk:
index: test_2
@@ -596,10 +648,13 @@ setup:
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key_as_string: "2016-01-01T00:00:00.000Z" }
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator.FromDateRange }
- match: { profile.shards.0.aggregations.0.description: histo }
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }
# ultimately this ends up as a filters agg that uses filter-by-filter collection, which is tracked in build_leaf_collector
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 0 }
- match: { profile.shards.0.aggregations.0.debug.delegate: RangeAggregator.FromFilters }
- match: { profile.shards.0.aggregations.0.debug.delegate_debug.delegate: FiltersAggregator.FilterByFilter }
- match: { profile.shards.0.aggregations.0.debug.delegate_debug.delegate_debug.segments_with_deleted_docs: 0 }

---
"histogram with hard bounds":
@@ -593,5 +593,10 @@ public ScoreMode scoreMode() {

@Override
public void preCollection() throws IOException {}

@Override
public Aggregator[] subAggregators() {
throw new UnsupportedOperationException();
}
}
}
@@ -39,10 +39,10 @@
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.test.ESIntegTestCase;
23 changes: 23 additions & 0 deletions server/src/main/java/org/elasticsearch/common/Rounding.java
@@ -292,6 +292,13 @@ public interface Prepared {
* next rounded value in specified units if possible.
*/
double roundingSize(long utcMillis, DateTimeUnit timeUnit);
/**
* If this rounding mechanism precalculates its rounding points then
* this returns them, sorted ascending, and each date rounds down to
* the greatest entry less than or equal to it. If the rounding
* mechanism doesn't precalculate points then this is {@code null}.
*/
long[] fixedRoundingPoints();
}
/**
* Prepare to round many times.
@@ -436,6 +443,11 @@ protected Prepared maybeUseArray(long minUtcMillis, long maxUtcMillis, int max)
}
return new ArrayRounding(values, i, this);
}

@Override
public long[] fixedRoundingPoints() {
return null;
}
}

static class TimeUnitRounding extends Rounding {
@@ -1267,6 +1279,12 @@ public long nextRoundingValue(long utcMillis) {
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
return delegatePrepared.roundingSize(utcMillis, timeUnit);
}

@Override
public long[] fixedRoundingPoints() {
// TODO we can likely translate here
return null;
}
};
}

@@ -1349,5 +1367,10 @@ public long nextRoundingValue(long utcMillis) {
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
return delegate.roundingSize(utcMillis, timeUnit);
}

@Override
public long[] fixedRoundingPoints() {
return Arrays.copyOf(values, max);
}
}
}
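
The fixedRoundingPoints() additions above are the hook for the date_histo -> range -> filters rewrite: when a rounding precalculates its points, those points double as bucket boundaries. A minimal sketch of a consumer follows; the helper class and method are assumed for illustration, not taken from this PR.

import org.elasticsearch.common.Rounding;

class FixedPointsExample {
    /**
     * Hypothetical helper (not in this PR): derive [from, to) bucket bounds
     * from a prepared rounding's precalculated points. Returns null when the
     * rounding can't precalculate, signalling the caller to fall back to
     * rounding each value individually.
     */
    static long[][] ranges(Rounding.Prepared prepared) {
        long[] points = prepared.fixedRoundingPoints();
        if (points == null) {
            return null;
        }
        long[][] ranges = new long[points.length][2];
        for (int i = 0; i < points.length; i++) {
            ranges[i][0] = points[i]; // inclusive lower bound, also the bucket key
            ranges[i][1] = i + 1 < points.length ? points[i + 1] : Long.MAX_VALUE; // exclusive upper bound
        }
        return ranges;
    }
}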
@@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.search.profile.aggregation.InternalAggregationProfileTree;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
* An {@linkplain Aggregator} that delegates collection to another
* {@linkplain Aggregator} and then translates its results into the results
* you'd expect from another aggregation.
*/
public abstract class AdaptingAggregator extends Aggregator {
private final Aggregator parent;
private final Aggregator delegate;

public AdaptingAggregator(
Aggregator parent,
AggregatorFactories subAggregators,
CheckedFunction<AggregatorFactories, Aggregator, IOException> delegate
) throws IOException {
// It's important we set parent first or else when we build the sub-aggregators they can fail because they'll call this.parent.
this.parent = parent;
/*
* Lock the parent of the sub-aggregators to *this* instead of to
* the delegate. This keeps the parent link shaped like the requested
* agg tree. This is how it has always been and some aggs rely on it.
*/
this.delegate = delegate.apply(subAggregators.fixParent(this));
assert this.delegate.parent() == parent : "invalid parent set on delegate";
}

/**
* Adapt the result from the collecting {@linkplain Aggregator} into the
* result expected by this {@linkplain Aggregator}.
*/
protected abstract InternalAggregation adapt(InternalAggregation delegateResult);

@Override
public final void close() {
delegate.close();
}

@Override
public final ScoreMode scoreMode() {
return delegate.scoreMode();
}

@Override
public final String name() {
return delegate.name();
}

@Override
public final Aggregator parent() {
return parent;
}

@Override
public final Aggregator subAggregator(String name) {
return delegate.subAggregator(name);
}

@Override
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
return delegate.getLeafCollector(ctx);
}

@Override
public final void preCollection() throws IOException {
delegate.preCollection();
}

@Override
public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] delegateResults = delegate.buildAggregations(owningBucketOrds);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = adapt(delegateResults[ordIdx]);
}
return result;
}

@Override
public final InternalAggregation buildEmptyAggregation() {
return adapt(delegate.buildEmptyAggregation());
}

@Override
public final Aggregator[] subAggregators() {
return delegate.subAggregators();
}

@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("delegate", InternalAggregationProfileTree.typeFromAggregator(delegate));
Map<String, Object> delegateDebug = new HashMap<>();
delegate.collectDebugInfo(delegateDebug::put);
add.accept("delegate_debug", delegateDebug);
}

public Aggregator delegate() {
return delegate;
}
}
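
In this PR the concrete adapter is DateHistogramAggregator.FromDateRange (the type asserted in the profiler YAML test above), which reshapes a date range result into a date histogram. A minimal illustrative subclass is sketched below; the class name and the identity translation are hypothetical placeholders for a real conversion.

import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.search.aggregations.AdaptingAggregator;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;

import java.io.IOException;

// Hypothetical adapter: collects with the aggregator built by `delegate` and
// then rewrites each delegate result into the shape the caller requested.
public class ExampleAdapter extends AdaptingAggregator {
    public ExampleAdapter(
        Aggregator parent,
        AggregatorFactories subAggregators,
        CheckedFunction<AggregatorFactories, Aggregator, IOException> delegate
    ) throws IOException {
        super(parent, subAggregators, delegate);
    }

    @Override
    protected InternalAggregation adapt(InternalAggregation delegateResult) {
        // A real adapter would convert, e.g., a date range result into a
        // date histogram here; the identity keeps this sketch short.
        return delegateResult;
    }
}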
@@ -172,6 +172,11 @@ public final InternalAggregation buildTopLevel() throws IOException {
*/
public void collectDebugInfo(BiConsumer<String, Object> add) {}

/**
* Get the aggregators running under this one.
*/
public abstract Aggregator[] subAggregators();

/** Aggregation mode for sub aggregations. */
public enum SubAggCollectionMode implements Writeable {

@@ -224,6 +224,7 @@ public Aggregator parent() {
return parent;
}

@Override
public Aggregator[] subAggregators() {
return subAggregators;
}
@@ -227,6 +227,27 @@ public int countAggregators() {
return factories.length;
}

/**
* This returns a copy of {@link AggregatorFactories} modified so that
* calls to {@link #createSubAggregators} will ignore the provided parent
* aggregator and always use the {@code fixedParent} provided to this
* method.
* <p>
* {@link AdaptingAggregator} uses this to make sure that sub-aggregators
* get the {@link AdaptingAggregator} itself as their parent.
*/
public AggregatorFactories fixParent(Aggregator fixedParent) {
AggregatorFactories previous = this;
return new AggregatorFactories(factories) {
@Override
public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent, CardinalityUpperBound cardinality)
throws IOException {
// Note that we're throwing out the "parent" passed in to this method and using the parent passed to fixParent
return previous.createSubAggregators(searchContext, fixedParent, cardinality);
}
};
}
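
A small hypothetical check, not part of the PR, of the invariant fixParent maintains; it mirrors the constructor assertion in AdaptingAggregator above.

import org.elasticsearch.search.aggregations.AdaptingAggregator;
import org.elasticsearch.search.aggregations.Aggregator;

class FixParentInvariant {
    static void check(AdaptingAggregator adapter) {
        // Sub-aggregators hang off the adapter, keeping the requested tree shape...
        for (Aggregator sub : adapter.subAggregators()) {
            assert sub.parent() == adapter : "fixParent should re-point sub-aggs at the adapter";
        }
        // ...while the delegate itself slots in under the adapter's own parent.
        assert adapter.delegate().parent() == adapter.parent();
    }
}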

/**
* A mutable collection of {@link AggregationBuilder}s and
* {@link PipelineAggregationBuilder}s.
@@ -119,6 +119,11 @@ public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path)
public BucketComparator bucketComparator(String key, SortOrder order) {
throw new UnsupportedOperationException("Can't sort on deferred aggregations");
}

@Override
public Aggregator[] subAggregators() {
return in.subAggregators();
}
}

}