diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index e2f155e401f76..da54d5470be6a 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -344,7 +344,11 @@ private List buildShards(String index, Nodes nodes, RestClient client) th } private Nodes buildNodeAndVersions() throws IOException { - Response response = client().performRequest(new Request("GET", "_nodes")); + return buildNodeAndVersions(client()); + } + + static Nodes buildNodeAndVersions(RestClient client) throws IOException { + Response response = client.performRequest(new Request("GET", "_nodes")); ObjectPath objectPath = ObjectPath.createFromResponse(response); Map nodesAsMap = objectPath.evaluate("nodes"); Nodes nodes = new Nodes(); @@ -355,12 +359,12 @@ private Nodes buildNodeAndVersions() throws IOException { Version.fromString(objectPath.evaluate("nodes." + id + ".version")), HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address")))); } - response = client().performRequest(new Request("GET", "_cluster/state")); + response = client.performRequest(new Request("GET", "_cluster/state")); nodes.setMasterNodeId(ObjectPath.createFromResponse(response).evaluate("master_node")); return nodes; } - final class Nodes extends HashMap { + static final class Nodes extends HashMap { private String masterNodeId = null; @@ -413,7 +417,7 @@ public String toString() { } } - final class Node { + static final class Node { private final String id; private final String nodeName; private final Version version; diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java new file mode 100644 index 0000000000000..6b83a504a2850 --- /dev/null +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.backwards; + +import org.apache.http.HttpHost; +import org.elasticsearch.Version; +import org.elasticsearch.backwards.IndexingIT.Node; +import org.elasticsearch.backwards.IndexingIT.Nodes; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class SearchWithMinCompatibleSearchNodeIT extends ESRestTestCase { + + private static String index = "test_min_version"; + private static int numShards; + private static int numReplicas = 1; + private static int numDocs; + private static Nodes nodes; + private static List bwcNodes; + private static List newNodes; + private static Version bwcVersion; + private static Version newVersion; + + @Before + public void prepareTestData() throws IOException { + nodes = IndexingIT.buildNodeAndVersions(client()); + numShards = nodes.size(); + numDocs = randomIntBetween(numShards, 16); + bwcNodes = new ArrayList<>(); + newNodes = new ArrayList<>(); + bwcNodes.addAll(nodes.getBWCNodes()); + newNodes.addAll(nodes.getNewNodes()); + bwcVersion = bwcNodes.get(0).getVersion(); + newVersion = newNodes.get(0).getVersion(); + + if (client().performRequest(new Request("HEAD", "/" + index)).getStatusLine().getStatusCode() == 404) { + createIndex(index, Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build()); + for (int i = 0; i < numDocs; i++) { + Request request = new Request("PUT", index + "/_doc/" + i); + request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(2) + "\"}"); + assertOK(client().performRequest(request)); + } + ensureGreen(index); + } + } + + public void testMinVersionAsNewVersion() throws Exception { + Request newVersionRequest = new Request("POST", + index + "/_search?min_compatible_shard_node=" + newVersion + "&ccs_minimize_roundtrips=false"); + assertWithBwcVersionCheck((client) -> { + ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(newVersionRequest)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), + equalTo(RestStatus.INTERNAL_SERVER_ERROR.getStatus())); + assertThat(responseException.getMessage(), + containsString("{\"error\":{\"root_cause\":[],\"type\":\"search_phase_execution_exception\"")); + assertThat(responseException.getMessage(), containsString("caused_by\":{\"type\":\"version_mismatch_exception\"," + + "\"reason\":\"One of the shards is incompatible with the required minimum version [" + newVersion + "]\"")); + }, newVersionRequest); + } + + public void testMinVersionAsOldVersion() throws Exception { + Request oldVersionRequest = new Request("POST", index + "/_search?min_compatible_shard_node=" + bwcVersion + + "&ccs_minimize_roundtrips=false"); + oldVersionRequest.setJsonEntity("{\"query\":{\"match_all\":{}},\"_source\":false}"); + assertWithBwcVersionCheck((client) -> { + Response response = client.performRequest(oldVersionRequest); + ObjectPath responseObject = ObjectPath.createFromResponse(response); + Map shardsResult = responseObject.evaluate("_shards"); + assertThat(shardsResult.get("total"), equalTo(numShards)); + assertThat(shardsResult.get("successful"), equalTo(numShards)); + assertThat(shardsResult.get("failed"), equalTo(0)); + Map hitsResult = responseObject.evaluate("hits.total"); + assertThat(hitsResult.get("value"), equalTo(numDocs)); + assertThat(hitsResult.get("relation"), equalTo("eq")); + }, oldVersionRequest); + } + + public void testCcsMinimizeRoundtripsIsFalse() throws Exception { + Version version = randomBoolean() ? newVersion : bwcVersion; + + Request request = new Request("POST", index + "/_search?min_compatible_shard_node=" + version + "&ccs_minimize_roundtrips=true"); + assertWithBwcVersionCheck((client) -> { + ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request)); + assertThat(responseException.getResponse().getStatusLine().getStatusCode(), + equalTo(RestStatus.BAD_REQUEST.getStatus())); + assertThat(responseException.getMessage(), + containsString("{\"error\":{\"root_cause\":[{\"type\":\"action_request_validation_exception\"")); + assertThat(responseException.getMessage(), containsString("\"reason\":\"Validation Failed: 1: " + + "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version;\"")); + }, request); + } + + private void assertWithBwcVersionCheck(CheckedConsumer code, Request request) throws Exception { + try (RestClient client = buildClient(restClientSettings(), + newNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + assertBusy(() -> { + code.accept(client); + }); + } + try (RestClient client = buildClient(restClientSettings(), + bwcNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + if (bwcVersion.before(Version.V_7_12_0)) { + ResponseException exception = expectThrows(ResponseException.class, () -> client.performRequest(request)); + assertThat(exception.getResponse().getStatusLine().getStatusCode(), + equalTo(RestStatus.BAD_REQUEST.getStatus())); + if (bwcVersion.onOrAfter(Version.V_7_0_0)) { + // min_compatible_shard_node support doesn't exist in older versions and there will be an "unrecognized parameter" + // exception + assertThat(exception.getMessage(), containsString("contains unrecognized parameter: [min_compatible_shard_node]")); + } else { + // ccs_minimize_roundtrips support doesn't exist in 6.x versions and there will be an "unrecognized parameter" exception + assertThat(exception.getMessage(), containsString("contains unrecognized parameters: [ccs_minimize_roundtrips]")); + } + } else { + assertBusy(() -> { + code.accept(client); + }); + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index 4db4f0e3b7c8d..d127b188c7736 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -248,6 +248,10 @@ "type":"boolean", "description":"Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default":false + }, + "min_compatible_shard_node":{ + "type":"string", + "description":"The minimum compatible version that all shards involved in search should have for this request to be successful" } }, "body":{ diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index cf5650e253d0d..9170a5d03ab68 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1047,7 +1047,12 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.transport.NoSeedNodeLeftException.class, org.elasticsearch.transport.NoSeedNodeLeftException::new, 160, - Version.V_7_10_0); + Version.V_7_10_0), + VERSION_MISMATCH_EXCEPTION( + org.elasticsearch.action.search.VersionMismatchException.class, + org.elasticsearch.action.search.VersionMismatchException::new, + 161, + Version.V_7_12_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 79e6375302a1b..c7357268b87d1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -222,7 +222,13 @@ public final void run() { throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY); } } - + Version version = request.minCompatibleShardNode(); + if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) { + if (checkMinimumVersion(shardsIts) == false) { + throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", + request.minCompatibleShardNode()); + } + } for (int i = 0; i < shardsIts.size(); i++) { final SearchShardIterator shardRoutings = shardsIts.get(i); assert shardRoutings.skip() == false; @@ -240,6 +246,22 @@ void skipShard(SearchShardIterator iterator) { successfulShardExecution(iterator); } + + private boolean checkMinimumVersion(GroupShardsIterator shardsIts) { + for (SearchShardIterator it : shardsIts) { + if (it.getTargetNodeIds().isEmpty() == false) { + boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> { + Transport.Connection conn = getConnection(it.getClusterAlias(), nodeId); + return conn == null ? true : conn.getVersion().onOrAfter(request.minCompatibleShardNode()); + }); + if (isCompatible == false) { + return false; + } + } + } + return true; + } + protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) { /* * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the @@ -660,7 +682,12 @@ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() @Override public final Transport.Connection getConnection(String clusterAlias, String nodeId) { - return nodeIdToConnection.apply(clusterAlias, nodeId); + Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId); + Version minVersion = request.minCompatibleShardNode(); + if (minVersion != null && conn != null && conn.getVersion().before(minVersion)) { + throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion); + } + return conn; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 87117b14c2bee..fd6874e233f50 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -96,8 +96,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private Integer preFilterShardSize; private String[] types = Strings.EMPTY_ARRAY; + private Boolean ccsMinimizeRoundtrips; - private boolean ccsMinimizeRoundtrips = true; + @Nullable + private Version minCompatibleShardNode; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled(); @@ -105,9 +107,15 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; public SearchRequest() { + this((Version) null); + } + + public SearchRequest(Version minCompatibleShardNode) { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; this.finalReduce = true; + this.minCompatibleShardNode = minCompatibleShardNode; + this.ccsMinimizeRoundtrips = minCompatibleShardNode == null; } /** @@ -185,6 +193,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca this.localClusterAlias = localClusterAlias; this.absoluteStartMillis = absoluteStartMillis; this.finalReduce = finalReduce; + this.minCompatibleShardNode = searchRequest.minCompatibleShardNode; } /** @@ -230,6 +239,13 @@ public SearchRequest(StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_7_0_0)) { ccsMinimizeRoundtrips = in.readBoolean(); + } else { + ccsMinimizeRoundtrips = true; + } + if (in.getVersion().onOrAfter(Version.V_7_12_0)) { + if (in.readBoolean()) { + minCompatibleShardNode = Version.readVersion(in); + } } } @@ -265,6 +281,12 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_0_0)) { out.writeBoolean(ccsMinimizeRoundtrips); } + if (out.getVersion().onOrAfter(Version.V_7_12_0)) { + out.writeBoolean(minCompatibleShardNode != null); + if (minCompatibleShardNode != null) { + Version.writeVersion(minCompatibleShardNode, out); + } + } } @Override @@ -312,6 +334,12 @@ public ActionRequestValidationException validate() { } } } + if (minCompatibleShardNode() != null) { + if (isCcsMinimizeRoundtrips()) { + validationException = addValidationError("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible " + + "shard version", validationException); + } + } return validationException; } @@ -350,6 +378,15 @@ long getAbsoluteStartMillis() { return absoluteStartMillis; } + /** + * Returns the minimum compatible shard version the search request needs to run on. If the version is null, then there are no + * restrictions imposed on shards versions part of this search. + */ + @Nullable + public Version minCompatibleShardNode() { + return minCompatibleShardNode; + } + /** * Sets the indices the search will be executed on. */ @@ -384,7 +421,7 @@ public boolean includeDataStreams() { /** * Returns whether network round-trips should be minimized when executing cross-cluster search requests. - * Defaults to true. + * Defaults to true, unless minCompatibleShardNode is set in which case it's set to false. */ public boolean isCcsMinimizeRoundtrips() { return ccsMinimizeRoundtrips; @@ -720,14 +757,15 @@ public boolean equals(Object o) { Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) && Objects.equals(localClusterAlias, that.localClusterAlias) && absoluteStartMillis == that.absoluteStartMillis && - ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips; + ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips && + Objects.equals(minCompatibleShardNode, that.minCompatibleShardNode); } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips); + allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, minCompatibleShardNode); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java b/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java new file mode 100644 index 0000000000000..5527ea28c37ff --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class VersionMismatchException extends ElasticsearchException { + + public VersionMismatchException(String msg, Object... args) { + super(msg, args); + } + + public VersionMismatchException(StreamInput in) throws IOException { + super(in); + } + +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 760c5d3744ba0..9c70c0908eec7 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest.action.search; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchContextId; @@ -99,7 +100,12 @@ public List routes() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - SearchRequest searchRequest = new SearchRequest(); + SearchRequest searchRequest; + if (request.hasParam("min_compatible_shard_node")) { + searchRequest = new SearchRequest(Version.fromString(request.param("min_compatible_shard_node"))); + } else { + searchRequest = new SearchRequest(); + } /* * We have to pull out the call to `source().size(size)` because * _update_by_query and _delete_by_query uses this same parsing diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index f31fbd0a25f2b..9858b5f835bbf 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -29,12 +29,14 @@ import org.elasticsearch.action.TimestampParsingException; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.search.VersionMismatchException; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.client.AbstractClientHeadersTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.cluster.coordination.NoMasterBlockService; +import org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException; import org.elasticsearch.cluster.routing.ShardRouting; @@ -72,7 +74,6 @@ import org.elasticsearch.indices.recovery.PeerRecoveryNotFound; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; import org.elasticsearch.ingest.IngestProcessorException; -import org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; @@ -832,6 +833,7 @@ public void testIds() { ids.put(158, PeerRecoveryNotFound.class); ids.put(159, NodeHealthCheckFailureException.class); ids.put(160, NoSeedNodeLeftException.class); + ids.put(161, VersionMismatchException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 525ebb494e08b..207be8cdffd7b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -25,15 +25,20 @@ import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.grouping.CollapseTopFieldDocs; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhaseResult; @@ -47,9 +52,12 @@ import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.transport.Transport; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -57,6 +65,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static java.util.Collections.singletonList; +import static org.elasticsearch.test.VersionUtils.allVersions; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -79,8 +89,8 @@ private void testCase(boolean withScroll, boolean withCollapse) throws Exception new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); Map lookup = new ConcurrentHashMap<>(); - DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); - DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode primaryNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode replicaNode = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT); lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); @@ -198,4 +208,303 @@ public void run() { assertThat(((FieldDoc) phase.sortedTopDocs.scoreDocs[0]).fields.length, equalTo(1)); assertThat(((FieldDoc) phase.sortedTopDocs.scoreDocs[0]).fields[0], equalTo(0)); } + + public void testMinimumVersionSameAsNewVersion() throws Exception { + Version newVersion = Version.CURRENT; + Version oldVersion = randomPreviousCompatibleVersion(newVersion); + testMixedVersionsShardsSearch(newVersion, oldVersion, newVersion); + } + + public void testMinimumVersionBetweenNewAndOldVersion() throws Exception { + Version oldVersion = VersionUtils.getFirstVersion(); + Version newVersion = VersionUtils.maxCompatibleVersion(oldVersion); + Version minVersion = VersionUtils.randomVersionBetween(random(), + allVersions().get(allVersions().indexOf(oldVersion) + 1), newVersion); + testMixedVersionsShardsSearch(newVersion, oldVersion, minVersion); + } + + private void testMixedVersionsShardsSearch(Version oldVersion, Version newVersion, Version minVersion) throws Exception { + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + int numConcurrent = randomIntBetween(1, 4); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode newVersionNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), newVersion); + DiscoveryNode oldVersionNode = new DiscoveryNode("node2", buildNewFakeTransportAddress(), oldVersion); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(oldVersionNode)); + + OriginalIndices idx = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS); + ArrayList list = new ArrayList<>(); + ShardRouting routingNewVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 0), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingNewVersionShard = routingNewVersionShard.initialize(newVersionNode.getId(), "p0", 0); + routingNewVersionShard.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard), idx)); + + ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 1), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p1", 0); + routingOldVersionShard.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingOldVersionShard), idx)); + + GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); + final SearchRequest searchRequest = new SearchRequest(minVersion); + searchRequest.setMaxConcurrentShardRequests(numConcurrent); + searchRequest.setBatchedReduceSize(2); + searchRequest.source(new SearchSourceBuilder().size(1)); + searchRequest.allowPartialSearchResults(false); + + Executor executor = EsExecutors.newDirectExecutorService(); + SearchTransportService searchTransportService = new SearchTransportService(null, null, null); + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), + shardsIter.size(), exc -> {}); + final List responses = new ArrayList<>(); + SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, + searchTransportService, (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), controller, executor, + resultConsumer, searchRequest, new ActionListener() { + @Override + public void onFailure(Exception e) { + responses.add(e); + } + public void onResponse(SearchResponse response) { + responses.add(response); + }; + }, shardsIter, timeProvider, null, task, SearchResponse.Clusters.EMPTY); + + newSearchAsyncAction.start(); + assertEquals(1, responses.size()); + assertTrue(responses.get(0) instanceof SearchPhaseExecutionException); + SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0); + assertTrue(e.getCause() instanceof VersionMismatchException); + assertThat(e.getCause().getMessage(), + equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); + } + + public void testMinimumVersionSameAsOldVersion() throws Exception { + Version newVersion = Version.CURRENT; + Version oldVersion = randomPreviousCompatibleVersion(newVersion); + Version minVersion = oldVersion; + + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + AtomicInteger successfulOps = new AtomicInteger(); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode newVersionNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), newVersion); + DiscoveryNode oldVersionNode = new DiscoveryNode("node2", buildNewFakeTransportAddress(), oldVersion); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(oldVersionNode)); + + OriginalIndices idx = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS); + ArrayList list = new ArrayList<>(); + ShardRouting routingNewVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 0), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingNewVersionShard = routingNewVersionShard.initialize(newVersionNode.getId(), "p0", 0); + routingNewVersionShard.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard), idx)); + + ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 1), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p1", 0); + routingOldVersionShard.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingOldVersionShard), idx)); + + GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); + final SearchRequest searchRequest = new SearchRequest(minVersion); + searchRequest.allowPartialSearchResults(false); + searchRequest.source(new SearchSourceBuilder() + .size(1) + .sort(SortBuilders.fieldSort("timestamp"))); + + Executor executor = EsExecutors.newDirectExecutorService(); + SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { + @Override + public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest request, + SearchTask task, SearchActionListener listener) { + int shardId = request.shardId().id(); + QuerySearchResult queryResult = new QuerySearchResult(new ShardSearchContextId("N/A", 123), + new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null, OriginalIndices.NONE), null); + SortField sortField = new SortField("timestamp", SortField.Type.LONG); + if (shardId == 0) { + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[]{ + new FieldDoc(randomInt(1000), Float.NaN, new Object[]{shardId}) + }, new SortField[]{sortField}), Float.NaN), + new DocValueFormat[]{DocValueFormat.RAW}); + } else if (shardId == 1) { + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[]{ + new FieldDoc(randomInt(1000), Float.NaN, new Object[]{shardId}) + }, new SortField[]{sortField}), Float.NaN), + new DocValueFormat[]{DocValueFormat.RAW}); + } + queryResult.from(0); + queryResult.size(1); + successfulOps.incrementAndGet(); + new Thread(() -> listener.onResponse(queryResult)).start(); + } + }; + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), + shardsIter.size(), exc -> {}); + CountDownLatch latch = new CountDownLatch(1); + SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, + searchTransportService, (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), controller, executor, + resultConsumer, searchRequest, null, shardsIter, timeProvider, null, + task, SearchResponse.Clusters.EMPTY) { + @Override + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new SearchPhase("test") { + @Override + public void run() { + latch.countDown(); + } + }; + } + }; + + action.start(); + latch.await(); + assertThat(successfulOps.get(), equalTo(2)); + SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); + assertThat(phase.numReducePhases, greaterThanOrEqualTo(1)); + assertThat(phase.totalHits.value, equalTo(2L)); + assertThat(phase.totalHits.relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); + } + + public void testMinimumVersionShardDuringPhaseExecution() throws Exception { + Version newVersion = Version.CURRENT; + Version oldVersion = randomPreviousCompatibleVersion(newVersion); + Version minVersion = newVersion; + + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + AtomicInteger successfulOps = new AtomicInteger(); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode newVersionNode1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), newVersion); + DiscoveryNode newVersionNode2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), newVersion); + DiscoveryNode oldVersionNode = new DiscoveryNode("node3", buildNewFakeTransportAddress(), oldVersion); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode1)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(newVersionNode2)); + lookup.put("node3", new SearchAsyncActionTests.MockConnection(oldVersionNode)); + + OriginalIndices idx = new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS); + ArrayList list = new ArrayList<>(); + ShardRouting routingNewVersionShard1 = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 0), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingNewVersionShard1 = routingNewVersionShard1.initialize(newVersionNode1.getId(), "p0", 0); + routingNewVersionShard1.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard1), idx)); + + ShardRouting routingNewVersionShard2 = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 1), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routingNewVersionShard2 = routingNewVersionShard2.initialize(newVersionNode2.getId(), "p1", 0); + routingNewVersionShard2.started(); + list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingNewVersionShard2), idx)); + + GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); + final SearchRequest searchRequest = new SearchRequest(minVersion); + searchRequest.allowPartialSearchResults(false); + searchRequest.source(new SearchSourceBuilder() + .size(1) + .sort(SortBuilders.fieldSort("timestamp"))); + + Executor executor = EsExecutors.newDirectExecutorService(); + SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { + @Override + public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest request, + SearchTask task, SearchActionListener listener) { + int shardId = request.shardId().id(); + QuerySearchResult queryResult = new QuerySearchResult(new ShardSearchContextId("N/A", 123), + new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null, OriginalIndices.NONE), null); + SortField sortField = new SortField("timestamp", SortField.Type.LONG); + if (shardId == 0) { + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[]{ + new FieldDoc(randomInt(1000), Float.NaN, new Object[]{shardId}) + }, new SortField[]{sortField}), Float.NaN), + new DocValueFormat[]{DocValueFormat.RAW}); + } else if (shardId == 1) { + queryResult.topDocs(new TopDocsAndMaxScore(new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[]{ + new FieldDoc(randomInt(1000), Float.NaN, new Object[]{shardId}) + }, new SortField[]{sortField}), Float.NaN), + new DocValueFormat[]{DocValueFormat.RAW}); + } + queryResult.from(0); + queryResult.size(1); + successfulOps.incrementAndGet(); + new Thread(() -> listener.onResponse(queryResult)).start(); + } + }; + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(searchRequest, executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), controller, task.getProgressListener(), writableRegistry(), + shardsIter.size(), exc -> {}); + CountDownLatch latch = new CountDownLatch(1); + SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, + searchTransportService, (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), controller, executor, + resultConsumer, searchRequest, null, shardsIter, timeProvider, null, + task, SearchResponse.Clusters.EMPTY) { + @Override + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new SearchPhase("test") { + @Override + public void run() { + latch.countDown(); + } + }; + } + }; + ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(new ShardId(new Index("idx", "_na_"), 2), true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + SearchShardIterator shardIt = new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 2), + singletonList(routingOldVersionShard), idx); + routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0); + routingOldVersionShard.started(); + action.start(); + latch.await(); + assertThat(successfulOps.get(), equalTo(2)); + SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); + assertThat(phase.numReducePhases, greaterThanOrEqualTo(1)); + assertThat(phase.totalHits.value, equalTo(2L)); + assertThat(phase.totalHits.relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); + + SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null, OriginalIndices.NONE); + SearchActionListener listener = new SearchActionListener(searchShardTarget, 0) { + @Override + public void onFailure(Exception e) { } + + @Override + protected void innerOnResponse(SearchPhaseResult response) { } + }; + Exception e = expectThrows(VersionMismatchException.class, () -> action.executePhaseOnShard(shardIt, searchShardTarget, listener)); + assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); + } + + private Version randomPreviousCompatibleVersion(Version version) { + return VersionUtils.randomVersionBetween(random(), version.minimumIndexCompatibilityVersion(), + VersionUtils.getPreviousVersion(version)); + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 5b875e8c0a13a..e05cc7412b537 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -222,6 +222,31 @@ public void testValidate() throws IOException { assertEquals(1, validationErrors.validationErrors().size()); assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0)); } + { + // Minimum compatible shard node version with ccs_minimize_roundtrips + SearchRequest searchRequest; + boolean isMinCompatibleShardVersion = randomBoolean(); + if (isMinCompatibleShardVersion) { + searchRequest = new SearchRequest(VersionUtils.randomVersion(random())); + } else { + searchRequest = new SearchRequest(); + } + + boolean shouldSetCcsMinimizeRoundtrips = randomBoolean(); + if (shouldSetCcsMinimizeRoundtrips) { + searchRequest.setCcsMinimizeRoundtrips(true); + } + ActionRequestValidationException validationErrors = searchRequest.validate(); + + if (isMinCompatibleShardVersion && shouldSetCcsMinimizeRoundtrips) { + assertNotNull(validationErrors); + assertEquals(1, validationErrors.validationErrors().size()); + assertEquals("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version", + validationErrors.validationErrors().get(0)); + } else { + assertNull(validationErrors); + } + } } public void testCopyConstructor() throws IOException {