Small speed up of date_histogram with children #67012

Closed
wants to merge 2 commits into from
@@ -51,7 +51,11 @@ public AdaptingAggregator(
* agg tree. This is how it has always been and some aggs rely on it.
*/
this.delegate = delegate.apply(subAggregators.fixParent(this));
assert this.delegate.parent() == parent : "invalid parent set on delegate";
/*
* Sometimes the delegateBuilder will return `null` to signal that this
* strategy is not valid.
*/
assert this.delegate == null || this.delegate.parent() == parent : "invalid parent set on delegate";
}

/**

@@ -26,9 +26,11 @@
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.IndexOrDocValuesQuery;
import org.apache.lucene.search.IndexSortSortedNumericDocValuesRangeQuery;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
@@ -203,19 +205,16 @@ public static FilterByFilter buildFilterOrderOrNull(
if (parent != null) {
return null;
}
if (factories.countAggregators() != 0) {
return null;
}
if (otherBucketKey != null) {
return null;
}
return new FiltersAggregator.FilterByFilter(
name,
factories,
keys,
filters,
keyed,
context,
parent,
cardinality,
metadata
);
@@ -289,15 +288,15 @@ public static class FilterByFilter extends FiltersAggregator {

private FilterByFilter(
String name,
AggregatorFactories factories,
String[] keys,
Query[] filters,
boolean keyed,
AggregationContext context,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
super(name, AggregatorFactories.EMPTY, keys, keyed, null, context, parent, cardinality, metadata);
super(name, factories, keys, keyed, null, context, null, cardinality, metadata);
this.filters = filters;
this.profiling = context.profiling();
}
@@ -378,9 +377,26 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
// the filter doesn't match any docs
continue;
}
TotalHitCountCollector collector = new TotalHitCountCollector();
scorer.score(collector, live);
incrementBucketDocCount(filterOrd, collector.getTotalHits());
if (sub == LeafBucketCollector.NO_OP_COLLECTOR) {
Member
The equality check feels brittle to me here. I wonder if we should put a method on LeafBucketCollector that returns a boolean saying whether it's going to do any work, and check that. Might be premature abstraction on my part though, what do you think?
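
For illustration only, a minimal standalone sketch of that suggestion — the isNoop() hook and the class name are hypothetical, not part of the existing LeafBucketCollector API:

// Hypothetical sketch: let a collector advertise that it will never do any
// work, so call sites can check intent instead of comparing against the
// NO_OP_COLLECTOR sentinel instance.
abstract class SketchBucketCollector {
    // A no-op singleton would override this to return true.
    boolean isNoop() {
        return false;
    }

    abstract void collect(int doc, long bucketOrd) throws java.io.IOException;
}

// The check above would then read roughly:
//   if (sub.isNoop()) { count hits only } else { drive the sub-aggregators }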

TotalHitCountCollector collector = new TotalHitCountCollector();
scorer.score(collector, live);
incrementBucketDocCount(filterOrd, collector.getTotalHits());
} else {
/*
* We can use the pre-constructed leaf collector for the first
* filter. But it's almost certainly not going to work for the
* second one because it'll try to "go backwards". So we build
* a new one for each subsequent filter.
*/
// NOCOMMIT switch to the block collection mechanism if we have more than a single sub-agg instead of this.
/*
* The switch is better because we can better estimate the costs.
*/
LeafBucketCollector filterLeafCollector = filterOrd == 0 ? sub : collectableSubAggregators.getLeafCollector(ctx);
SubCollector collector = new SubCollector(filterOrd, filterLeafCollector);
scorer.score(collector, live);
incrementBucketDocCount(filterOrd, collector.total);
}
Member Author
I think it's possible that this could actually be slower than the standard execution mechanism. I wonder if we need an escape hatch so folks can dodge this mechanism if it proves a bad idea.

Also: there is another possible implementation here that involves collecting a block of matches for each filter and then running all of the children in parallel. I'm not sure if it'll be faster or not. It kind of depends on the speed of iterating the doc values. It is a little more complex so I didn't do it.

Member Author
After taking a couple of days away I think the block matching mechanism is probably better here. It's much simpler to estimate the cost. Also - I'd love to know why the old way is so slow - the block-based mechanism feels like it'd be fast and it reads quite similarly to the Compatible mechanism. I think the big difference is that we don't get to join the main query with the filter query, so it can't skip matches effectively. Maybe. I've got to play.
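
For illustration, a rough standalone sketch of the block-based alternative discussed above: buffer the doc ids that match one filter, then replay the whole block into the sub-collector. All names here are hypothetical, not the actual implementation:

import java.io.IOException;

class BlockReplaySketch {
    // Stands in for LeafBucketCollector#collect(doc, bucketOrd) in this sketch.
    interface DocSink {
        void collect(int doc, long bucketOrd) throws IOException;
    }

    // A first pass (not shown) scores the filter with a collector that only
    // records matching doc ids into matchingDocs. Because the block size is
    // known before any sub-aggregation work starts, the cost of running the
    // children is easier to estimate up front.
    static void replayBlock(int[] matchingDocs, int count, long filterOrd, DocSink sub) throws IOException {
        for (int i = 0; i < count; i++) {
            sub.collect(matchingDocs[i], filterOrd);
        }
    }
}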

}
// Throwing this exception is how we communicate to the collection mechanism that we don't need the segment.
throw new CollectionTerminatedException();
@@ -397,6 +413,31 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
add.accept("estimate_cost_time", estimateCostTime);
}
}

/**
* Adapts filter-by-filter hit collection into sub-aggregations.
*/
private static class SubCollector implements LeafCollector {
private final int filterOrd;
private final LeafBucketCollector sub;
private int total;

SubCollector(int filterOrd, LeafBucketCollector sub) {
this.filterOrd = filterOrd;
this.sub = sub;
}

@Override
public void setScorer(Scorable scorer) throws IOException {
sub.setScorer(scorer);
}

@Override
public void collect(int doc) throws IOException {
total++;
sub.collect(doc, filterOrd);
}
}
}

/**

@@ -312,7 +312,7 @@ public static Aggregator build(
/*
* Looks like it'd be more expensive to use the filter-by-filter
* aggregator. Oh well. Snapshot the filter-by-filter
* aggregator's debug information if we're profiling because it
* aggregator's debug information if we're profiling because it
* is useful even if the aggregator isn't.
*/
if (context.profiling()) {
@@ -356,7 +356,6 @@ public static FromFilters<?> adaptIntoFiltersOrNull(
if (averageDocsPerRange < DOCS_PER_RANGE_TO_USE_FILTERS) {
return null;
}
// TODO bail here for runtime fields. We should check the cost estimates on the Scorer.
if (valuesSourceConfig.fieldType() instanceof DateFieldType
&& ((DateFieldType) valuesSourceConfig.fieldType()).resolution() == Resolution.NANOSECONDS) {
// We don't generate sensible Queries for nanoseconds.
@@ -393,37 +392,33 @@ public static FromFilters<?> adaptIntoFiltersOrNull(
builder.to(ranges[i].to == Double.POSITIVE_INFINITY ? null : format.format(ranges[i].to)).includeUpper(false);
filters[i] = context.buildQuery(builder);
}
FiltersAggregator.FilterByFilter delegate = FiltersAggregator.buildFilterOrderOrNull(
name,
factories,
keys,
filters,
false,
null,
context,
parent,
cardinality,
metadata
);
if (delegate == null) {
return null;
}
RangeAggregator.FromFilters<?> fromFilters = new RangeAggregator.FromFilters<>(
parent,
factories,
subAggregators -> {
if (subAggregators.countAggregators() > 0) {
throw new IllegalStateException("didn't expect to have a delegate if there are child aggs");
}
return delegate;
},
subAggregators -> FiltersAggregator.buildFilterOrderOrNull(
name,
subAggregators,
keys,
filters,
false,
null,
context,
parent,
cardinality,
metadata
),
valuesSourceConfig.format(),
ranges,
keyed,
rangeFactory,
averageDocsPerRange
);
return fromFilters;
/*
* A null delegate means we weren't able to run the aggregation
* filter by filter so we have to give up and go back to the
* standard range aggregator.
*/
return fromFilters.delegate() == null ? null : fromFilters;
}

public static Aggregator buildWithoutAttemptedToAdaptToFilters(

@@ -21,8 +21,10 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexOrDocValuesQuery;
import org.apache.lucene.search.IndexSearcher;
@@ -36,6 +38,8 @@
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper.KeywordFieldType;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
@@ -46,10 +50,14 @@
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorTests;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -297,6 +305,69 @@ public void testFilterByFilterCost() throws IOException {
);
}

public void testSubAggs() throws IOException {
MappedFieldType dateFt = new DateFieldMapper.DateFieldType(
"test",
true,
false,
false,
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
Resolution.MILLISECONDS,
null,
null
);
MappedFieldType intFt = new NumberFieldMapper.NumberFieldType("int", NumberType.INTEGER);
AggregationBuilder builder = new FiltersAggregationBuilder(
"test",
new KeyedFilter("q1", new RangeQueryBuilder("test").from("2010-01-01").to("2010-03-01").includeUpper(false)),
new KeyedFilter("q2", new RangeQueryBuilder("test").from("2020-01-01").to("2020-03-01").includeUpper(false))
).subAggregation(new MaxAggregationBuilder("m").field("int"));
List<List<IndexableField>> docs = new ArrayList<>();
docs.add(
List.of(
new LongPoint("test", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2010-01-02")),
new SortedNumericDocValuesField("int", 100)
)
);
docs.add(
List.of(
new LongPoint("test", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2020-01-02")),
new SortedNumericDocValuesField("int", 5)
)
);
docs.add(
List.of(
new LongPoint("test", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2020-01-03")),
new SortedNumericDocValuesField("int", 10)
)
);
/*
* Shuffle the docs so we collect them in a random order which causes
* bad implementations of filter-by-filter aggregation to fail with
* assertion errors while executing.
*/
Collections.shuffle(docs, random());
testCase(builder, new MatchAllDocsQuery(), iw -> iw.addDocuments(docs), result -> {
InternalFilters filters = (InternalFilters) result;
assertThat(filters.getBuckets(), hasSize(2));

assertThat(filters.getBucketByKey("q1").getDocCount(), equalTo(1L));
InternalMax max = filters.getBucketByKey("q1").getAggregations().get("m");
assertThat(max.getValue(), equalTo(100.0));

assertThat(filters.getBucketByKey("q2").getDocCount(), equalTo(2L));
max = filters.getBucketByKey("q2").getAggregations().get("m");
assertThat(max.getValue(), equalTo(10.0));
}, dateFt, intFt);
withAggregator(builder, new MatchAllDocsQuery(), iw -> iw.addDocuments(docs), (searcher, aggregator) -> {
assertThat(aggregator, instanceOf(FiltersAggregator.FilterByFilter.class));
FiltersAggregator.FilterByFilter filterByFilter = (FiltersAggregator.FilterByFilter) aggregator;
int maxDoc = searcher.getIndexReader().maxDoc();
assertThat(filterByFilter.estimateCost(maxDoc), equalTo(3L));
assertThat(filterByFilter.scorersCached(), equalTo(true));
}, dateFt, intFt);
}

/**
* Check that we don't accidentally find nested documents when the filter
* matches it.