Skip to content

Commit b4d88aa

Browse files
authored
Move top-level pipeline aggs out of QuerySearchResult (elastic#40319)
As part of elastic#40177 we have added top-level pipeline aggs to `InternalAggregations`. Given that `QuerySearchResult` holds an `InternalAggregations` instance, there is no need to keep on setting top-level pipeline aggs separately. Top-level pipeline aggs can then always be transported through `InternalAggregations`. Such change is made in a backwards compatible manner.
1 parent c7ebfa2 commit b4d88aa

File tree

7 files changed

+145
-41
lines changed

7 files changed

+145
-41
lines changed

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ task verifyVersions {
162162
* after the backport of the backcompat code is complete.
163163
*/
164164

165-
boolean bwc_tests_enabled = true
166-
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
165+
boolean bwc_tests_enabled = false
166+
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/40319" /* place a PR link here when committing bwc changes */
167167
if (bwc_tests_enabled == false) {
168168
if (bwc_tests_disabled_issue.isEmpty()) {
169169
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
486486
}
487487
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
488488
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
489-
InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
489+
InternalAggregations.reduce(aggregationsList, reduceContext);
490490
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
491491
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
492492
reducedCompletionSuggestions);

server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ public void execute(SearchContext context) {
132132
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
133133
}
134134
}
135-
context.queryResult().aggregations(new InternalAggregations(aggregations));
136135
List<PipelineAggregator> pipelineAggregators = context.aggregations().factories().createPipelineAggregators();
137136
List<SiblingPipelineAggregator> siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size());
138137
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
@@ -144,7 +143,7 @@ public void execute(SearchContext context) {
144143
+ "allowed at the top level");
145144
}
146145
}
147-
context.queryResult().pipelineAggregators(siblingPipelineAggregators);
146+
context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators));
148147

149148
// disable aggregations so that they don't run on next pages in case of scrolling
150149
context.aggregations(null);

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

+2-15
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public InternalAggregations(List<InternalAggregation> aggregations, List<Sibling
7777
* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
7878
* become part of the list of {@link InternalAggregation}s.
7979
*/
80-
List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
80+
public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
8181
return topLevelPipelineAggregators;
8282
}
8383

@@ -91,20 +91,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
9191
if (aggregationsList.isEmpty()) {
9292
return null;
9393
}
94-
InternalAggregations first = aggregationsList.get(0);
95-
return reduce(aggregationsList, first.topLevelPipelineAggregators, context);
96-
}
97-
98-
/**
99-
* Reduces the given list of aggregations as well as the provided top-level pipeline aggregators.
100-
* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
101-
*/
102-
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
103-
List<SiblingPipelineAggregator> topLevelPipelineAggregators,
104-
ReduceContext context) {
105-
if (aggregationsList.isEmpty()) {
106-
return null;
107-
}
94+
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
10895

10996
// first we collect all aggregations of the same type and list them together
11097
Map<String, List<InternalAggregation>> aggByName = new HashMap<>();

server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

+24-13
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121

2222
import org.apache.lucene.search.FieldDoc;
2323
import org.apache.lucene.search.TotalHits;
24+
import org.elasticsearch.Version;
2425
import org.elasticsearch.common.io.stream.StreamInput;
2526
import org.elasticsearch.common.io.stream.StreamOutput;
2627
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
2728
import org.elasticsearch.search.DocValueFormat;
2829
import org.elasticsearch.search.SearchPhaseResult;
2930
import org.elasticsearch.search.SearchShardTarget;
3031
import org.elasticsearch.search.aggregations.Aggregations;
32+
import org.elasticsearch.search.aggregations.InternalAggregation;
3133
import org.elasticsearch.search.aggregations.InternalAggregations;
3234
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
3335
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
@@ -37,7 +39,6 @@
3739
import java.io.IOException;
3840
import java.util.Collections;
3941
import java.util.List;
40-
import java.util.Objects;
4142
import java.util.stream.Collectors;
4243

4344
import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
@@ -54,7 +55,6 @@ public final class QuerySearchResult extends SearchPhaseResult {
5455
private DocValueFormat[] sortValueFormats;
5556
private InternalAggregations aggregations;
5657
private boolean hasAggs;
57-
private List<SiblingPipelineAggregator> pipelineAggregators = Collections.emptyList();
5858
private Suggest suggest;
5959
private boolean searchTimedOut;
6060
private Boolean terminatedEarly = null;
@@ -198,14 +198,6 @@ public void profileResults(ProfileShardResult shardResults) {
198198
hasProfileResults = shardResults != null;
199199
}
200200

201-
public List<SiblingPipelineAggregator> pipelineAggregators() {
202-
return pipelineAggregators;
203-
}
204-
205-
public void pipelineAggregators(List<SiblingPipelineAggregator> pipelineAggregators) {
206-
this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators);
207-
}
208-
209201
public Suggest suggest() {
210202
return suggest;
211203
}
@@ -294,8 +286,18 @@ public void readFromWithId(long id, StreamInput in) throws IOException {
294286
if (hasAggs = in.readBoolean()) {
295287
aggregations = InternalAggregations.readAggregations(in);
296288
}
297-
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
298-
.collect(Collectors.toList());
289+
if (in.getVersion().before(Version.V_7_1_0)) {
290+
List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
291+
.map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList());
292+
if (hasAggs && pipelineAggregators.isEmpty() == false) {
293+
List<InternalAggregation> internalAggs = aggregations.asList().stream()
294+
.map(agg -> (InternalAggregation) agg).collect(Collectors.toList());
295+
//Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while
296+
//later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
297+
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1.
298+
this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators);
299+
}
300+
}
299301
if (in.readBoolean()) {
300302
suggest = new Suggest(in);
301303
}
@@ -332,7 +334,16 @@ public void writeToNoId(StreamOutput out) throws IOException {
332334
out.writeBoolean(true);
333335
aggregations.writeTo(out);
334336
}
335-
out.writeNamedWriteableList(pipelineAggregators);
337+
if (out.getVersion().before(Version.V_7_1_0)) {
338+
//Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly,
339+
//while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
340+
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on.
341+
if (aggregations == null) {
342+
out.writeNamedWriteableList(Collections.emptyList());
343+
} else {
344+
out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators());
345+
}
346+
}
336347
if (suggest == null) {
337348
out.writeBoolean(false);
338349
} else {

server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,19 @@ public class InternalAggregationsTests extends ESTestCase {
5050
public void testReduceEmptyAggs() {
5151
List<InternalAggregations> aggs = Collections.emptyList();
5252
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean());
53-
assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext));
53+
assertNull(InternalAggregations.reduce(aggs, reduceContext));
5454
}
5555

