Skip to content

Commit 85794b7

Browse files
authored
Add minimum compatibility version to SearchRequest (#67414)
* Adds a minimum version request parameter to SearchRequest. The minimum version helps failing a request if any shards involved in the search do not meet the compatibility requirements (all shards need to have a version equal or later than the minimum version provided). (cherry picked from commit e3386e1)
1 parent 46ae45c commit 85794b7

File tree

11 files changed

+628
-15
lines changed

11 files changed

+628
-15
lines changed

qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,11 @@ private List<Shard> buildShards(String index, Nodes nodes, RestClient client) th
344344
}
345345

346346
private Nodes buildNodeAndVersions() throws IOException {
347-
Response response = client().performRequest(new Request("GET", "_nodes"));
347+
return buildNodeAndVersions(client());
348+
}
349+
350+
static Nodes buildNodeAndVersions(RestClient client) throws IOException {
351+
Response response = client.performRequest(new Request("GET", "_nodes"));
348352
ObjectPath objectPath = ObjectPath.createFromResponse(response);
349353
Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
350354
Nodes nodes = new Nodes();
@@ -355,12 +359,12 @@ private Nodes buildNodeAndVersions() throws IOException {
355359
Version.fromString(objectPath.evaluate("nodes." + id + ".version")),
356360
HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address"))));
357361
}
358-
response = client().performRequest(new Request("GET", "_cluster/state"));
362+
response = client.performRequest(new Request("GET", "_cluster/state"));
359363
nodes.setMasterNodeId(ObjectPath.createFromResponse(response).evaluate("master_node"));
360364
return nodes;
361365
}
362366

