diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index 0a34e7a92b8f1..bb0b2ad0a3050 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -109,6 +109,22 @@ public InternalAggregation reduce(List aggregations, Reduce return newAggregation(getName(), docCount, aggs); } + /** + * Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a single-bucket + * agg needs to first reduce the aggs in its bucket (and their parent pipelines) before allowing sibling pipelines + * to reduce + */ + @Override + public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) { + assert reduceContext.isFinalReduce(); + List aggs = new ArrayList<>(); + for (Aggregation agg : getAggregations().asList()) { + aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext)); + } + InternalAggregations reducedSubAggs = new InternalAggregations(aggs); + return super.reducePipelines(create(reducedSubAggs), reduceContext); + } + @Override public Object getProperty(List path) { if (path.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java index f3c1ea7ca529a..4983575860f86 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java @@ -39,6 +39,7 @@ import org.apache.lucene.util.BytesRef; import 
org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; @@ -47,11 +48,19 @@ import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; +import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -62,12 +71,16 @@ import org.elasticsearch.search.aggregations.metrics.Min; import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import 
org.elasticsearch.search.aggregations.support.ValueType; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -86,6 +99,7 @@ public class NestedAggregatorTests extends AggregatorTestCase { private static final String NESTED_AGG = "nestedAgg"; private static final String MAX_AGG_NAME = "maxAgg"; private static final String SUM_AGG_NAME = "sumAgg"; + private static final String INVERSE_SCRIPT = "inverse"; private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); @@ -99,6 +113,18 @@ protected Map getFieldAliases(MappedFieldType... fieldT Function.identity())); } + @Override + protected ScriptService getMockScriptService() { + Map, Object>> scripts = new HashMap<>(); + scripts.put(INVERSE_SCRIPT, vars -> -((Number) vars.get("_value")).doubleValue()); + MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, + scripts, + Collections.emptyMap()); + Map engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine); + + return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS); + } + public void testNoDocs() throws IOException { try (Directory directory = newDirectory()) { try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) { @@ -709,6 +735,66 @@ public void testFieldAlias() throws IOException { } } + /** + * This tests to make sure pipeline aggs embedded under a SingleBucket agg (like nested) + * are properly reduced + */ + public void testNestedWithPipeline() throws IOException { + int numRootDocs = randomIntBetween(1, 20); + int expectedNestedDocs = 0; + double expectedMaxValue = Double.NEGATIVE_INFINITY; + try (Directory directory = newDirectory()) { + try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) { + for (int i = 0; i < numRootDocs; i++) { + List documents = 
new ArrayList<>(); + expectedMaxValue = Math.max(expectedMaxValue, + generateMaxDocs(documents, 1, i, NESTED_OBJECT, VALUE_FIELD_NAME)); + expectedNestedDocs += 1; + + Document document = new Document(); + document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(i)), IdFieldMapper.Defaults.FIELD_TYPE)); + document.add(new Field(TypeFieldMapper.NAME, "test", + TypeFieldMapper.Defaults.FIELD_TYPE)); + document.add(sequenceIDFields.primaryTerm); + documents.add(document); + iw.addDocuments(documents); + } + iw.commit(); + } + try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) { + NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT) + .subAggregation(new TermsAggregationBuilder("terms", ValueType.NUMERIC).field(VALUE_FIELD_NAME) + .subAggregation(new MaxAggregationBuilder(MAX_AGG_NAME).field(VALUE_FIELD_NAME)) + .subAggregation(new BucketScriptPipelineAggregationBuilder("bucketscript", + Collections.singletonMap("_value", MAX_AGG_NAME), + new Script(ScriptType.INLINE, MockScriptEngine.NAME, INVERSE_SCRIPT, Collections.emptyMap())))); + + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + fieldType.setName(VALUE_FIELD_NAME); + + InternalNested nested = searchAndReduce(newSearcher(indexReader, false, true), + new MatchAllDocsQuery(), nestedBuilder, fieldType); + + assertEquals(expectedNestedDocs, nested.getDocCount()); + assertEquals(NESTED_AGG, nested.getName()); + assertEquals(expectedNestedDocs, nested.getDocCount()); + + InternalTerms terms = (InternalTerms) nested.getProperty("terms"); + assertNotNull(terms); + + for (LongTerms.Bucket bucket : terms.getBuckets()) { + InternalMax max = (InternalMax) bucket.getAggregations().asMap().get(MAX_AGG_NAME); + InternalSimpleValue bucketScript = (InternalSimpleValue) bucket.getAggregations().asMap().get("bucketscript"); + assertNotNull(max); + assertNotNull(bucketScript); + 
assertEquals(max.getValue(), -bucketScript.getValue(), Double.MIN_VALUE); + } + + assertTrue(AggregationInspectionHelper.hasValue(nested)); + } + } + } + private double generateMaxDocs(List documents, int numNestedDocs, int id, String path, String fieldName) { return DoubleStream.of(generateDocuments(documents, numNestedDocs, id, path, fieldName)) .max().orElse(Double.NEGATIVE_INFINITY);