5656
public void testNonFinalReduceTopLevelPipelineAggs() {
5757
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
5858
10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
59-
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms)));
6059
List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>();
6160
MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
6261
topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create());
62+
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms),
63+
topLevelPipelineAggs));
6364
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false);
64-
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext);
65+
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext);
6566
assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
6667
assertEquals(1, reducedAggs.aggregations.size());
6768
}
@@ -79,15 +80,15 @@ public void testFinalReduceTopLevelPipelineAggs() {
7980
Collections.singletonList(siblingPipelineAggregator));
8081
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
8182
} else {
82-
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms));
83-
List<SiblingPipelineAggregator> topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator);
84-
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext);
83+
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
84+
Collections.singletonList(siblingPipelineAggregator));
85+
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
8586
}
8687
assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size());
8788
assertEquals(2, reducedAggs.aggregations.size());
8889
}
8990

90-
public void testSerialization() throws Exception {
91+
public static InternalAggregations createTestInstance() throws Exception {
9192
List<InternalAggregation> aggsList = new ArrayList<>();
9293
if (randomBoolean()) {
9394
StringTermsTests stringTermsTests = new StringTermsTests();
@@ -116,7 +117,11 @@ public void testSerialization() throws Exception {
116117
topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create());
117118
}
118119
}
119-
InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs);
120+
return new InternalAggregations(aggsList, topLevelPipelineAggs);
121+
}
122+
123+
public void testSerialization() throws Exception {
124+
InternalAggregations aggregations = createTestInstance();
120125
writeToAndReadFrom(aggregations, 0);
121126
}
122127

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.query;
21+
22+
import org.apache.lucene.search.ScoreDoc;
23+
import org.apache.lucene.search.TopDocs;
24+
import org.apache.lucene.search.TotalHits;
25+
import org.elasticsearch.Version;
26+
import org.elasticsearch.action.OriginalIndices;
27+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
28+
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
29+
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.index.shard.ShardId;
31+
import org.elasticsearch.search.DocValueFormat;
32+
import org.elasticsearch.search.SearchModule;
33+
import org.elasticsearch.search.SearchShardTarget;
34+
import org.elasticsearch.search.aggregations.Aggregations;
35+
import org.elasticsearch.search.aggregations.InternalAggregations;
36+
import org.elasticsearch.search.aggregations.InternalAggregationsTests;
37+
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
38+
import org.elasticsearch.search.suggest.SuggestTests;
39+
import org.elasticsearch.test.ESTestCase;
40+
import org.elasticsearch.test.VersionUtils;
41+
42+
import java.util.List;
43+
44+
import static java.util.Collections.emptyList;
45+
46+
public class QuerySearchResultTests extends ESTestCase {
47+
48+
private final NamedWriteableRegistry namedWriteableRegistry;
49+
50+
public QuerySearchResultTests() {
51+
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
52+
this.namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
53+
}
54+
55+
private static QuerySearchResult createTestInstance() throws Exception {
56+
ShardId shardId = new ShardId("index", "uuid", randomInt());
57+
QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE));
58+
if (randomBoolean()) {
59+
result.terminatedEarly(randomBoolean());
60+
}
61+
TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]);
62+
result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]);
63+
result.size(randomInt());
64+
result.from(randomInt());
65+
if (randomBoolean()) {
66+
result.suggest(SuggestTests.createTestItem());
67+
}
68+
if (randomBoolean()) {
69+
result.aggregations(InternalAggregationsTests.createTestInstance());
70+
}
71+
return result;
72+
}
73+
74+
public void testSerialization() throws Exception {
75+
QuerySearchResult querySearchResult = createTestInstance();
76+
Version version = VersionUtils.randomVersion(random());
77+
QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
78+
assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId());
79+
assertNull(deserialized.getSearchShardTarget());
80+
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
81+
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
82+
assertEquals(querySearchResult.from(), deserialized.from());
83+
assertEquals(querySearchResult.size(), deserialized.size());
84+
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
85+
if (deserialized.hasAggs()) {
86+
Aggregations aggs = querySearchResult.consumeAggs();
87+
Aggregations deserializedAggs = deserialized.consumeAggs();
88+
assertEquals(aggs.asList(), deserializedAggs.asList());
89+
List<SiblingPipelineAggregator> pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators();
90+
List<SiblingPipelineAggregator> deserializedPipelineAggs =
91+
((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators();
92+
assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size());
93+
for (int i = 0; i < pipelineAggs.size(); i++) {
94+
SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i);
95+
SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i);
96+
assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths());
97+
assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name());
98+
}
99+
}
100+
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
101+
}
102+
}

0 commit comments

Comments
 (0)