363-
final class Nodes extends HashMap<String, Node> {
367+
static final class Nodes extends HashMap<String, Node> {
364368

365369
private String masterNodeId = null;
366370

@@ -413,7 +417,7 @@ public String toString() {
413417
}
414418
}
415419

416-
final class Node {
420+
static final class Node {
417421
private final String id;
418422
private final String nodeName;
419423
private final Version version;
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.backwards;
20+
21+
import org.apache.http.HttpHost;
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.backwards.IndexingIT.Node;
24+
import org.elasticsearch.backwards.IndexingIT.Nodes;
25+
import org.elasticsearch.client.Request;
26+
import org.elasticsearch.client.Response;
27+
import org.elasticsearch.client.ResponseException;
28+
import org.elasticsearch.client.RestClient;
29+
import org.elasticsearch.cluster.metadata.IndexMetadata;
30+
import org.elasticsearch.common.CheckedConsumer;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.rest.RestStatus;
33+
import org.elasticsearch.test.rest.ESRestTestCase;
34+
import org.elasticsearch.test.rest.yaml.ObjectPath;
35+
import org.junit.Before;
36+
37+
import java.io.IOException;
38+
import java.util.ArrayList;
39+
import java.util.List;
40+
import java.util.Map;
41+
42+
import static org.hamcrest.Matchers.containsString;
43+
import static org.hamcrest.Matchers.equalTo;
44+
45+
public class SearchWithMinCompatibleSearchNodeIT extends ESRestTestCase {
46+
47+
private static String index = "test_min_version";
48+
private static int numShards;
49+
private static int numReplicas = 1;
50+
private static int numDocs;
51+
private static Nodes nodes;
52+
private static List<Node> bwcNodes;
53+
private static List<Node> newNodes;
54+
private static Version bwcVersion;
55+
private static Version newVersion;
56+
57+
@Before
58+
public void prepareTestData() throws IOException {
59+
nodes = IndexingIT.buildNodeAndVersions(client());
60+
numShards = nodes.size();
61+
numDocs = randomIntBetween(numShards, 16);
62+
bwcNodes = new ArrayList<>();
63+
newNodes = new ArrayList<>();
64+
bwcNodes.addAll(nodes.getBWCNodes());
65+
newNodes.addAll(nodes.getNewNodes());
66+
bwcVersion = bwcNodes.get(0).getVersion();
67+
newVersion = newNodes.get(0).getVersion();
68+
69+
if (client().performRequest(new Request("HEAD", "/" + index)).getStatusLine().getStatusCode() == 404) {
70+
createIndex(index, Settings.builder()
71+
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards)
72+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build());
73+
for (int i = 0; i < numDocs; i++) {
74+
Request request = new Request("PUT", index + "/_doc/" + i);
75+
request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(2) + "\"}");
76+
assertOK(client().performRequest(request));
77+
}
78+
ensureGreen(index);
79+
}
80+
}
81+
82+
public void testMinVersionAsNewVersion() throws Exception {
83+
Request newVersionRequest = new Request("POST",
84+
index + "/_search?min_compatible_shard_node=" + newVersion + "&ccs_minimize_roundtrips=false");
85+
assertWithBwcVersionCheck((client) -> {
86+
ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(newVersionRequest));
87+
assertThat(responseException.getResponse().getStatusLine().getStatusCode(),
88+
equalTo(RestStatus.INTERNAL_SERVER_ERROR.getStatus()));
89+
assertThat(responseException.getMessage(),
90+
containsString("{\"error\":{\"root_cause\":[],\"type\":\"search_phase_execution_exception\""));
91+
assertThat(responseException.getMessage(), containsString("caused_by\":{\"type\":\"version_mismatch_exception\","
92+
+ "\"reason\":\"One of the shards is incompatible with the required minimum version [" + newVersion + "]\""));
93+
}, newVersionRequest);
94+
}
95+
96+
public void testMinVersionAsOldVersion() throws Exception {
97+
Request oldVersionRequest = new Request("POST", index + "/_search?min_compatible_shard_node=" + bwcVersion +
98+
"&ccs_minimize_roundtrips=false");
99+
oldVersionRequest.setJsonEntity("{\"query\":{\"match_all\":{}},\"_source\":false}");
100+
assertWithBwcVersionCheck((client) -> {
101+
Response response = client.performRequest(oldVersionRequest);
102+
ObjectPath responseObject = ObjectPath.createFromResponse(response);
103+
Map<String, Object> shardsResult = responseObject.evaluate("_shards");
104+
assertThat(shardsResult.get("total"), equalTo(numShards));
105+
assertThat(shardsResult.get("successful"), equalTo(numShards));
106+
assertThat(shardsResult.get("failed"), equalTo(0));
107+
Map<String, Object> hitsResult = responseObject.evaluate("hits.total");
108+
assertThat(hitsResult.get("value"), equalTo(numDocs));
109+
assertThat(hitsResult.get("relation"), equalTo("eq"));
110+
}, oldVersionRequest);
111+
}
112+
113+
public void testCcsMinimizeRoundtripsIsFalse() throws Exception {
114+
Version version = randomBoolean() ? newVersion : bwcVersion;
115+
116+
Request request = new Request("POST", index + "/_search?min_compatible_shard_node=" + version + "&ccs_minimize_roundtrips=true");
117+
assertWithBwcVersionCheck((client) -> {
118+
ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request));
119+
assertThat(responseException.getResponse().getStatusLine().getStatusCode(),
120+
equalTo(RestStatus.BAD_REQUEST.getStatus()));
121+
assertThat(responseException.getMessage(),
122+
containsString("{\"error\":{\"root_cause\":[{\"type\":\"action_request_validation_exception\""));
123+
assertThat(responseException.getMessage(), containsString("\"reason\":\"Validation Failed: 1: "
124+
+ "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version;\""));
125+
}, request);
126+
}
127+
128+
private void assertWithBwcVersionCheck(CheckedConsumer<RestClient, Exception> code, Request request) throws Exception {
129+
try (RestClient client = buildClient(restClientSettings(),
130+
newNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
131+
assertBusy(() -> {
132+
code.accept(client);
133+
});
134+
}
135+
try (RestClient client = buildClient(restClientSettings(),
136+
bwcNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
137+
if (bwcVersion.before(Version.V_7_12_0)) {
138+
ResponseException exception = expectThrows(ResponseException.class, () -> client.performRequest(request));
139+
assertThat(exception.getResponse().getStatusLine().getStatusCode(),
140+
equalTo(RestStatus.BAD_REQUEST.getStatus()));
141+
if (bwcVersion.onOrAfter(Version.V_7_0_0)) {
142+
// min_compatible_shard_node support doesn't exist in older versions and there will be an "unrecognized parameter"
143+
// exception
144+
assertThat(exception.getMessage(), containsString("contains unrecognized parameter: [min_compatible_shard_node]"));
145+
} else {
146+
// ccs_minimize_roundtrips support doesn't exist in 6.x versions and there will be an "unrecognized parameter" exception
147+
assertThat(exception.getMessage(), containsString("contains unrecognized parameters: [ccs_minimize_roundtrips]"));
148+
}
149+
} else {
150+
assertBusy(() -> {
151+
code.accept(client);
152+
});
153+
}
154+
}
155+
}
156+
}

rest-api-spec/src/main/resources/rest-api-spec/api/search.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,10 @@
248248
"type":"boolean",
249249
"description":"Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
250250
"default":false
251+
},
252+
"min_compatible_shard_node":{
253+
"type":"string",
254+
"description":"The minimum compatible version that all shards involved in search should have for this request to be successful"
251255
}
252256
},
253257
"body":{

server/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1047,7 +1047,12 @@ private enum ElasticsearchExceptionHandle {
10471047
org.elasticsearch.transport.NoSeedNodeLeftException.class,
10481048
org.elasticsearch.transport.NoSeedNodeLeftException::new,
10491049
160,
1050-
Version.V_7_10_0);
1050+
Version.V_7_10_0),
1051+
VERSION_MISMATCH_EXCEPTION(
1052+
org.elasticsearch.action.search.VersionMismatchException.class,
1053+
org.elasticsearch.action.search.VersionMismatchException::new,
1054+
161,
1055+
Version.V_7_12_0);
10511056

