diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java
index 832ecce4ed7eb..932cc8190f82f 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java
@@ -51,7 +51,11 @@ public AdaptingAggregator(
          * agg tree. This is how it has always been and some aggs rely on it.
          */
         this.delegate = delegate.apply(subAggregators.fixParent(this));
-        assert this.delegate.parent() == parent : "invalid parent set on delegate";
+        /*
+         * Sometimes the delegateBuilder will return `null` to signal that this
+         * strategy is not valid.
+         */
+        assert this.delegate == null || this.delegate.parent() == parent : "invalid parent set on delegate";
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java
index e4600bd373d88..e8c4be604144b 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java
@@ -26,9 +26,11 @@
 import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.IndexOrDocValuesQuery;
 import org.apache.lucene.search.IndexSortSortedNumericDocValuesRangeQuery;
+import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.PointRangeQuery;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.TotalHitCountCollector;
 import org.apache.lucene.search.Weight;
@@ -203,19 +205,16 @@ public static FilterByFilter buildFilterOrderOrNull(
         if (parent != null) {
             return null;
         }
-        if (factories.countAggregators() != 0) {
-            return null;
-        }
         if (otherBucketKey != null) {
             return null;
         }
         return new FiltersAggregator.FilterByFilter(
             name,
+            factories,
             keys,
             filters,
             keyed,
             context,
-            parent,
             cardinality,
             metadata
         );
@@ -289,15 +288,15 @@ public static class FilterByFilter extends FiltersAggregator {
 
         private FilterByFilter(
             String name,
+            AggregatorFactories factories,
             String[] keys,
             Query[] filters,
             boolean keyed,
             AggregationContext context,
-            Aggregator parent,
             CardinalityUpperBound cardinality,
             Map<String, Object> metadata
         ) throws IOException {
-            super(name, AggregatorFactories.EMPTY, keys, keyed, null, context, parent, cardinality, metadata);
+            super(name, factories, keys, keyed, null, context, null, cardinality, metadata);
             this.filters = filters;
             this.profiling = context.profiling();
         }
@@ -378,9 +377,26 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
                     // the filter doesn't match any docs
                     continue;
                 }
-                TotalHitCountCollector collector = new TotalHitCountCollector();
-                scorer.score(collector, live);
-                incrementBucketDocCount(filterOrd, collector.getTotalHits());
+                if (sub == LeafBucketCollector.NO_OP_COLLECTOR) {
+                    TotalHitCountCollector collector = new TotalHitCountCollector();
+                    scorer.score(collector, live);
+                    incrementBucketDocCount(filterOrd, collector.getTotalHits());
+                } else {
+                    /*
+                     * We can use the pre-constructed leaf collector for the first
+                     * filter. But it is almost certainly not going to work for the
+                     * second one because it'll try to "go backwards". So we build
+                     * a new one for each subsequent filter.
+                     */
+                    // NOCOMMIT switch to the block collection mechanism if we have more than a single sub-agg instead of this.
+                    /*
+                     * The switch is better because we can better estimate the costs.
+                     */
+                    LeafBucketCollector filterLeafCollector = filterOrd == 0 ? sub : collectableSubAggregators.getLeafCollector(ctx);
+                    SubCollector collector = new SubCollector(filterOrd, filterLeafCollector);
+                    scorer.score(collector, live);
+                    incrementBucketDocCount(filterOrd, collector.total);
+                }
             }
             // Throwing this exception is how we communicate to the collection mechanism that we don't need the segment.
             throw new CollectionTerminatedException();
@@ -397,6 +413,31 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
                 add.accept("estimate_cost_time", estimateCostTime);
             }
         }
+
+        /**
+         * Adapts filter-by-filter hit collection into sub-aggregations.
+         */
+        private static class SubCollector implements LeafCollector {
+            private final int filterOrd;
+            private final LeafBucketCollector sub;
+            private int total;
+
+            SubCollector(int filterOrd, LeafBucketCollector sub) {
+                this.filterOrd = filterOrd;
+                this.sub = sub;
+            }
+
+            @Override
+            public void setScorer(Scorable scorer) throws IOException {
+                sub.setScorer(scorer);
+            }
+
+            @Override
+            public void collect(int doc) throws IOException {
+                total++;
+                sub.collect(doc, filterOrd);
+            }
+        }
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
index eee2777efff92..0b8d4874d1fdf 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
@@ -312,7 +312,7 @@ public static Aggregator build(
         /*
          * Looks like it'd be more expensive to use the filter-by-filter
          * aggregator. Oh well. Snapshot the filter-by-filter
-         * aggregator's debug information if we're profiling bececause it
+         * aggregator's debug information if we're profiling because it
          * is useful even if the aggregator isn't.
          */
         if (context.profiling()) {
@@ -356,7 +356,6 @@ public static FromFilters adaptIntoFiltersOrNull(
         if (averageDocsPerRange < DOCS_PER_RANGE_TO_USE_FILTERS) {
             return null;
         }
-        // TODO bail here for runtime fields. We should check the cost estimates on the Scorer.
         if (valuesSourceConfig.fieldType() instanceof DateFieldType
             && ((DateFieldType) valuesSourceConfig.fieldType()).resolution() == Resolution.NANOSECONDS) {
             // We don't generate sensible Queries for nanoseconds.
@@ -393,37 +392,33 @@ public static FromFilters adaptIntoFiltersOrNull(
             builder.to(ranges[i].to == Double.POSITIVE_INFINITY ? null : format.format(ranges[i].to)).includeUpper(false);
             filters[i] = context.buildQuery(builder);
         }
-        FiltersAggregator.FilterByFilter delegate = FiltersAggregator.buildFilterOrderOrNull(
-            name,
-            factories,
-            keys,
-            filters,
-            false,
-            null,
-            context,
-            parent,
-            cardinality,
-            metadata
-        );
-        if (delegate == null) {
-            return null;
-        }
         RangeAggregator.FromFilters fromFilters = new RangeAggregator.FromFilters<>(
             parent,
             factories,
-            subAggregators -> {
-                if (subAggregators.countAggregators() > 0) {
-                    throw new IllegalStateException("didn't expect to have a delegate if there are child aggs");
-                }
-                return delegate;
-            },
+            subAggregators -> FiltersAggregator.buildFilterOrderOrNull(
+                name,
+                subAggregators,
+                keys,
+                filters,
+                false,
+                null,
+                context,
+                parent,
+                cardinality,
+                metadata
+            ),
             valuesSourceConfig.format(),
             ranges,
             keyed,
             rangeFactory,
             averageDocsPerRange
         );
-        return fromFilters;
+        /*
+         * A null delegate means we weren't able to run the aggregation
+         * filter by filter so we have to give up and go back to the
+         * standard range aggregator.
+         */
+        return fromFilters.delegate() == null ? null : fromFilters;
     }
 
     public static Aggregator buildWithoutAttemptedToAdaptToFilters(
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java
index 5c322288a6ed1..3f251f55b2d40 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java
@@ -21,8 +21,10 @@
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.search.IndexOrDocValuesQuery;
 import org.apache.lucene.search.IndexSearcher;
@@ -36,6 +38,8 @@
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper.KeywordFieldType;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
 import org.elasticsearch.index.mapper.ObjectMapper;
 import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -46,10 +50,14 @@
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator.KeyedFilter;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorTests;
+import org.elasticsearch.search.aggregations.metrics.InternalMax;
+import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -297,6 +305,69 @@ public void testFilterByFilterCost() throws IOException {
         );
     }
 
+    public void testSubAggs() throws IOException {
+        MappedFieldType dateFt = new DateFieldMapper.DateFieldType(
+            "test",
+            true,
+            false,
+            false,
+            DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
+            Resolution.MILLISECONDS,
+            null,
+            null
+        );
+        MappedFieldType intFt = new NumberFieldMapper.NumberFieldType("int", NumberType.INTEGER);
+        AggregationBuilder builder = new FiltersAggregationBuilder(
+            "test",
+            new KeyedFilter("q1", new RangeQueryBuilder("test").from("2010-01-01").to("2010-03-01").includeUpper(false)),
+            new KeyedFilter("q2", new RangeQueryBuilder("test").from("2020-01-01").to("2020-03-01").includeUpper(false))
+        ).subAggregation(new MaxAggregationBuilder("m").field("int"));
+        List<List<IndexableField>> docs = new ArrayList<>();
+        docs.add(
+            List.of(
+                new LongPoint("test", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2010-01-02")),
+                new SortedNumericDocValuesField("int", 100)
+            )
+        );
+        docs.add(
+            List.of(
+                new LongPoint("test", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2020-01-02")),
+                new SortedNumericDocValuesField("int", 5)
+            )
+        );
+        docs.add(
+            List.of(
+                new LongPoint("test", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2020-01-03")),
+                new SortedNumericDocValuesField("int", 10)
+            )
+        );
+        /*
+         * Shuffle the docs so we collect them in a random order which causes
+         * bad implementations of filter-by-filter aggregation to fail with
+         * assertion errors while executing.
+         */
+        Collections.shuffle(docs, random());
+        testCase(builder, new MatchAllDocsQuery(), iw -> iw.addDocuments(docs), result -> {
+            InternalFilters filters = (InternalFilters) result;
+            assertThat(filters.getBuckets(), hasSize(2));
+
+            assertThat(filters.getBucketByKey("q1").getDocCount(), equalTo(1L));
+            InternalMax max = filters.getBucketByKey("q1").getAggregations().get("m");
+            assertThat(max.getValue(), equalTo(100.0));
+
+            assertThat(filters.getBucketByKey("q2").getDocCount(), equalTo(2L));
+            max = filters.getBucketByKey("q2").getAggregations().get("m");
+            assertThat(max.getValue(), equalTo(10.0));
+        }, dateFt, intFt);
+        withAggregator(builder, new MatchAllDocsQuery(), iw -> iw.addDocuments(docs), (searcher, aggregator) -> {
+            assertThat(aggregator, instanceOf(FiltersAggregator.FilterByFilter.class));
+            FiltersAggregator.FilterByFilter filterByFilter = (FiltersAggregator.FilterByFilter) aggregator;
+            int maxDoc = searcher.getIndexReader().maxDoc();
+            assertThat(filterByFilter.estimateCost(maxDoc), equalTo(3L));
+            assertThat(filterByFilter.scorersCached(), equalTo(true));
+        }, dateFt, intFt);
+    }
+
     /**
      * Check that we don't accidentally find nested documents when the filter
      * matches it.
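The heart of this change is the collection loop in FilterByFilter.getLeafCollector: instead of visiting every document once and checking it against every filter, we visit one filter at a time and replay its matches. Below is a minimal, self-contained sketch of that filter-by-filter pattern in plain Lucene, outside the ES aggregation framework; FilterByFilterSketch and countPerFilter are illustrative names, not part of this PR. It also shows why the PR builds a fresh leaf collector per filter: each filter's BulkScorer replays the segment from doc 0, so a collector that has already seen higher doc IDs would be asked to "go backwards".

    import java.io.IOException;

    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.BulkScorer;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.ScoreMode;
    import org.apache.lucene.search.TotalHitCountCollector;
    import org.apache.lucene.search.Weight;
    import org.apache.lucene.util.Bits;

    class FilterByFilterSketch {
        /**
         * Count hits per filter, visiting one filter at a time instead of
         * one document at a time.
         */
        static long[] countPerFilter(IndexSearcher searcher, Query[] filters) throws IOException {
            long[] counts = new long[filters.length];
            for (LeafReaderContext ctx : searcher.getIndexReader().leaves()) {
                Bits live = ctx.reader().getLiveDocs();
                for (int f = 0; f < filters.length; f++) {
                    Weight weight = searcher.createWeight(searcher.rewrite(filters[f]), ScoreMode.COMPLETE_NO_SCORES, 1f);
                    BulkScorer scorer = weight.bulkScorer(ctx);
                    if (scorer == null) {
                        continue; // the filter matches nothing in this segment
                    }
                    // Each score(...) call iterates this segment's matching docs
                    // from the start, so every (segment, filter) pair needs a
                    // collector that hasn't seen any documents yet.
                    TotalHitCountCollector collector = new TotalHitCountCollector();
                    scorer.score(collector.getLeafCollector(ctx), live);
                    counts[f] += collector.getTotalHits();
                }
            }
            return counts;
        }
    }

In the real aggregator the per-filter collector is SubCollector, which also forwards each doc to the sub-aggregations with the filter's bucket ordinal, and the aggregator then throws CollectionTerminatedException to tell the normal doc-by-doc collection mechanism that the segment is already fully consumed.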
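The AdaptingAggregator and RangeAggregator hunks together set up a small handshake: the delegate builder may now return null to say "the filter-by-filter strategy doesn't apply here", and the caller checks delegate() afterwards to fall back to the standard aggregator. A stripped-down sketch of just that pattern, with generic names (Adapting, adaptOrNull, buildFastPathOrNull) standing in for the real aggregator types:

    import java.util.function.Function;

    class Adapting<I, D> {
        private final D delegate; // null means "strategy not valid"

        Adapting(I input, Function<I, D> delegateBuilder) {
            // The builder is allowed to return null; the assert in
            // AdaptingAggregator is relaxed the same way in this PR.
            this.delegate = delegateBuilder.apply(input);
        }

        D delegate() {
            return delegate;
        }
    }

    class Caller {
        /** Returns the adapted aggregator, or null so the caller falls back. */
        static <I, D> Adapting<I, D> adaptOrNull(I input, Function<I, D> buildFastPathOrNull) {
            Adapting<I, D> adapted = new Adapting<>(input, buildFastPathOrNull);
            return adapted.delegate() == null ? null : adapted;
        }
    }

This inversion is what lets buildFilterOrderOrNull receive the real sub-aggregator factories: the decision to bail (for example when otherBucketKey is set) now happens inside the builder lambda, after the adapter has rewired the sub-aggregators' parent, instead of before the adapter is constructed.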