From 1c41a9c07b764090260b82677dbb5503dbded1a3 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Wed, 11 Nov 2020 11:48:28 +0200 Subject: [PATCH 01/10] Adds a minimum version parameter to the SearchRequest, not exposed to REST. The minimum version helps failing a request if any shards involved in the search do not meet the compatibility requirements (all shards need to have a version equal or later than the minimum version provided). --- .../elasticsearch/ElasticsearchException.java | 7 +- .../search/AbstractSearchAsyncAction.java | 20 ++ .../SearchQueryThenFetchAsyncAction.java | 10 +- .../action/search/SearchRequest.java | 27 +- .../search/VersionMismatchException.java | 37 +++ .../ExceptionSerializationTests.java | 4 +- .../SearchQueryThenFetchAsyncActionTests.java | 305 +++++++++++++++++- 7 files changed, 402 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index b8a04cccdf13b..dccc11e1f9e4d 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1047,7 +1047,12 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.transport.NoSeedNodeLeftException.class, org.elasticsearch.transport.NoSeedNodeLeftException::new, 160, - Version.V_7_10_0); + Version.V_7_10_0), + VERSION_MISMATCH_EXCEPTION( + org.elasticsearch.action.search.VersionMismatchException.class, + org.elasticsearch.action.search.VersionMismatchException::new, + 161, + Version.V_7_11_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index b580ba0704b64..c503ce5b4e6cb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -210,6 +210,10 @@ public final void run() { throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY); } } + if (checkMinimumVersion(shardsIts) == false) { + throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", + request.minVersion()); + } for (int index = 0; index < shardsIts.size(); index++) { final SearchShardIterator shardRoutings = shardsIts.get(index); assert shardRoutings.skip() == false; @@ -225,6 +229,22 @@ void skipShard(SearchShardIterator iterator) { successfulShardExecution(iterator); } + + private boolean checkMinimumVersion(GroupShardsIterator shardsIts) { + for (SearchShardIterator it : shardsIts) { + if (it.getTargetNodeIds().isEmpty() == false) { + boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> { + Transport.Connection conn = getConnection(it.getClusterAlias(), nodeId); + return conn == null ? true : conn.getVersion().onOrAfter(request.minVersion()); + }); + if (isCompatible == false) { + return false; + } + } + } + return true; + } + private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) { /* * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 79f5e5ca9571e..74a07684b1ee8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.TopFieldDocs; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -49,6 +50,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToConnection, final Map aliasFilter, @@ -66,6 +69,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt)); - getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener); + Transport.Connection conn = getConnection(shard.getClusterAlias(), shard.getNodeId()); + if (minVersion != null && conn.getVersion().before(minVersion)) { + throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion); + } + getSearchTransport().sendExecuteQuery(conn, request, getTask(), listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index e6e07dbc67904..f4fefd8eba242 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -94,6 +94,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private boolean ccsMinimizeRoundtrips = true; + private Version minVersion = Version.CURRENT.minimumCompatibilityVersion(); + public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled(); @@ -157,6 +159,16 @@ static SearchRequest subSearchRequest(SearchRequest originalSearchRequest, Strin return new SearchRequest(originalSearchRequest, indices, clusterAlias, absoluteStartMillis, finalReduce); } + public static SearchRequest withMinimumVersion(SearchRequest searchRequest, Version minVersion) { + return new SearchRequest(searchRequest, minVersion); + } + + private SearchRequest(SearchRequest searchRequest, Version minVersion) { + this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, + searchRequest.absoluteStartMillis, searchRequest.finalReduce); + this.minVersion = minVersion; + } + private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis, boolean finalReduce) { this.allowPartialSearchResults = searchRequest.allowPartialSearchResults; @@ -214,6 +226,9 @@ public SearchRequest(StreamInput in) throws IOException { finalReduce = true; } ccsMinimizeRoundtrips = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_7_11_0)) { + minVersion = Version.readVersion(in); + } } @Override @@ -241,7 +256,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(finalReduce); } out.writeBoolean(ccsMinimizeRoundtrips); - + if (out.getVersion().onOrAfter(Version.V_7_11_0)) { + Version.writeVersion(minVersion, out); + } } @Override @@ -319,6 +336,9 @@ long getAbsoluteStartMillis() { return absoluteStartMillis; } + Version minVersion() { + return minVersion; + } /** * Sets the indices the search will be executed on. */ @@ -656,14 +676,15 @@ public boolean equals(Object o) { Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) && Objects.equals(localClusterAlias, that.localClusterAlias) && absoluteStartMillis == that.absoluteStartMillis && - ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips; + ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips && + Objects.equals(minVersion, that.minVersion); } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips); + allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, minVersion); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java b/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java new file mode 100644 index 0000000000000..5527ea28c37ff --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class VersionMismatchException extends ElasticsearchException { + + public VersionMismatchException(String msg, Object... args) { + super(msg, args); + } + + public VersionMismatchException(StreamInput in) throws IOException { + super(in); + } + +} diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 9ef23656547b0..0d6f3a80e9a9e 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -29,12 +29,14 @@ import org.elasticsearch.action.TimestampParsingException; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.search.VersionMismatchException; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.client.AbstractClientHeadersTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.cluster.coordination.NoMasterBlockService; +import org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException; import org.elasticsearch.cluster.routing.ShardRouting; @@ -72,7 +74,6 @@ import org.elasticsearch.indices.recovery.PeerRecoveryNotFound; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; import org.elasticsearch.ingest.IngestProcessorException; -import org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; @@ -831,6 +832,7 @@ public void testIds() { ids.put(158, PeerRecoveryNotFound.class); ids.put(159, NodeHealthCheckFailureException.class); ids.put(160, NoSeedNodeLeftException.class); + ids.put(161, VersionMismatchException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 47a1b3f618d42..89aab2b0794f6 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -25,15 +25,20 @@ import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.grouping.CollapseTopFieldDocs; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhaseResult; @@ -47,9 +52,12 @@ import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.transport.Transport; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -57,6 +65,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static java.util.Collections.singletonList; +import static org.elasticsearch.test.VersionUtils.allVersions; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -79,8 +89,8 @@ private void testCase(boolean withScroll, boolean withCollapse) throws Exception new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); Map lookup = new ConcurrentHashMap<>(); - DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); - DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode primaryNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode replicaNode = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT); lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); @@ -198,4 +208,295 @@ public void run() { assertThat(((FieldDoc) phase.sortedTopDocs.scoreDocs[0]).fields.length, equalTo(1)); assertThat(((FieldDoc) phase.sortedTopDocs.scoreDocs[0]).fields[0], equalTo(0)); } + + public void testMinimumVersionSameAsNewVersion() throws Exception { + Version newVersion = Version.CURRENT; + Version oldVersion = VersionUtils.randomPreviousCompatibleVersion(random(), newVersion); + testMixedVersionsShardsSearch(newVersion, oldVersion, newVersion); + } + + public void testMinimumVersionBetweenNewAndOldVersion() throws Exception { + Version oldVersion = VersionUtils.getFirstVersion(); + Version newVersion = VersionUtils.maxCompatibleVersion(oldVersion); + Version minVersion = VersionUtils.randomVersionBetween(random(), + allVersions().get(allVersions().indexOf(oldVersion) + 1), newVersion); + testMixedVersionsShardsSearch(newVersion, oldVersion, minVersion); + } + + private void testMixedVersionsShardsSearch(Version oldVersion, Version newVersion, Version minVersion) throws Exception { + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + int numConcurrent = randomIntBetween(1, 4); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode newVersionNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), newVersion); + DiscoveryNode oldVersionNode = new DiscoveryNode("node2", buildNewFakeTransportAddress(), oldVersion); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(oldVersionNode)); + + OriginalIndices idx = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS); + ArrayList list = new ArrayList<>(); + ShardRouting routingNewVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 0), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingNewVersionShard = routingNewVersionShard.initialize(newVersionNode.getId(), "p0", 0); + routingNewVersionShard.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard), idx)); + + ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 1), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p1", 0); + routingOldVersionShard.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingOldVersionShard), idx)); + + GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.setMaxConcurrentShardRequests(numConcurrent); + searchRequest.setBatchedReduceSize(2); + searchRequest.source(new SearchSourceBuilder().size(1)); + searchRequest.allowPartialSearchResults(false); + final SearchRequest withMinVersionSearchRequest = SearchRequest.withMinimumVersion(searchRequest, minVersion); + + Executor executor = EsExecutors.newDirectExecutorService(); + SearchTransportService searchTransportService = new SearchTransportService(null, null, null); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(withMinVersionSearchRequest, executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), + shardsIter.size(), exc -> {}); + final List responses = new ArrayList<>(); + SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, + searchTransportService, (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), Collections.emptyMap(), controller, executor, + resultConsumer, withMinVersionSearchRequest, new ActionListener() { + @Override + public void onFailure(Exception e) { + responses.add(e); + } + public void onResponse(SearchResponse response) { + responses.add(response); + }; + }, shardsIter, timeProvider, null, + task, SearchResponse.Clusters.EMPTY); + + newSearchAsyncAction.start(); + assertEquals(1, responses.size()); + assertTrue(responses.get(0) instanceof SearchPhaseExecutionException); + SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0); + assertTrue(e.getCause() instanceof VersionMismatchException); + assertThat(e.getCause().getMessage(), + equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); + } + + public void testMinimumVersionSameAsOldVersion() throws Exception { + Version newVersion = Version.CURRENT; + Version oldVersion = VersionUtils.randomPreviousCompatibleVersion(random(), newVersion); + Version minVersion = oldVersion; + + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + AtomicInteger successfulOps = new AtomicInteger(); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode newVersionNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), newVersion); + DiscoveryNode oldVersionNode = new DiscoveryNode("node2", buildNewFakeTransportAddress(), oldVersion); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(oldVersionNode)); + + OriginalIndices idx = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS); + ArrayList list = new ArrayList<>(); + ShardRouting routingNewVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 0), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingNewVersionShard = routingNewVersionShard.initialize(newVersionNode.getId(), "p0", 0); + routingNewVersionShard.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard), idx)); + + ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 1), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p1", 0); + routingOldVersionShard.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingOldVersionShard), idx)); + + GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.allowPartialSearchResults(false); + searchRequest.source(new SearchSourceBuilder() + .size(1) + .sort(SortBuilders.fieldSort("timestamp"))); + final SearchRequest withMinVersionSearchRequest = SearchRequest.withMinimumVersion(searchRequest, minVersion); + + Executor executor = EsExecutors.newDirectExecutorService(); + SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { + @Override + public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest request, + SearchTask task, SearchActionListener listener) { + int shardId = request.shardId().id(); + QuerySearchResult queryResult = new QuerySearchResult(new ShardSearchContextId("N/A", 123), + new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null, OriginalIndices.NONE), null); + SortField sortField = new SortField("timestamp", SortField.Type.LONG); + if (shardId == 0) { + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[]{ + new FieldDoc(randomInt(1000), Float.NaN, new Object[]{shardId}) + }, new SortField[]{sortField}), Float.NaN), + new DocValueFormat[]{DocValueFormat.RAW}); + } else if (shardId == 1) { + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[]{ + new FieldDoc(randomInt(1000), Float.NaN, new Object[]{shardId}) + }, new SortField[]{sortField}), Float.NaN), + new DocValueFormat[]{DocValueFormat.RAW}); + } + queryResult.from(0); + queryResult.size(1); + successfulOps.incrementAndGet(); + new Thread(() -> listener.onResponse(queryResult)).start(); + } + }; + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(withMinVersionSearchRequest, executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), + shardsIter.size(), exc -> {}); + CountDownLatch latch = new CountDownLatch(1); + SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, + searchTransportService, (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), Collections.emptyMap(), controller, executor, + resultConsumer, withMinVersionSearchRequest, null, shardsIter, timeProvider, null, + task, SearchResponse.Clusters.EMPTY) { + @Override + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new SearchPhase("test") { + @Override + public void run() { + latch.countDown(); + } + }; + } + }; + + action.start(); + latch.await(); + assertThat(successfulOps.get(), equalTo(2)); + SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); + assertThat(phase.numReducePhases, greaterThanOrEqualTo(1)); + assertThat(phase.totalHits.value, equalTo(2L)); + assertThat(phase.totalHits.relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); + } + + public void testMinimumVersionShardDuringPhaseExecution() throws Exception { + Version newVersion = Version.CURRENT; + Version oldVersion = VersionUtils.randomPreviousCompatibleVersion(random(), newVersion); + Version minVersion = newVersion; + + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + AtomicInteger successfulOps = new AtomicInteger(); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode newVersionNode1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), newVersion); + DiscoveryNode newVersionNode2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), newVersion); + DiscoveryNode oldVersionNode = new DiscoveryNode("node3", buildNewFakeTransportAddress(), oldVersion); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode1)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(newVersionNode2)); + lookup.put("node3", new SearchAsyncActionTests.MockConnection(oldVersionNode)); + + OriginalIndices idx = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS); + ArrayList list = new ArrayList<>(); + ShardRouting routingNewVersionShard1 = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 0), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingNewVersionShard1 = routingNewVersionShard1.initialize(newVersionNode1.getId(), "p0", 0); + routingNewVersionShard1.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard1), idx)); + + ShardRouting routingNewVersionShard2 = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 1), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingNewVersionShard2 = routingNewVersionShard2.initialize(newVersionNode2.getId(), "p1", 0); + routingNewVersionShard2.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingNewVersionShard2), idx)); + + GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.allowPartialSearchResults(false); + searchRequest.source(new SearchSourceBuilder() + .size(1) + .sort(SortBuilders.fieldSort("timestamp"))); + final SearchRequest withMinVersionSearchRequest = SearchRequest.withMinimumVersion(searchRequest, minVersion); + + Executor executor = EsExecutors.newDirectExecutorService(); + SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { + @Override + public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest request, + SearchTask task, SearchActionListener listener) { + int shardId = request.shardId().id(); + QuerySearchResult queryResult = new QuerySearchResult(new ShardSearchContextId("N/A", 123), + new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null, OriginalIndices.NONE), null); + SortField sortField = new SortField("timestamp", SortField.Type.LONG); + if (shardId == 0) { + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[]{ + new FieldDoc(randomInt(1000), Float.NaN, new Object[]{shardId}) + }, new SortField[]{sortField}), Float.NaN), + new DocValueFormat[]{DocValueFormat.RAW}); + } else if (shardId == 1) { + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[]{ + new FieldDoc(randomInt(1000), Float.NaN, new Object[]{shardId}) + }, new SortField[]{sortField}), Float.NaN), + new DocValueFormat[]{DocValueFormat.RAW}); + } + queryResult.from(0); + queryResult.size(1); + successfulOps.incrementAndGet(); + new Thread(() -> listener.onResponse(queryResult)).start(); + } + }; + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(withMinVersionSearchRequest, executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), + shardsIter.size(), exc -> {}); + CountDownLatch latch = new CountDownLatch(1); + SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, + searchTransportService, (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), Collections.emptyMap(), controller, executor, + resultConsumer, withMinVersionSearchRequest, null, shardsIter, timeProvider, null, + task, SearchResponse.Clusters.EMPTY) { + @Override + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new SearchPhase("test") { + @Override + public void run() { + latch.countDown(); + } + }; + } + }; + ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 2), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + SearchShardIterator shardIt = new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 2), + singletonList(routingOldVersionShard), idx); + routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0); + routingOldVersionShard.started(); + action.start(); + latch.await(); + assertThat(successfulOps.get(), equalTo(2)); + SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); + assertThat(phase.numReducePhases, greaterThanOrEqualTo(1)); + assertThat(phase.totalHits.value, equalTo(2L)); + assertThat(phase.totalHits.relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); + + Exception e = expectThrows(VersionMismatchException.class, () -> action.executePhaseOnShard(shardIt, new SearchShardTarget("node3", + shardIt.shardId(), null, OriginalIndices.NONE), null)); + assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); + } } From 2bcb4aad4ed2c63f3fe23017da5fdd6b3bb9df88 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 8 Dec 2020 00:58:45 +0200 Subject: [PATCH 02/10] Address reviews --- .../org/elasticsearch/ElasticsearchException.java | 2 +- .../action/search/AbstractSearchAsyncAction.java | 15 +++++++++++---- .../search/SearchQueryThenFetchAsyncAction.java | 10 +--------- .../action/search/SearchRequest.java | 4 ++-- 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index dccc11e1f9e4d..6f190e91603b2 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1052,7 +1052,7 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.action.search.VersionMismatchException.class, org.elasticsearch.action.search.VersionMismatchException::new, 161, - Version.V_7_11_0); + Version.V_8_0_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c503ce5b4e6cb..87a8c520078aa 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -210,9 +210,11 @@ public final void run() { throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY); } } - if (checkMinimumVersion(shardsIts) == false) { - throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", - request.minVersion()); + if (Version.CURRENT.minimumCompatibilityVersion().equals(request.minVersion()) == false) { + if (checkMinimumVersion(shardsIts) == false) { + throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", + request.minVersion()); + } } for (int index = 0; index < shardsIts.size(); index++) { final SearchShardIterator shardRoutings = shardsIts.get(index); @@ -652,7 +654,12 @@ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() @Override public final Transport.Connection getConnection(String clusterAlias, String nodeId) { - return nodeIdToConnection.apply(clusterAlias, nodeId); + Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId); + Version minVersion = request.minVersion(); + if (minVersion != null && conn != null && conn.getVersion().before(minVersion)) { + throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion); + } + return conn; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 74a07684b1ee8..79f5e5ca9571e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.TopFieldDocs; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -50,8 +49,6 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToConnection, final Map aliasFilter, @@ -69,7 +66,6 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt)); - Transport.Connection conn = getConnection(shard.getClusterAlias(), shard.getNodeId()); - if (minVersion != null && conn.getVersion().before(minVersion)) { - throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion); - } - getSearchTransport().sendExecuteQuery(conn, request, getTask(), listener); + getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index f4fefd8eba242..ffe517ac65d82 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -226,7 +226,7 @@ public SearchRequest(StreamInput in) throws IOException { finalReduce = true; } ccsMinimizeRoundtrips = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_7_11_0)) { + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { minVersion = Version.readVersion(in); } } @@ -256,7 +256,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(finalReduce); } out.writeBoolean(ccsMinimizeRoundtrips); - if (out.getVersion().onOrAfter(Version.V_7_11_0)) { + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { Version.writeVersion(minVersion, out); } } From 5410f2612ff331afa3c06fc9ca96afa40899072a Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 8 Dec 2020 01:10:47 +0200 Subject: [PATCH 03/10] Adjustment after merge --- .../SearchQueryThenFetchAsyncActionTests.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 0a61344b9e05c..397df2c83a077 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -268,7 +268,7 @@ private void testMixedVersionsShardsSearch(Version oldVersion, Version newVersio SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), controller, executor, + Collections.emptyMap(), controller, executor, resultConsumer, withMinVersionSearchRequest, new ActionListener() { @Override public void onFailure(Exception e) { @@ -277,8 +277,7 @@ public void onFailure(Exception e) { public void onResponse(SearchResponse response) { responses.add(response); }; - }, shardsIter, timeProvider, null, - task, SearchResponse.Clusters.EMPTY); + }, shardsIter, timeProvider, null, task, SearchResponse.Clusters.EMPTY); newSearchAsyncAction.start(); assertEquals(1, responses.size()); @@ -366,7 +365,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), controller, executor, + Collections.emptyMap(), controller, executor, resultConsumer, withMinVersionSearchRequest, null, shardsIter, timeProvider, null, task, SearchResponse.Clusters.EMPTY) { @Override @@ -468,7 +467,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), controller, executor, + Collections.emptyMap(), controller, executor, resultConsumer, withMinVersionSearchRequest, null, shardsIter, timeProvider, null, task, SearchResponse.Clusters.EMPTY) { @Override @@ -495,8 +494,15 @@ public void run() { assertThat(phase.totalHits.value, equalTo(2L)); assertThat(phase.totalHits.relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); - Exception e = expectThrows(VersionMismatchException.class, () -> action.executePhaseOnShard(shardIt, new SearchShardTarget("node3", - shardIt.shardId(), null, OriginalIndices.NONE), null)); + SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null, OriginalIndices.NONE); + SearchActionListener listener = new SearchActionListener(searchShardTarget, 0) { + @Override + public void onFailure(Exception e) { } + + @Override + protected void innerOnResponse(SearchPhaseResult response) { } + }; + Exception e = expectThrows(VersionMismatchException.class, () -> action.executePhaseOnShard(shardIt, searchShardTarget, listener)); assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); } } From c262054708d2ed0b343a0b15b7aa91731c1e6d9c Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Wed, 9 Dec 2020 14:31:45 +0200 Subject: [PATCH 04/10] Add min_compatible_shard_node as request parameter --- .../elasticsearch/backwards/IndexingIT.java | 12 +- .../SearchWithMinCompatibleSearchNodeIT.java | 135 ++++++++++++++++++ .../resources/rest-api-spec/api/search.json | 4 + .../search/AbstractSearchAsyncAction.java | 9 +- .../action/search/SearchRequest.java | 47 +++--- .../rest/action/search/RestSearchAction.java | 5 + .../search/MultiSearchRequestTests.java | 3 + .../SearchQueryThenFetchAsyncActionTests.java | 18 +-- .../action/search/SearchRequestTests.java | 13 ++ .../search/RandomSearchRequestGenerator.java | 7 + 10 files changed, 219 insertions(+), 34 deletions(-) create mode 100644 qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 0837a6377c323..59021ce0031fc 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -400,7 +400,11 @@ private List buildShards(String index, Nodes nodes, RestClient client) th } private Nodes buildNodeAndVersions() throws IOException { - Response response = client().performRequest(new Request("GET", "_nodes")); + return buildNodeAndVersions(client()); + } + + static Nodes buildNodeAndVersions(RestClient client) throws IOException { + Response response = client.performRequest(new Request("GET", "_nodes")); ObjectPath objectPath = ObjectPath.createFromResponse(response); Map nodesAsMap = objectPath.evaluate("nodes"); Nodes nodes = new Nodes(); @@ -411,12 +415,12 @@ private Nodes buildNodeAndVersions() throws IOException { Version.fromString(objectPath.evaluate("nodes." + id + ".version")), HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address")))); } - response = client().performRequest(new Request("GET", "_cluster/state")); + response = client.performRequest(new Request("GET", "_cluster/state")); nodes.setMasterNodeId(ObjectPath.createFromResponse(response).evaluate("master_node")); return nodes; } - final class Nodes extends HashMap { + final static class Nodes extends HashMap { private String masterNodeId = null; @@ -469,7 +473,7 @@ public String toString() { } } - final class Node { + final static class Node { private final String id; private final String nodeName; private final Version version; diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java new file mode 100644 index 0000000000000..054557cd90d1e --- /dev/null +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java @@ -0,0 +1,135 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.backwards; + +import org.apache.http.HttpHost; +import org.elasticsearch.Version; +import org.elasticsearch.backwards.IndexingIT.Node; +import org.elasticsearch.backwards.IndexingIT.Nodes; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class SearchWithMinCompatibleSearchNodeIT extends ESRestTestCase { + + private static String index = "test_min_version"; + private static int numShards; + private static int numReplicas = 1; + private static int numDocs; + private static Nodes nodes; + private static List allNodes; + private static Version bwcVersion; + private static Version newVersion; + + @Before + public void prepareTestData() throws IOException { + nodes = IndexingIT.buildNodeAndVersions(client()); + numShards = nodes.size(); + numDocs = randomIntBetween(numShards, 16); + allNodes = new ArrayList<>(); + allNodes.addAll(nodes.getBWCNodes()); + allNodes.addAll(nodes.getNewNodes()); + bwcVersion = nodes.getBWCNodes().get(0).getVersion(); + newVersion = nodes.getNewNodes().get(0).getVersion(); + + if (client().performRequest(new Request("HEAD", "/" + index)).getStatusLine().getStatusCode() == 404) { + createIndex(index, Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build()); + for (int i = 0; i < numDocs; i++) { + Request request = new Request("PUT", index + "/_doc/" + i); + request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(2) + "\"}"); + assertOK(client().performRequest(request)); + } + ensureGreen(index); + } + } + + public void testMinVersionAsNewVersion() throws Exception { + try (RestClient client = buildClient(restClientSettings(), + allNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + Request newVersionRequest = new Request("POST", + index + "/_search?min_compatible_shard_node=" + newVersion + "&ccs_minimize_roundtrips=false"); + assertBusy(() -> { + ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(newVersionRequest)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), + equalTo(RestStatus.INTERNAL_SERVER_ERROR.getStatus())); + assertThat(responseException.getMessage(), + containsString("{\"error\":{\"root_cause\":[],\"type\":\"search_phase_execution_exception\"")); + assertThat(responseException.getMessage(), containsString("caused_by\":{\"type\":\"version_mismatch_exception\"," + + "\"reason\":\"One of the shards is incompatible with the required minimum version [" + newVersion + "]\"")); + }); + } + } + + public void testMinVersionAsOldVersion() throws Exception { + try (RestClient client = buildClient(restClientSettings(), + allNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + Request oldVersionRequest = new Request("POST", index + "/_search?min_compatible_shard_node=" + bwcVersion + + "&ccs_minimize_roundtrips=false"); + oldVersionRequest.setJsonEntity("{\"query\":{\"match_all\":{}},\"_source\":false}"); + assertBusy(() -> { + Response response = client.performRequest(oldVersionRequest); + ObjectPath responseObject = ObjectPath.createFromResponse(response); + Map shardsResult = responseObject.evaluate("_shards"); + assertThat(shardsResult.get("total"), equalTo(numShards)); + assertThat(shardsResult.get("successful"), equalTo(numShards)); + assertThat(shardsResult.get("failed"), equalTo(0)); + Map hitsResult = responseObject.evaluate("hits.total"); + assertThat(hitsResult.get("value"), equalTo(numDocs)); + assertThat(hitsResult.get("relation"), equalTo("eq")); + }); + } + } + + public void testCcsMinimizeRoundtripsIsFalse() throws Exception { + try (RestClient client = buildClient(restClientSettings(), + allNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + Version version = randomBoolean() ? newVersion : bwcVersion; + boolean shouldSetCcsMinimizeRoundtrips = randomBoolean(); + + Request request = new Request("POST", index + "/_search?min_compatible_shard_node=" + version + + (shouldSetCcsMinimizeRoundtrips ? "&ccs_minimize_roundtrips=true" : "")); + assertBusy(() -> { + ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), + equalTo(RestStatus.BAD_REQUEST.getStatus())); + assertThat(responseException.getMessage(), + containsString("{\"error\":{\"root_cause\":[{\"type\":\"action_request_validation_exception\"")); + assertThat(responseException.getMessage(), containsString("\"reason\":\"Validation Failed: 1: " + + "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version;\"")); + }); + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index 5dfeb8a9fcbec..00dd8cb0d15e1 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -230,6 +230,10 @@ "type":"boolean", "description":"Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default":false + }, + "min_compatible_shard_node":{ + "type":"string", + "description":"The minimum compatible version that all shards involved in search should have for this request to be successful" } }, "body":{ diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 3f8e8fbf0c57f..10791c8688538 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -219,10 +219,11 @@ public final void run() { throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY); } } - if (Version.CURRENT.minimumCompatibilityVersion().equals(request.minVersion()) == false) { + Version version = request.minCompatibleShardNode(); + if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) { if (checkMinimumVersion(shardsIts) == false) { throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", - request.minVersion()); + request.minCompatibleShardNode()); } } for (int i = 0; i < shardsIts.size(); i++) { @@ -248,7 +249,7 @@ private boolean checkMinimumVersion(GroupShardsIterator sha if (it.getTargetNodeIds().isEmpty() == false) { boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> { Transport.Connection conn = getConnection(it.getClusterAlias(), nodeId); - return conn == null ? true : conn.getVersion().onOrAfter(request.minVersion()); + return conn == null ? true : conn.getVersion().onOrAfter(request.minCompatibleShardNode()); }); if (isCompatible == false) { return false; @@ -671,7 +672,7 @@ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() @Override public final Transport.Connection getConnection(String clusterAlias, String nodeId) { Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId); - Version minVersion = request.minVersion(); + Version minVersion = request.minCompatibleShardNode(); if (minVersion != null && conn != null && conn.getVersion().before(minVersion)) { throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 41c428a1b79f8..d0b937d46044b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -97,7 +97,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private boolean ccsMinimizeRoundtrips = true; - private Version minVersion = Version.CURRENT.minimumCompatibilityVersion(); + @Nullable + private Version minCompatibleShardNode; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled(); @@ -166,16 +167,6 @@ static SearchRequest subSearchRequest(TaskId parentTaskId, SearchRequest origina return request; } - public static SearchRequest withMinimumVersion(SearchRequest searchRequest, Version minVersion) { - return new SearchRequest(searchRequest, minVersion); - } - - private SearchRequest(SearchRequest searchRequest, Version minVersion) { - this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, - searchRequest.absoluteStartMillis, searchRequest.finalReduce); - this.minVersion = minVersion; - } - private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis, boolean finalReduce) { this.allowPartialSearchResults = searchRequest.allowPartialSearchResults; @@ -194,6 +185,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca this.localClusterAlias = localClusterAlias; this.absoluteStartMillis = absoluteStartMillis; this.finalReduce = finalReduce; + this.minCompatibleShardNode = searchRequest.minCompatibleShardNode; } /** @@ -234,7 +226,9 @@ public SearchRequest(StreamInput in) throws IOException { } ccsMinimizeRoundtrips = in.readBoolean(); if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - minVersion = Version.readVersion(in); + if (in.readBoolean()) { + minCompatibleShardNode = Version.readVersion(in); + } } } @@ -264,7 +258,10 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeBoolean(ccsMinimizeRoundtrips); if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - Version.writeVersion(minVersion, out); + out.writeBoolean(minCompatibleShardNode != null); + if (minCompatibleShardNode != null) { + Version.writeVersion(minCompatibleShardNode, out); + } } } @@ -313,6 +310,12 @@ public ActionRequestValidationException validate() { } } } + if (minCompatibleShardNode() != null) { + if (isCcsMinimizeRoundtrips()) { + validationException = addValidationError("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible " + + "shard version", validationException); + } + } return validationException; } @@ -351,9 +354,19 @@ long getAbsoluteStartMillis() { return absoluteStartMillis; } - Version minVersion() { - return minVersion; + /** + * Returns the minimum compatible shard version the search request needs to run on. If the version is null, then there are no + * restrictions imposed on shards versions part of this search. + */ + @Nullable + public Version minCompatibleShardNode() { + return minCompatibleShardNode; + } + + public void setMinCompatibleShardNode(Version minCompatibleShardNode) { + this.minCompatibleShardNode = minCompatibleShardNode; } + /** * Sets the indices the search will be executed on. */ @@ -692,14 +705,14 @@ public boolean equals(Object o) { Objects.equals(localClusterAlias, that.localClusterAlias) && absoluteStartMillis == that.absoluteStartMillis && ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips && - Objects.equals(minVersion, that.minVersion); + Objects.equals(minCompatibleShardNode, that.minCompatibleShardNode); } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, minVersion); + allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, minCompatibleShardNode); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 4511a9a145b73..3fc620fd17dc2 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest.action.search; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchContextId; @@ -172,6 +173,10 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r searchRequest.preference(request.param("preference")); searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions())); + if (request.hasParam("min_compatible_shard_node")) { + searchRequest.setMinCompatibleShardNode(Version.fromString(request.param("min_compatible_shard_node"))); + } + checkRestTotalHits(request, searchRequest); if (searchRequest.pointInTimeBuilder() != null) { diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index bb2d70c5dc149..95f654f6e6a6f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -342,6 +342,9 @@ private static MultiSearchRequest createMultiSearchRequest() { msearchDefault.ignoreThrottled() )); + // min_compatible_shard_node is unsupported in msearch api, so unset it + searchRequest.setMinCompatibleShardNode(null); + request.add(searchRequest); } return request; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 397df2c83a077..d396558508336 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -254,14 +254,14 @@ private void testMixedVersionsShardsSearch(Version oldVersion, Version newVersio searchRequest.setBatchedReduceSize(2); searchRequest.source(new SearchSourceBuilder().size(1)); searchRequest.allowPartialSearchResults(false); - final SearchRequest withMinVersionSearchRequest = SearchRequest.withMinimumVersion(searchRequest, minVersion); + searchRequest.setMinCompatibleShardNode(minVersion); Executor executor = EsExecutors.newDirectExecutorService(); SearchTransportService searchTransportService = new SearchTransportService(null, null, null); SearchPhaseController controller = new SearchPhaseController( writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); - QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(withMinVersionSearchRequest, executor, + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, executor, new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), shardsIter.size(), exc -> {}); final List responses = new ArrayList<>(); @@ -269,7 +269,7 @@ private void testMixedVersionsShardsSearch(Version oldVersion, Version newVersio searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), Collections.emptyMap(), controller, executor, - resultConsumer, withMinVersionSearchRequest, new ActionListener() { + resultConsumer, searchRequest, new ActionListener() { @Override public void onFailure(Exception e) { responses.add(e); @@ -323,7 +323,7 @@ public void testMinimumVersionSameAsOldVersion() throws Exception { searchRequest.source(new SearchSourceBuilder() .size(1) .sort(SortBuilders.fieldSort("timestamp"))); - final SearchRequest withMinVersionSearchRequest = SearchRequest.withMinimumVersion(searchRequest, minVersion); + searchRequest.setMinCompatibleShardNode(minVersion); Executor executor = EsExecutors.newDirectExecutorService(); SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { @@ -358,7 +358,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest SearchPhaseController controller = new SearchPhaseController( writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); - QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(withMinVersionSearchRequest, executor, + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, executor, new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), shardsIter.size(), exc -> {}); CountDownLatch latch = new CountDownLatch(1); @@ -366,7 +366,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), Collections.emptyMap(), controller, executor, - resultConsumer, withMinVersionSearchRequest, null, shardsIter, timeProvider, null, + resultConsumer, searchRequest, null, shardsIter, timeProvider, null, task, SearchResponse.Clusters.EMPTY) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { @@ -425,7 +425,7 @@ public void testMinimumVersionShardDuringPhaseExecution() throws Exception { searchRequest.source(new SearchSourceBuilder() .size(1) .sort(SortBuilders.fieldSort("timestamp"))); - final SearchRequest withMinVersionSearchRequest = SearchRequest.withMinimumVersion(searchRequest, minVersion); + searchRequest.setMinCompatibleShardNode(minVersion); Executor executor = EsExecutors.newDirectExecutorService(); SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { @@ -460,7 +460,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest SearchPhaseController controller = new SearchPhaseController( writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); - QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(withMinVersionSearchRequest, executor, + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, executor, new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), shardsIter.size(), exc -> {}); CountDownLatch latch = new CountDownLatch(1); @@ -468,7 +468,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), Collections.emptyMap(), controller, executor, - resultConsumer, withMinVersionSearchRequest, null, shardsIter, timeProvider, null, + resultConsumer, searchRequest, null, shardsIter, timeProvider, null, task, SearchResponse.Clusters.EMPTY) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index e03e00eb13840..04b1af990f61a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -181,6 +181,19 @@ public void testValidate() throws IOException { assertEquals(1, validationErrors.validationErrors().size()); assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0)); } + { + // Minimum compatible shard node version with ccs_minimize_roundtrips + SearchRequest searchRequest = new SearchRequest(); + searchRequest.setMinCompatibleShardNode(VersionUtils.randomVersion(random())); + if (randomBoolean()) { + searchRequest.setCcsMinimizeRoundtrips(true); + } + ActionRequestValidationException validationErrors = searchRequest.validate(); + assertNotNull(validationErrors); + assertEquals(1, validationErrors.validationErrors().size()); + assertEquals("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version", + validationErrors.validationErrors().get(0)); + } } public void testCopyConstructor() throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java index 67aaedf7d818f..dda42d399bb09 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java @@ -49,6 +49,8 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.test.AbstractQueryTestCase; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.ArrayList; @@ -60,6 +62,7 @@ import static org.elasticsearch.test.ESTestCase.between; import static org.elasticsearch.test.ESTestCase.generateRandomStringArray; import static org.elasticsearch.test.ESTestCase.mockScript; +import static org.elasticsearch.test.ESTestCase.random; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomByte; @@ -115,6 +118,10 @@ public static SearchRequest randomSearchRequest(Supplier ra if (randomBoolean()) { searchRequest.source(randomSearchSourceBuilder.get()); } + if (randomBoolean()) { + searchRequest.setMinCompatibleShardNode(VersionUtils.randomVersion(random())); + searchRequest.setCcsMinimizeRoundtrips(false); + } return searchRequest; } From 7c6ab18b46496268a40562f729fa25f970964c94 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Mon, 4 Jan 2021 18:16:26 +0200 Subject: [PATCH 05/10] Small fixes --- .../java/org/elasticsearch/client/RequestConvertersTests.java | 2 ++ .../src/test/java/org/elasticsearch/backwards/IndexingIT.java | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index c0b6b93c4544f..9ed0ef45d1a6b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -1191,6 +1191,8 @@ public void testMultiSearch() throws IOException { }); // scroll is not supported in the current msearch api, so unset it: searchRequest.scroll((Scroll) null); + // min_compatible_shard_node is unsupported in msearch api, so unset it + searchRequest.setMinCompatibleShardNode(null); // only expand_wildcards, ignore_unavailable and allow_no_indices can be // specified from msearch api, so unset other options: IndicesOptions randomlyGenerated = searchRequest.indicesOptions(); diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 59021ce0031fc..8705154c03308 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -420,7 +420,7 @@ static Nodes buildNodeAndVersions(RestClient client) throws IOException { return nodes; } - final static class Nodes extends HashMap { + static final class Nodes extends HashMap { private String masterNodeId = null; @@ -473,7 +473,7 @@ public String toString() { } } - final static class Node { + static final class Node { private final String id; private final String nodeName; private final Version version; From e677515ac8434089adb584b68a078d695be7e3c3 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 5 Jan 2021 17:31:49 +0200 Subject: [PATCH 06/10] Address reviews --- .../client/RequestConvertersTests.java | 2 -- .../action/search/SearchRequest.java | 14 +++++----- .../rest/action/search/RestSearchAction.java | 11 ++++---- .../search/MultiSearchRequestTests.java | 3 --- .../SearchQueryThenFetchAsyncActionTests.java | 9 +++---- .../action/search/SearchRequestTests.java | 26 ++++++++++++++----- .../search/RandomSearchRequestGenerator.java | 7 ----- 7 files changed, 36 insertions(+), 36 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 9ed0ef45d1a6b..c0b6b93c4544f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -1191,8 +1191,6 @@ public void testMultiSearch() throws IOException { }); // scroll is not supported in the current msearch api, so unset it: searchRequest.scroll((Scroll) null); - // min_compatible_shard_node is unsupported in msearch api, so unset it - searchRequest.setMinCompatibleShardNode(null); // only expand_wildcards, ignore_unavailable and allow_no_indices can be // specified from msearch api, so unset other options: IndicesOptions randomlyGenerated = searchRequest.indicesOptions(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index d0b937d46044b..0f47074cc7362 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -95,7 +95,7 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private Integer preFilterShardSize; - private boolean ccsMinimizeRoundtrips = true; + private Boolean ccsMinimizeRoundtrips; @Nullable private Version minCompatibleShardNode; @@ -106,9 +106,15 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; public SearchRequest() { + this((Version) null); + } + + public SearchRequest(Version minCompatibleShardNode) { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; this.finalReduce = true; + this.minCompatibleShardNode = minCompatibleShardNode; + this.ccsMinimizeRoundtrips = minCompatibleShardNode == null; } /** @@ -363,10 +369,6 @@ public Version minCompatibleShardNode() { return minCompatibleShardNode; } - public void setMinCompatibleShardNode(Version minCompatibleShardNode) { - this.minCompatibleShardNode = minCompatibleShardNode; - } - /** * Sets the indices the search will be executed on. */ @@ -401,7 +403,7 @@ public boolean includeDataStreams() { /** * Returns whether network round-trips should be minimized when executing cross-cluster search requests. - * Defaults to true. + * Defaults to true, unless minCompatibleShardNode is set in which case it's set to false. */ public boolean isCcsMinimizeRoundtrips() { return ccsMinimizeRoundtrips; diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 3fc620fd17dc2..28ea76dd50231 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -90,7 +90,12 @@ public List routes() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - SearchRequest searchRequest = new SearchRequest(); + SearchRequest searchRequest; + if (request.hasParam("min_compatible_shard_node")) { + searchRequest = new SearchRequest(Version.fromString(request.param("min_compatible_shard_node"))); + } else { + searchRequest = new SearchRequest(); + } /* * We have to pull out the call to `source().size(size)` because * _update_by_query and _delete_by_query uses this same parsing @@ -173,10 +178,6 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r searchRequest.preference(request.param("preference")); searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions())); - if (request.hasParam("min_compatible_shard_node")) { - searchRequest.setMinCompatibleShardNode(Version.fromString(request.param("min_compatible_shard_node"))); - } - checkRestTotalHits(request, searchRequest); if (searchRequest.pointInTimeBuilder() != null) { diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index 95f654f6e6a6f..bb2d70c5dc149 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -342,9 +342,6 @@ private static MultiSearchRequest createMultiSearchRequest() { msearchDefault.ignoreThrottled() )); - // min_compatible_shard_node is unsupported in msearch api, so unset it - searchRequest.setMinCompatibleShardNode(null); - request.add(searchRequest); } return request; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index d396558508336..0952d132d18cf 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -249,12 +249,11 @@ private void testMixedVersionsShardsSearch(Version oldVersion, Version newVersio list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingOldVersionShard), idx)); GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); - final SearchRequest searchRequest = new SearchRequest(); + final SearchRequest searchRequest = new SearchRequest(minVersion); searchRequest.setMaxConcurrentShardRequests(numConcurrent); searchRequest.setBatchedReduceSize(2); searchRequest.source(new SearchSourceBuilder().size(1)); searchRequest.allowPartialSearchResults(false); - searchRequest.setMinCompatibleShardNode(minVersion); Executor executor = EsExecutors.newDirectExecutorService(); SearchTransportService searchTransportService = new SearchTransportService(null, null, null); @@ -318,12 +317,11 @@ public void testMinimumVersionSameAsOldVersion() throws Exception { list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingOldVersionShard), idx)); GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); - final SearchRequest searchRequest = new SearchRequest(); + final SearchRequest searchRequest = new SearchRequest(minVersion); searchRequest.allowPartialSearchResults(false); searchRequest.source(new SearchSourceBuilder() .size(1) .sort(SortBuilders.fieldSort("timestamp"))); - searchRequest.setMinCompatibleShardNode(minVersion); Executor executor = EsExecutors.newDirectExecutorService(); SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { @@ -420,12 +418,11 @@ public void testMinimumVersionShardDuringPhaseExecution() throws Exception { list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingNewVersionShard2), idx)); GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); - final SearchRequest searchRequest = new SearchRequest(); + final SearchRequest searchRequest = new SearchRequest(minVersion); searchRequest.allowPartialSearchResults(false); searchRequest.source(new SearchSourceBuilder() .size(1) .sort(SortBuilders.fieldSort("timestamp"))); - searchRequest.setMinCompatibleShardNode(minVersion); Executor executor = EsExecutors.newDirectExecutorService(); SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 04b1af990f61a..b6848bf111bc4 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -183,16 +183,28 @@ public void testValidate() throws IOException { } { // Minimum compatible shard node version with ccs_minimize_roundtrips - SearchRequest searchRequest = new SearchRequest(); - searchRequest.setMinCompatibleShardNode(VersionUtils.randomVersion(random())); - if (randomBoolean()) { + SearchRequest searchRequest; + boolean isMinCompatibleShardVersion = randomBoolean(); + if (isMinCompatibleShardVersion) { + searchRequest = new SearchRequest(VersionUtils.randomVersion(random())); + } else { + searchRequest = new SearchRequest(); + } + + boolean shouldSetCcsMinimizeRoundtrips = randomBoolean(); + if (shouldSetCcsMinimizeRoundtrips) { searchRequest.setCcsMinimizeRoundtrips(true); } ActionRequestValidationException validationErrors = searchRequest.validate(); - assertNotNull(validationErrors); - assertEquals(1, validationErrors.validationErrors().size()); - assertEquals("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version", - validationErrors.validationErrors().get(0)); + + if (isMinCompatibleShardVersion && shouldSetCcsMinimizeRoundtrips) { + assertNotNull(validationErrors); + assertEquals(1, validationErrors.validationErrors().size()); + assertEquals("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version", + validationErrors.validationErrors().get(0)); + } else { + assertNull(validationErrors); + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java index dda42d399bb09..67aaedf7d818f 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java @@ -49,8 +49,6 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.test.AbstractQueryTestCase; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.ArrayList; @@ -62,7 +60,6 @@ import static org.elasticsearch.test.ESTestCase.between; import static org.elasticsearch.test.ESTestCase.generateRandomStringArray; import static org.elasticsearch.test.ESTestCase.mockScript; -import static org.elasticsearch.test.ESTestCase.random; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomByte; @@ -118,10 +115,6 @@ public static SearchRequest randomSearchRequest(Supplier ra if (randomBoolean()) { searchRequest.source(randomSearchSourceBuilder.get()); } - if (randomBoolean()) { - searchRequest.setMinCompatibleShardNode(VersionUtils.randomVersion(random())); - searchRequest.setCcsMinimizeRoundtrips(false); - } return searchRequest; } From 9a9a919757489493fc98b87b89d1bdbfeb5072b0 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 12 Jan 2021 19:07:45 +0200 Subject: [PATCH 07/10] Differentiate between pre 7.12.0 mixed version cluster tests and post. --- .../SearchWithMinCompatibleSearchNodeIT.java | 49 +++++++++++++------ 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java index 054557cd90d1e..5a13746dce43c 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.rest.ESRestTestCase; @@ -100,15 +101,17 @@ public void testMinVersionAsOldVersion() throws Exception { "&ccs_minimize_roundtrips=false"); oldVersionRequest.setJsonEntity("{\"query\":{\"match_all\":{}},\"_source\":false}"); assertBusy(() -> { - Response response = client.performRequest(oldVersionRequest); - ObjectPath responseObject = ObjectPath.createFromResponse(response); - Map shardsResult = responseObject.evaluate("_shards"); - assertThat(shardsResult.get("total"), equalTo(numShards)); - assertThat(shardsResult.get("successful"), equalTo(numShards)); - assertThat(shardsResult.get("failed"), equalTo(0)); - Map hitsResult = responseObject.evaluate("hits.total"); - assertThat(hitsResult.get("value"), equalTo(numDocs)); - assertThat(hitsResult.get("relation"), equalTo("eq")); + assertWithBwcVersionCheck(() -> { + Response response = client.performRequest(oldVersionRequest); + ObjectPath responseObject = ObjectPath.createFromResponse(response); + Map shardsResult = responseObject.evaluate("_shards"); + assertThat(shardsResult.get("total"), equalTo(numShards)); + assertThat(shardsResult.get("successful"), equalTo(numShards)); + assertThat(shardsResult.get("failed"), equalTo(0)); + Map hitsResult = responseObject.evaluate("hits.total"); + assertThat(hitsResult.get("value"), equalTo(numDocs)); + assertThat(hitsResult.get("relation"), equalTo("eq")); + }, client, oldVersionRequest); }); } } @@ -122,14 +125,28 @@ public void testCcsMinimizeRoundtripsIsFalse() throws Exception { Request request = new Request("POST", index + "/_search?min_compatible_shard_node=" + version + (shouldSetCcsMinimizeRoundtrips ? "&ccs_minimize_roundtrips=true" : "")); assertBusy(() -> { - ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request)); - assertThat(responseException.getResponse().getStatusLine().getStatusCode(), - equalTo(RestStatus.BAD_REQUEST.getStatus())); - assertThat(responseException.getMessage(), - containsString("{\"error\":{\"root_cause\":[{\"type\":\"action_request_validation_exception\"")); - assertThat(responseException.getMessage(), containsString("\"reason\":\"Validation Failed: 1: " - + "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version;\"")); + assertWithBwcVersionCheck(() -> { + ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), + equalTo(RestStatus.BAD_REQUEST.getStatus())); + assertThat(responseException.getMessage(), + containsString("{\"error\":{\"root_cause\":[{\"type\":\"action_request_validation_exception\"")); + assertThat(responseException.getMessage(), containsString("\"reason\":\"Validation Failed: 1: " + + "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version;\"")); + }, client, request); }); } } + + private void assertWithBwcVersionCheck(CheckedRunnable code, RestClient client, Request request) throws Exception { + if (bwcVersion.before(Version.V_7_12_0)) { + // min_compatible_shard_node support doesn't exist in older versions and there will be an "unrecognized parameter" exception + ResponseException exception = expectThrows(ResponseException.class, () -> client.performRequest(request)); + assertThat(exception.getResponse().getStatusLine().getStatusCode(), + equalTo(RestStatus.BAD_REQUEST.getStatus())); + assertThat(exception.getMessage(), containsString("contains unrecognized parameter: [min_compatible_shard_node]")); + } else { + code.run(); + } + } } From b1acef2ca2f3c267959ae75df1e06f3952087b18 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 12 Jan 2021 19:15:20 +0200 Subject: [PATCH 08/10] Disable bwc tests. --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index c57e418a9bd4e..a10b4dcbb2742 100644 --- a/build.gradle +++ b/build.gradle @@ -175,8 +175,8 @@ tasks.register("verifyVersions") { * after the backport of the backcompat code is complete. */ -boolean bwc_tests_enabled = true -String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */ +boolean bwc_tests_enabled = false +String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/67373" /* place a PR link here when committing bwc changes */ /* * FIPS 140-2 behavior was fixed in 7.11.0. Before that there is no way to run elasticsearch in a * JVM that is properly configured to be in fips mode with BCFIPS. For now we need to disable From d51a352fd7cdad48f561b17ae6ab89bee88b9094 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 12 Jan 2021 19:39:13 +0200 Subject: [PATCH 09/10] Re-enable bwc tests --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index a10b4dcbb2742..c57e418a9bd4e 100644 --- a/build.gradle +++ b/build.gradle @@ -175,8 +175,8 @@ tasks.register("verifyVersions") { * after the backport of the backcompat code is complete. */ -boolean bwc_tests_enabled = false -String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/67373" /* place a PR link here when committing bwc changes */ +boolean bwc_tests_enabled = true +String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */ /* * FIPS 140-2 behavior was fixed in 7.11.0. Before that there is no way to run elasticsearch in a * JVM that is properly configured to be in fips mode with BCFIPS. For now we need to disable From 54365b78becf3f0431b4e6e723f4c65adb867469 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 12 Jan 2021 23:53:23 +0200 Subject: [PATCH 10/10] Just to please the bwc checks, since the code is not in 7.x yet, use 8.0.0 as the version to test against. --- .../backwards/SearchWithMinCompatibleSearchNodeIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java index 5a13746dce43c..ed94f4f187de7 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java @@ -139,7 +139,7 @@ public void testCcsMinimizeRoundtripsIsFalse() throws Exception { } private void assertWithBwcVersionCheck(CheckedRunnable code, RestClient client, Request request) throws Exception { - if (bwcVersion.before(Version.V_7_12_0)) { + if (bwcVersion.before(Version.V_8_0_0)) { // min_compatible_shard_node support doesn't exist in older versions and there will be an "unrecognized parameter" exception ResponseException exception = expectThrows(ResponseException.class, () -> client.performRequest(request)); assertThat(exception.getResponse().getStatusLine().getStatusCode(),