Skip to content

Commit 521933a

Browse files
committed
SingleBucket aggs need to reduce their bucket's pipelines first (#50103)
When decoupling the pipeline reduction from regular agg reduction, MultiBucket aggs were modified to reduce their bucket's pipeline aggs first before reducing the sibling aggs. This modification was missed on SingleBucket aggs, meaning any SingleBucket would fail to reduce any pipeline sub-aggs
1 parent 364eb2d commit 521933a

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

+16
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,22 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
109109
return newAggregation(getName(), docCount, aggs);
110110
}
111111

112+
/**
113+
* Unlike {@link InternalAggregation#reducePipelines(InternalAggregation, ReduceContext)}, a single-bucket
114+
* agg needs to first reduce the aggs in it's bucket (and their parent pipelines) before allowing sibling pipelines
115+
* to reduce
116+
*/
117+
@Override
118+
public final InternalAggregation reducePipelines(InternalAggregation reducedAggs, ReduceContext reduceContext) {
119+
assert reduceContext.isFinalReduce();
120+
List<InternalAggregation> aggs = new ArrayList<>();
121+
for (Aggregation agg : getAggregations().asList()) {
122+
aggs.add(((InternalAggregation)agg).reducePipelines((InternalAggregation)agg, reduceContext));
123+
}
124+
InternalAggregations reducedSubAggs = new InternalAggregations(aggs);
125+
return super.reducePipelines(create(reducedSubAggs), reduceContext);
126+
}
127+
112128
@Override
113129
public Object getProperty(List<String> path) {
114130
if (path.isEmpty()) {

server/src/test/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorTests.java

+86
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.Version;
4141
import org.elasticsearch.common.collect.Tuple;
4242
import org.elasticsearch.common.lucene.search.Queries;
43+
import org.elasticsearch.common.settings.Settings;
4344
import org.elasticsearch.index.mapper.IdFieldMapper;
4445
import org.elasticsearch.index.mapper.KeywordFieldMapper;
4546
import org.elasticsearch.index.mapper.MappedFieldType;
@@ -48,11 +49,19 @@
4849
import org.elasticsearch.index.mapper.TypeFieldMapper;
4950
import org.elasticsearch.index.mapper.Uid;
5051
import org.elasticsearch.index.query.MatchAllQueryBuilder;
52+
import org.elasticsearch.script.MockScriptEngine;
53+
import org.elasticsearch.script.Script;
54+
import org.elasticsearch.script.ScriptEngine;
55+
import org.elasticsearch.script.ScriptModule;
56+
import org.elasticsearch.script.ScriptService;
57+
import org.elasticsearch.script.ScriptType;
5158
import org.elasticsearch.search.aggregations.AggregatorTestCase;
5259
import org.elasticsearch.search.aggregations.BucketOrder;
5360
import org.elasticsearch.search.aggregations.InternalAggregation;
5461
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
5562
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
63+
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
64+
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
5665
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
5766
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
5867
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
@@ -63,13 +72,17 @@
6372
import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
6473
import org.elasticsearch.search.aggregations.metrics.InternalSum;
6574
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
75+
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
76+
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
6677
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
6778
import org.elasticsearch.search.aggregations.support.ValueType;
6879
import org.elasticsearch.test.VersionUtils;
6980

7081
import java.io.IOException;
7182
import java.util.ArrayList;
7283
import java.util.Arrays;
84+
import java.util.Collections;
85+
import java.util.HashMap;
7386
import java.util.List;
7487
import java.util.Locale;
7588
import java.util.Map;
@@ -88,6 +101,7 @@ public class NestedAggregatorTests extends AggregatorTestCase {
88101
private static final String NESTED_AGG = "nestedAgg";
89102
private static final String MAX_AGG_NAME = "maxAgg";
90103
private static final String SUM_AGG_NAME = "sumAgg";
104+
private static final String INVERSE_SCRIPT = "inverse";
91105

92106
private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
93107

@@ -101,6 +115,18 @@ protected Map<String, MappedFieldType> getFieldAliases(MappedFieldType... fieldT
101115
Function.identity()));
102116
}
103117

118+
@Override
119+
protected ScriptService getMockScriptService() {
120+
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
121+
scripts.put(INVERSE_SCRIPT, vars -> -((Number) vars.get("_value")).doubleValue());
122+
MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME,
123+
scripts,
124+
Collections.emptyMap());
125+
Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
126+
127+
return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
128+
}
129+
104130
public void testNoDocs() throws IOException {
105131
try (Directory directory = newDirectory()) {
106132
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
@@ -711,6 +737,66 @@ public void testFieldAlias() throws IOException {
711737
}
712738
}
713739

740+
/**
741+
* This tests to make sure pipeline aggs embedded under a SingleBucket agg (like nested)
742+
* are properly reduced
743+
*/
744+
public void testNestedWithPipeline() throws IOException {
745+
int numRootDocs = randomIntBetween(1, 20);
746+
int expectedNestedDocs = 0;
747+
double expectedMaxValue = Double.NEGATIVE_INFINITY;
748+
try (Directory directory = newDirectory()) {
749+
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
750+
for (int i = 0; i < numRootDocs; i++) {
751+
List<Document> documents = new ArrayList<>();
752+
expectedMaxValue = Math.max(expectedMaxValue,
753+
generateMaxDocs(documents, 1, i, NESTED_OBJECT, VALUE_FIELD_NAME));
754+
expectedNestedDocs += 1;
755+
756+
Document document = new Document();
757+
document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(i)), IdFieldMapper.Defaults.FIELD_TYPE));
758+
document.add(new Field(TypeFieldMapper.NAME, "test",
759+
TypeFieldMapper.Defaults.FIELD_TYPE));
760+
document.add(sequenceIDFields.primaryTerm);
761+
documents.add(document);
762+
iw.addDocuments(documents);
763+
}
764+
iw.commit();
765+
}
766+
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
767+
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT)
768+
.subAggregation(new TermsAggregationBuilder("terms", ValueType.NUMERIC).field(VALUE_FIELD_NAME)
769+
.subAggregation(new MaxAggregationBuilder(MAX_AGG_NAME).field(VALUE_FIELD_NAME))
770+
.subAggregation(new BucketScriptPipelineAggregationBuilder("bucketscript",
771+
Collections.singletonMap("_value", MAX_AGG_NAME),
772+
new Script(ScriptType.INLINE, MockScriptEngine.NAME, INVERSE_SCRIPT, Collections.emptyMap()))));
773+
774+
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
775+
fieldType.setName(VALUE_FIELD_NAME);
776+
777+
InternalNested nested = searchAndReduce(newSearcher(indexReader, false, true),
778+
new MatchAllDocsQuery(), nestedBuilder, fieldType);
779+
780+
assertEquals(expectedNestedDocs, nested.getDocCount());
781+
assertEquals(NESTED_AGG, nested.getName());
782+
assertEquals(expectedNestedDocs, nested.getDocCount());
783+
784+
InternalTerms<?, LongTerms.Bucket> terms = (InternalTerms) nested.getProperty("terms");
785+
assertNotNull(terms);
786+
787+
for (LongTerms.Bucket bucket : terms.getBuckets()) {
788+
InternalMax max = (InternalMax) bucket.getAggregations().asMap().get(MAX_AGG_NAME);
789+
InternalSimpleValue bucketScript = (InternalSimpleValue) bucket.getAggregations().asMap().get("bucketscript");
790+
assertNotNull(max);
791+
assertNotNull(bucketScript);
792+
assertEquals(max.getValue(), -bucketScript.getValue(), Double.MIN_VALUE);
793+
}
794+
795+
assertTrue(AggregationInspectionHelper.hasValue(nested));
796+
}
797+
}
798+
}
799+
714800
private double generateMaxDocs(List<Document> documents, int numNestedDocs, int id, String path, String fieldName) {
715801
return DoubleStream.of(generateDocuments(documents, numNestedDocs, id, path, fieldName))
716802
.max().orElse(Double.NEGATIVE_INFINITY);

0 commit comments

Comments
 (0)