10521057
final Class<? extends ElasticsearchException> exceptionClass;
10531058
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,13 @@ public final void run() {
222222
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
223223
}
224224
}
225-
225+
Version version = request.minCompatibleShardNode();
226+
if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) {
227+
if (checkMinimumVersion(shardsIts) == false) {
228+
throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]",
229+
request.minCompatibleShardNode());
230+
}
231+
}
226232
for (int i = 0; i < shardsIts.size(); i++) {
227233
final SearchShardIterator shardRoutings = shardsIts.get(i);
228234
assert shardRoutings.skip() == false;
@@ -240,6 +246,22 @@ void skipShard(SearchShardIterator iterator) {
240246
successfulShardExecution(iterator);
241247
}
242248

249+
250+
private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> shardsIts) {
251+
for (SearchShardIterator it : shardsIts) {
252+
if (it.getTargetNodeIds().isEmpty() == false) {
253+
boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> {
254+
Transport.Connection conn = getConnection(it.getClusterAlias(), nodeId);
255+
return conn == null ? true : conn.getVersion().onOrAfter(request.minCompatibleShardNode());
256+
});
257+
if (isCompatible == false) {
258+
return false;
259+
}
260+
}
261+
}
262+
return true;
263+
}
264+
243265
protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
244266
/*
245267
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
@@ -660,7 +682,12 @@ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
660682

661683
@Override
662684
public final Transport.Connection getConnection(String clusterAlias, String nodeId) {
663-
return nodeIdToConnection.apply(clusterAlias, nodeId);
685+
Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId);
686+
Version minVersion = request.minCompatibleShardNode();
687+
if (minVersion != null && conn != null && conn.getVersion().before(minVersion)) {
688+
throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion);
689+
}
690+
return conn;
664691
}
665692

666693
@Override

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,18 +96,26 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
9696
private Integer preFilterShardSize;
9797

9898
private String[] types = Strings.EMPTY_ARRAY;
99+
private Boolean ccsMinimizeRoundtrips;
99100

100-
private boolean ccsMinimizeRoundtrips = true;
101+
@Nullable
102+
private Version minCompatibleShardNode;
101103

102104
public static final IndicesOptions DEFAULT_INDICES_OPTIONS =
103105
IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();
104106

105107
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
106108

107109
public SearchRequest() {
110+
this((Version) null);
111+
}
112+
113+
public SearchRequest(Version minCompatibleShardNode) {
108114
this.localClusterAlias = null;
109115
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
110116
this.finalReduce = true;
117+
this.minCompatibleShardNode = minCompatibleShardNode;
118+
this.ccsMinimizeRoundtrips = minCompatibleShardNode == null;
111119
}
112120

113121
/**
@@ -185,6 +193,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca
185193
this.localClusterAlias = localClusterAlias;
186194
this.absoluteStartMillis = absoluteStartMillis;
187195
this.finalReduce = finalReduce;
196+
this.minCompatibleShardNode = searchRequest.minCompatibleShardNode;
188197
}
189198

190199
/**
@@ -230,6 +239,13 @@ public SearchRequest(StreamInput in) throws IOException {
230239
}
231240
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
232241
ccsMinimizeRoundtrips = in.readBoolean();
242+
} else {
243+
ccsMinimizeRoundtrips = true;
244+
}
245+
if (in.getVersion().onOrAfter(Version.V_7_12_0)) {
246+
if (in.readBoolean()) {
247+
minCompatibleShardNode = Version.readVersion(in);
248+
}
233249
}
234250
}
235251

@@ -265,6 +281,12 @@ public void writeTo(StreamOutput out) throws IOException {
265281
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
266282
out.writeBoolean(ccsMinimizeRoundtrips);
267283
}
284+
if (out.getVersion().onOrAfter(Version.V_7_12_0)) {
285+
out.writeBoolean(minCompatibleShardNode != null);
286+
if (minCompatibleShardNode != null) {
287+
Version.writeVersion(minCompatibleShardNode, out);
288+
}
289+
}
268290
}
269291

270292
@Override
@@ -312,6 +334,12 @@ public ActionRequestValidationException validate() {
312334
}
313335
}
314336
}
337+
if (minCompatibleShardNode() != null) {
338+
if (isCcsMinimizeRoundtrips()) {
339+
validationException = addValidationError("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible "
340+
+ "shard version", validationException);
341+
}
342+
}
315343
return validationException;
316344
}
317345

@@ -350,6 +378,15 @@ long getAbsoluteStartMillis() {
350378
return absoluteStartMillis;
351379
}
352380

381+
/**
382+
* Returns the minimum compatible shard version the search request needs to run on. If the version is null, then there are no
383+
* restrictions imposed on shards versions part of this search.
384+
*/
385+
@Nullable
386+
public Version minCompatibleShardNode() {
387+
return minCompatibleShardNode;
388+
}
389+
353390
/**
354391
* Sets the indices the search will be executed on.
355392
*/
@@ -384,7 +421,7 @@ public boolean includeDataStreams() {
384421

385422
/**
386423
* Returns whether network round-trips should be minimized when executing cross-cluster search requests.
387-
* Defaults to <code>true</code>.
424+
* Defaults to <code>true</code>, unless <code>minCompatibleShardNode</code> is set in which case it's set to <code>false</code>.
388425
*/
389426
public boolean isCcsMinimizeRoundtrips() {
390427
return ccsMinimizeRoundtrips;
@@ -720,14 +757,15 @@ public boolean equals(Object o) {
720757
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
721758
Objects.equals(localClusterAlias, that.localClusterAlias) &&
722759
absoluteStartMillis == that.absoluteStartMillis &&
723-
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips;
760+
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips &&
761+
Objects.equals(minCompatibleShardNode, that.minCompatibleShardNode);
724762
}
725763

726764
@Override
727765
public int hashCode() {
728766
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
729767
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
730-
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips);
768+
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, minCompatibleShardNode);
731769
}
732770

733771
@Override

0 commit comments

Comments
 (0)