diff --git a/docs/reference/modules/cross-cluster-search.asciidoc b/docs/reference/modules/cross-cluster-search.asciidoc index deea9b9587251..da2d5572e1514 100644 --- a/docs/reference/modules/cross-cluster-search.asciidoc +++ b/docs/reference/modules/cross-cluster-search.asciidoc @@ -2,9 +2,11 @@ == {ccs-cap} The _{ccs}_ feature allows any node to act as a federated client across -multiple clusters. In contrast to the <> feature, a {ccs} node won't -join the remote cluster, instead it connects to a remote cluster in a light fashion in order to execute -federated search requests. +multiple clusters. In contrast to the <> feature, +a {ccs} node won't join the remote cluster, instead it connects to a remote +cluster in a light fashion in order to execute federated search requests. +For details on communication and compatibility between different clusters, +see <>. [float] === Using {ccs} @@ -43,7 +45,8 @@ PUT _cluster/settings // TEST[s/127.0.0.1:9300/\${transport_host}/] To search the `twitter` index on remote cluster `cluster_one` the index name -must be prefixed with the cluster alias separated by a `:` character: +must be prefixed with the alias of the remote cluster followed by the `:` +character: [source,js] -------------------------------------------------- @@ -101,8 +104,8 @@ GET /cluster_one:twitter/_search // TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/] -In contrast to the `tribe` feature cross cluster search can also search indices with the same name on different -clusters: +In contrast to the `tribe` feature cross cluster search can also search indices +with the same name on different clusters: [source,js] -------------------------------------------------- @@ -118,10 +121,10 @@ GET /cluster_one:twitter,twitter/_search // CONSOLE // TEST[continued] -Search results are disambiguated the same way as the indices are disambiguated in the request. Even if index names are -identical these indices will be treated as different indices when results are merged. All results retrieved from a -remote index -will be prefixed with their remote cluster name: +Search results are disambiguated the same way as the indices are disambiguated in the request. +Indices with same names are treated as different indices when results are merged. All results +retrieved from an index located in a remote cluster are prefixed with their corresponding +cluster alias: [source,js] -------------------------------------------------- @@ -179,10 +182,11 @@ will be prefixed with their remote cluster name: [float] === Skipping disconnected clusters -By default all remote clusters that are searched via {ccs} need to be available when -the search request is executed, otherwise the whole request fails and no search results are returned -despite some of the clusters are available. Remote clusters can be made optional through the -boolean `skip_unavailable` setting, set to `false` by default. +By default, all remote clusters that are searched via {ccs} need to be +available when the search request is executed. Otherwise, the whole request +fails; even if some of the clusters are available, no search results are +returned. You can use the boolean `skip_unavailable` setting to make remote +clusters optional. By default, it is set to `false`. [source,js] -------------------------------- diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 314d95cc79eeb..b134a626739fa 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -14,16 +14,46 @@ connections to a remote cluster. This functionality is used in endif::[] Remote cluster connections work by configuring a remote cluster and connecting -only to a limited number of nodes in the remote cluster. Each remote cluster is -referenced by a name and a list of seed nodes. When a remote cluster is -registered, its cluster state is retrieved from one of the seed nodes so that by -default up to three _gateway nodes_ are selected to be connected to as part of -remote cluster requests. Remote cluster connections consist of uni-directional -connections from the coordinating node to the previously selected remote nodes -only. You can tag which nodes should be selected by using node attributes (see <>). +only to a limited number of nodes in that remote cluster. Each remote cluster +is referenced by a name and a list of seed nodes. When a remote cluster is +registered, its cluster state is retrieved from one of the seed nodes and up +to three _gateway nodes_ are selected to be connected to as part of remote +cluster requests. All the communication required between different clusters +goes through the <>. Remote cluster +connections consist of uni-directional connections from the coordinating +node to the selected remote _gateway nodes_ only. -Each node in a cluster that has remote clusters configured connects to one or -more _gateway nodes_ and uses them to federate requests to the remote cluster. +[float] +[[gateway-nodes-selection]] +=== Gateway nodes selection + +The _gateway nodes_ selection depends on the following criteria: + +- *version*: Remote nodes must be compatible with the cluster they are +registered to. This is subject to the same rules as <>. +Any node can communicate with any other node on the same major version (e.g. +6.0 can talk to any 6.x node). Only nodes on the last minor version of a +certain major version can communicate with nodes on the following major +version (e.g. 6.7 can communicate with 7.0, as well as any 7.x node, while + 6.6 or earlier cannot talk to any 7.x node). Note that version compatibility + is symmetric, meaning that if 6.7 can communicate with 7.0, 7.0 can also + communicate with 6.7. The matrix below summarizes compatibility as described + above. + +[cols="^,^,^,^,^,^"] +|==== +| Compatibility | 5.0->5.5 | 5.6 | 6.0->6.6 | 6.7 | 7.x +| 5.0->5.5 | Yes | Yes | No | No | No +| 5.6 | Yes | Yes | Yes | Yes | No +| 6.0->6.6 | No | Yes | Yes | Yes | No +| 6.7 | No | Yes | Yes | Yes | Yes +| 7.x | No | No | No | Yes | Yes +|==== + +- *role*: Dedicated master nodes never get selected. +- *attributes*: You can tag which nodes should be selected +(see <>), though such tagged nodes still have +to satisfy the two above requirements. [float] [[configuring-remote-clusters]] @@ -31,7 +61,8 @@ more _gateway nodes_ and uses them to federate requests to the remote cluster. You can configure remote clusters globally by using <>, which you can update dynamically. -Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file. +Alternatively, you can configure them locally on individual nodes by using the + `elasticsearch.yml` file. If you specify the settings in `elasticsearch.yml` files, only the nodes with those settings can connect to the remote cluster. In other words, functionality @@ -59,7 +90,8 @@ between local and remote indices. For more information about the optional transport settings, see <>. -If you use <>, the remote clusters are available on every node in the cluster. For example: +If you use <>, the remote clusters +are available on every node in the cluster. For example: [source,js] -------------------------------- diff --git a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc index 64a5e4f046661..146722d7a9217 100644 --- a/docs/reference/modules/snapshots.asciidoc +++ b/docs/reference/modules/snapshots.asciidoc @@ -28,6 +28,10 @@ back up a cluster is by using the snapshot and restore functionality. [float] === Version compatibility +IMPORTANT: Version compatibility refers to the underlying Lucene index +compatibility. Follow the <> +when migrating between versions. + A snapshot contains a copy of the on-disk data structures that make up an index. This means that snapshots can only be restored to versions of Elasticsearch that can read the indices: diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 8055b22d22e3b..f6d08c7d7c807 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -39,7 +39,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -62,8 +61,6 @@ import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public final class SearchPhaseController { @@ -496,8 +493,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection aggregationsList) { - ReduceContext reduceContext = reduceContextFunction.apply(false); - return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList, - null, reduceContext); - } - - private static InternalAggregations reduceAggs(List aggregationsList, - List pipelineAggregators, ReduceContext reduceContext) { - InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); - if (pipelineAggregators != null) { - List newAggs = StreamSupport.stream(aggregations.spliterator(), false) - .map((p) -> (InternalAggregation) p) - .collect(Collectors.toList()); - for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) { - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext); - newAggs.add(newAgg); - } - return new InternalAggregations(newAggs); - } - return aggregations; - } - public static final class ReducedQueryPhase { // the sum of all hits across all reduces shards final long totalHits; @@ -653,7 +624,8 @@ public void consumeResult(SearchPhaseResult result) { private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (index == bufferSize) { if (hasAggs) { - InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer)); + ReduceContext reduceContext = controller.reduceContextFunction.apply(false); + InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext); Arrays.fill(aggsBuffer, null); aggsBuffer[0] = reducedAggs; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index d38a8835e4e7e..bf8ad33dfb36d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -161,8 +161,7 @@ public AggParseContext(String name) { } } - public static final AggregatorFactories EMPTY = new AggregatorFactories(null, new AggregatorFactory[0], - new ArrayList()); + public static final AggregatorFactories EMPTY = new AggregatorFactories(null, new AggregatorFactory[0], new ArrayList<>()); private AggregatorFactory parent; private AggregatorFactory[] factories; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index eafdbe109776b..b525fd32d918a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -61,7 +61,7 @@ public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsum /** * Returns true iff the current reduce phase is the final reduce phase. This indicates if operations like * pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account. - * Operations that are potentially loosing information can only be applied during the final reduce phase. + * Operations that are potentially losing information can only be applied during the final reduce phase. */ public boolean isFinalReduce() { return isFinalReduce; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 95140b50d2bdf..70135c2d51e73 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -18,17 +18,22 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import static java.util.Collections.emptyMap; @@ -48,23 +53,56 @@ public final class InternalAggregations extends Aggregations implements Streamab } }; + private List topLevelPipelineAggregators = Collections.emptyList(); + private InternalAggregations() { } /** - * Constructs a new addAggregation. + * Constructs a new aggregation. */ public InternalAggregations(List aggregations) { super(aggregations); } /** - * Reduces the given lists of addAggregation. - * - * @param aggregationsList A list of aggregation to reduce - * @return The reduced addAggregation + * Constructs a new aggregation providing its {@link InternalAggregation}s and {@link SiblingPipelineAggregator}s + */ + public InternalAggregations(List aggregations, List topLevelPipelineAggregators) { + super(aggregations); + this.topLevelPipelineAggregators = Objects.requireNonNull(topLevelPipelineAggregators); + } + + /** + * Returns the top-level pipeline aggregators. + * Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they + * become part of the list of {@link InternalAggregation}s. */ - public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { + List getTopLevelPipelineAggregators() { + return topLevelPipelineAggregators; + } + + /** + * Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first + * {@link InternalAggregations} object found in the list. + * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. + */ + public static InternalAggregations reduce(List aggregationsList, + ReduceContext context) { + if (aggregationsList.isEmpty()) { + return null; + } + InternalAggregations first = aggregationsList.get(0); + return reduce(aggregationsList, first.topLevelPipelineAggregators, context); + } + + /** + * Reduces the given list of aggregations as well as the provided top-level pipeline aggregators. + * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. + */ + public static InternalAggregations reduce(List aggregationsList, + List topLevelPipelineAggregators, + ReduceContext context) { if (aggregationsList.isEmpty()) { return null; } @@ -89,7 +127,15 @@ public static InternalAggregations reduce(List aggregation InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand reducedAggregations.add(first.reduce(aggregations, context)); } - return new InternalAggregations(reducedAggregations); + + if (context.isFinalReduce()) { + for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) { + InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); + reducedAggregations.add(newAgg); + } + return new InternalAggregations(reducedAggregations); + } + return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators); } public static InternalAggregations readAggregations(StreamInput in) throws IOException { @@ -104,11 +150,20 @@ public void readFrom(StreamInput in) throws IOException { if (aggregations.isEmpty()) { aggregationsAsMap = emptyMap(); } + if (in.getVersion().onOrAfter(Version.V_6_7_0)) { + this.topLevelPipelineAggregators = in.readList( + stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class)); + } else { + this.topLevelPipelineAggregators = Collections.emptyList(); + } } @Override @SuppressWarnings("unchecked") public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteableList((List)aggregations); + if (out.getVersion().onOrAfter(Version.V_6_7_0)) { + out.writeNamedWriteableList(topLevelPipelineAggregators); + } } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 5801f20780b7f..26b9765f168f7 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -35,10 +35,11 @@ import org.elasticsearch.search.suggest.Suggest; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; -import static java.util.Collections.emptyList; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; @@ -50,7 +51,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private DocValueFormat[] sortValueFormats; private InternalAggregations aggregations; private boolean hasAggs; - private List pipelineAggregators; + private List pipelineAggregators = Collections.emptyList(); private Suggest suggest; private boolean searchTimedOut; private Boolean terminatedEarly = null; @@ -79,7 +80,6 @@ public QuerySearchResult queryResult() { return this; } - public void searchTimedOut(boolean searchTimedOut) { this.searchTimedOut = searchTimedOut; } @@ -203,7 +203,7 @@ public List pipelineAggregators() { } public void pipelineAggregators(List pipelineAggregators) { - this.pipelineAggregators = pipelineAggregators; + this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators); } public Suggest suggest() { @@ -337,7 +337,7 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); aggregations.writeTo(out); } - out.writeNamedWriteableList(pipelineAggregators == null ? emptyList() : pipelineAggregators); + out.writeNamedWriteableList(pipelineAggregators); if (suggest == null) { out.writeBoolean(false); } else { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 5f2aaca83131a..bca7e19b1134c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -331,7 +331,7 @@ public void testBuildClusters() { assertEquals(successful, clusters.getSuccessful()); assertEquals(skipped, clusters.getSkipped()); } - + private static OriginalIndices randomOriginalIndices() { int numLocalIndices = randomIntBetween(0, 5); String[] localIndices = new String[numLocalIndices]; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java new file mode 100644 index 0000000000000..6c99984a0aab6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -0,0 +1,178 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.StringTermsTests; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregationBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.hamcrest.Matchers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.List; + +public class InternalAggregationsTests extends ESTestCase { + + private final NamedWriteableRegistry registry = new NamedWriteableRegistry( + new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedWriteables()); + + public void testReduceEmptyAggs() { + List aggs = Collections.emptyList(); + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean()); + assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext)); + } + + public void testNonFinalReduceTopLevelPipelineAggs() throws IOException { + InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), + 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); + List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms))); + List topLevelPipelineAggs = new ArrayList<>(); + MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); + topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create()); + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false); + InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext); + assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); + assertEquals(1, reducedAggs.aggregations.size()); + } + + public void testFinalReduceTopLevelPipelineAggs() throws IOException { + InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), + 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); + + MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); + SiblingPipelineAggregator siblingPipelineAggregator = (SiblingPipelineAggregator) maxBucketPipelineAggregationBuilder.create(); + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, true); + final InternalAggregations reducedAggs; + if (randomBoolean()) { + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), + Collections.singletonList(siblingPipelineAggregator)); + reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); + } else { + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms)); + List topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator); + reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext); + } + assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); + assertEquals(2, reducedAggs.aggregations.size()); + } + + public void testSerialization() throws Exception { + List aggsList = new ArrayList<>(); + if (randomBoolean()) { + StringTermsTests stringTermsTests = new StringTermsTests(); + stringTermsTests.init(); + stringTermsTests.setUp(); + aggsList.add(stringTermsTests.createTestInstance()); + } + if (randomBoolean()) { + InternalDateHistogramTests dateHistogramTests = new InternalDateHistogramTests(); + dateHistogramTests.setUp(); + aggsList.add(dateHistogramTests.createTestInstance()); + } + if (randomBoolean()) { + InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); + aggsList.add(simpleValueTests.createTestInstance()); + } + List topLevelPipelineAggs = new ArrayList<>(); + if (randomBoolean()) { + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); + } + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); + } + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); + } + } + InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs); + writeToAndReadFrom(aggregations, 0); + } + + private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException { + Version version = VersionUtils.randomVersion(random()); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + aggregations.writeTo(out); + try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes), registry)) { + in.setVersion(version); + InternalAggregations deserialized = InternalAggregations.readAggregations(in); + assertEquals(aggregations.aggregations, deserialized.aggregations); + if (aggregations.getTopLevelPipelineAggregators() == null) { + assertEquals(0, deserialized.getTopLevelPipelineAggregators().size()); + } else { + if (version.before(Version.V_6_7_0)) { + assertEquals(0, deserialized.getTopLevelPipelineAggregators().size()); + } else { + assertEquals(aggregations.getTopLevelPipelineAggregators().size(), + deserialized.getTopLevelPipelineAggregators().size()); + for (int i = 0; i < aggregations.getTopLevelPipelineAggregators().size(); i++) { + SiblingPipelineAggregator siblingPipelineAggregator1 = aggregations.getTopLevelPipelineAggregators().get(i); + SiblingPipelineAggregator siblingPipelineAggregator2 = deserialized.getTopLevelPipelineAggregators().get(i); + assertArrayEquals(siblingPipelineAggregator1.bucketsPaths(), siblingPipelineAggregator2.bucketsPaths()); + assertEquals(siblingPipelineAggregator1.name(), siblingPipelineAggregator2.name()); + } + } + } + if (iteration < 2) { + //serialize this enough times to make sure that we are able to write again what we read + writeToAndReadFrom(deserialized, iteration + 1); + } + } + } + } + + public void testSerializationFromPre_6_7_0() throws IOException { + String aggsString = "AwZzdGVybXMFb0F0Q0EKCQVsZG5ncgAFeG56RWcFeUFxVmcABXBhQVVpBUtYc2VIAAVaclRESwVqUkxySAAFelp5d1AFRUREcEYABW1" + + "sckF0BU5wWWVFAAVJYVJmZgVURlJVbgAFT0RiU04FUWNwSVoABU1sb09HBUNzZHFlAAVWWmJHaQABAwGIDgNyYXcFAQAADmRhdGVfaGlzdG9ncmFt" + + "BVhHbVl4/wADAAKAurcDA1VUQwABAQAAAWmOhukAAQAAAWmR9dEAAAAAAAAAAAAAAANyYXcACAAAAWmQrDoAUQAAAAFpkRoXAEMAAAABaZGH9AAtA" + + "AAAAWmR9dEAJwAAAAFpkmOuAFwAAAABaZLRiwAYAAAAAWmTP2gAKgAAAAFpk61FABsADHNpbXBsZV92YWx1ZQVsWVNLVv8AB2RlY2ltYWwGIyMjLi" + + "MjQLZWZVy5zBYAAAAAAAAAAAAAAAAAAAAAAAAA"; + + byte[] aggsBytes = Base64.getDecoder().decode(aggsString); + try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(aggsBytes), registry)) { + in.setVersion(VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), + Version.max(Version.CURRENT.minimumCompatibilityVersion(), VersionUtils.getPreviousVersion(Version.CURRENT)))); + InternalAggregations deserialized = InternalAggregations.readAggregations(in); + assertEquals(3, deserialized.aggregations.size()); + assertThat(deserialized.aggregations.get(0), Matchers.instanceOf(StringTerms.class)); + assertThat(deserialized.aggregations.get(1), Matchers.instanceOf(InternalDateHistogram.class)); + assertThat(deserialized.aggregations.get(2), Matchers.instanceOf(InternalSimpleValue.class)); + } + } +}