Skip to content

Add minimum compatibility version to SearchRequest #67414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,11 @@ private List<Shard> buildShards(String index, Nodes nodes, RestClient client) th
}

private Nodes buildNodeAndVersions() throws IOException {
Response response = client().performRequest(new Request("GET", "_nodes"));
return buildNodeAndVersions(client());
}

static Nodes buildNodeAndVersions(RestClient client) throws IOException {
Response response = client.performRequest(new Request("GET", "_nodes"));
ObjectPath objectPath = ObjectPath.createFromResponse(response);
Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
Nodes nodes = new Nodes();
Expand All @@ -355,12 +359,12 @@ private Nodes buildNodeAndVersions() throws IOException {
Version.fromString(objectPath.evaluate("nodes." + id + ".version")),
HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address"))));
}
response = client().performRequest(new Request("GET", "_cluster/state"));
response = client.performRequest(new Request("GET", "_cluster/state"));
nodes.setMasterNodeId(ObjectPath.createFromResponse(response).evaluate("master_node"));
return nodes;
}

final class Nodes extends HashMap<String, Node> {
static final class Nodes extends HashMap<String, Node> {

private String masterNodeId = null;

Expand Down Expand Up @@ -413,7 +417,7 @@ public String toString() {
}
}

final class Node {
static final class Node {
private final String id;
private final String nodeName;
private final Version version;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.backwards;

import org.apache.http.HttpHost;
import org.elasticsearch.Version;
import org.elasticsearch.backwards.IndexingIT.Node;
import org.elasticsearch.backwards.IndexingIT.Nodes;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class SearchWithMinCompatibleSearchNodeIT extends ESRestTestCase {

private static String index = "test_min_version";
private static int numShards;
private static int numReplicas = 1;
private static int numDocs;
private static Nodes nodes;
private static List<Node> bwcNodes;
private static List<Node> newNodes;
private static Version bwcVersion;
private static Version newVersion;

@Before
public void prepareTestData() throws IOException {
nodes = IndexingIT.buildNodeAndVersions(client());
numShards = nodes.size();
numDocs = randomIntBetween(numShards, 16);
bwcNodes = new ArrayList<>();
newNodes = new ArrayList<>();
bwcNodes.addAll(nodes.getBWCNodes());
newNodes.addAll(nodes.getNewNodes());
bwcVersion = bwcNodes.get(0).getVersion();
newVersion = newNodes.get(0).getVersion();

if (client().performRequest(new Request("HEAD", "/" + index)).getStatusLine().getStatusCode() == 404) {
createIndex(index, Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build());
for (int i = 0; i < numDocs; i++) {
Request request = new Request("PUT", index + "/_doc/" + i);
request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(2) + "\"}");
assertOK(client().performRequest(request));
}
ensureGreen(index);
}
}

public void testMinVersionAsNewVersion() throws Exception {
Request newVersionRequest = new Request("POST",
index + "/_search?min_compatible_shard_node=" + newVersion + "&ccs_minimize_roundtrips=false");
assertWithBwcVersionCheck((client) -> {
ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(newVersionRequest));
assertThat(responseException.getResponse().getStatusLine().getStatusCode(),
equalTo(RestStatus.INTERNAL_SERVER_ERROR.getStatus()));
assertThat(responseException.getMessage(),
containsString("{\"error\":{\"root_cause\":[],\"type\":\"search_phase_execution_exception\""));
assertThat(responseException.getMessage(), containsString("caused_by\":{\"type\":\"version_mismatch_exception\","
+ "\"reason\":\"One of the shards is incompatible with the required minimum version [" + newVersion + "]\""));
}, newVersionRequest);
}

public void testMinVersionAsOldVersion() throws Exception {
Request oldVersionRequest = new Request("POST", index + "/_search?min_compatible_shard_node=" + bwcVersion +
"&ccs_minimize_roundtrips=false");
oldVersionRequest.setJsonEntity("{\"query\":{\"match_all\":{}},\"_source\":false}");
assertWithBwcVersionCheck((client) -> {
Response response = client.performRequest(oldVersionRequest);
ObjectPath responseObject = ObjectPath.createFromResponse(response);
Map<String, Object> shardsResult = responseObject.evaluate("_shards");
assertThat(shardsResult.get("total"), equalTo(numShards));
assertThat(shardsResult.get("successful"), equalTo(numShards));
assertThat(shardsResult.get("failed"), equalTo(0));
Map<String, Object> hitsResult = responseObject.evaluate("hits.total");
assertThat(hitsResult.get("value"), equalTo(numDocs));
assertThat(hitsResult.get("relation"), equalTo("eq"));
}, oldVersionRequest);
}

public void testCcsMinimizeRoundtripsIsFalse() throws Exception {
Version version = randomBoolean() ? newVersion : bwcVersion;

Request request = new Request("POST", index + "/_search?min_compatible_shard_node=" + version + "&ccs_minimize_roundtrips=true");
assertWithBwcVersionCheck((client) -> {
ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request));
assertThat(responseException.getResponse().getStatusLine().getStatusCode(),
equalTo(RestStatus.BAD_REQUEST.getStatus()));
assertThat(responseException.getMessage(),
containsString("{\"error\":{\"root_cause\":[{\"type\":\"action_request_validation_exception\""));
assertThat(responseException.getMessage(), containsString("\"reason\":\"Validation Failed: 1: "
+ "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version;\""));
}, request);
}

private void assertWithBwcVersionCheck(CheckedConsumer<RestClient, Exception> code, Request request) throws Exception {
try (RestClient client = buildClient(restClientSettings(),
newNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
assertBusy(() -> {
code.accept(client);
});
}
try (RestClient client = buildClient(restClientSettings(),
bwcNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
if (bwcVersion.before(Version.V_7_12_0)) {
ResponseException exception = expectThrows(ResponseException.class, () -> client.performRequest(request));
assertThat(exception.getResponse().getStatusLine().getStatusCode(),
equalTo(RestStatus.BAD_REQUEST.getStatus()));
if (bwcVersion.onOrAfter(Version.V_7_0_0)) {
// min_compatible_shard_node support doesn't exist in older versions and there will be an "unrecognized parameter"
// exception
assertThat(exception.getMessage(), containsString("contains unrecognized parameter: [min_compatible_shard_node]"));
} else {
// ccs_minimize_roundtrips support doesn't exist in 6.x versions and there will be an "unrecognized parameter" exception
assertThat(exception.getMessage(), containsString("contains unrecognized parameters: [ccs_minimize_roundtrips]"));
}
} else {
assertBusy(() -> {
code.accept(client);
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@
"type":"boolean",
"description":"Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
"default":false
},
"min_compatible_shard_node":{
"type":"string",
"description":"The minimum compatible version that all shards involved in search should have for this request to be successful"
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,12 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.transport.NoSeedNodeLeftException.class,
org.elasticsearch.transport.NoSeedNodeLeftException::new,
160,
Version.V_7_10_0);
Version.V_7_10_0),
VERSION_MISMATCH_EXCEPTION(
org.elasticsearch.action.search.VersionMismatchException.class,
org.elasticsearch.action.search.VersionMismatchException::new,
161,
Version.V_7_12_0);

final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,13 @@ public final void run() {
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
}
}

Version version = request.minCompatibleShardNode();
if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) {
if (checkMinimumVersion(shardsIts) == false) {
throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]",
request.minCompatibleShardNode());
}
}
for (int i = 0; i < shardsIts.size(); i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
assert shardRoutings.skip() == false;
Expand All @@ -240,6 +246,22 @@ void skipShard(SearchShardIterator iterator) {
successfulShardExecution(iterator);
}


private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> shardsIts) {
for (SearchShardIterator it : shardsIts) {
if (it.getTargetNodeIds().isEmpty() == false) {
boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> {
Transport.Connection conn = getConnection(it.getClusterAlias(), nodeId);
return conn == null ? true : conn.getVersion().onOrAfter(request.minCompatibleShardNode());
});
if (isCompatible == false) {
return false;
}
}
}
return true;
}

protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
Expand Down Expand Up @@ -660,7 +682,12 @@ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()

@Override
public final Transport.Connection getConnection(String clusterAlias, String nodeId) {
return nodeIdToConnection.apply(clusterAlias, nodeId);
Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId);
Version minVersion = request.minCompatibleShardNode();
if (minVersion != null && conn != null && conn.getVersion().before(minVersion)) {
throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion);
}
return conn;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,26 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
private Integer preFilterShardSize;

private String[] types = Strings.EMPTY_ARRAY;
private Boolean ccsMinimizeRoundtrips;

private boolean ccsMinimizeRoundtrips = true;
@Nullable
private Version minCompatibleShardNode;

public static final IndicesOptions DEFAULT_INDICES_OPTIONS =
IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();

private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;

public SearchRequest() {
this((Version) null);
}

public SearchRequest(Version minCompatibleShardNode) {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
this.finalReduce = true;
this.minCompatibleShardNode = minCompatibleShardNode;
this.ccsMinimizeRoundtrips = minCompatibleShardNode == null;
}

/**
Expand Down Expand Up @@ -185,6 +193,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca
this.localClusterAlias = localClusterAlias;
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
this.minCompatibleShardNode = searchRequest.minCompatibleShardNode;
}

/**
Expand Down Expand Up @@ -230,6 +239,13 @@ public SearchRequest(StreamInput in) throws IOException {
}
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ccsMinimizeRoundtrips = in.readBoolean();
} else {
ccsMinimizeRoundtrips = true;
}
if (in.getVersion().onOrAfter(Version.V_7_12_0)) {
if (in.readBoolean()) {
minCompatibleShardNode = Version.readVersion(in);
}
}
}

Expand Down Expand Up @@ -265,6 +281,12 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(ccsMinimizeRoundtrips);
}
if (out.getVersion().onOrAfter(Version.V_7_12_0)) {
out.writeBoolean(minCompatibleShardNode != null);
if (minCompatibleShardNode != null) {
Version.writeVersion(minCompatibleShardNode, out);
}
}
}

@Override
Expand Down Expand Up @@ -312,6 +334,12 @@ public ActionRequestValidationException validate() {
}
}
}
if (minCompatibleShardNode() != null) {
if (isCcsMinimizeRoundtrips()) {
validationException = addValidationError("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible "
+ "shard version", validationException);
}
}
return validationException;
}

Expand Down Expand Up @@ -350,6 +378,15 @@ long getAbsoluteStartMillis() {
return absoluteStartMillis;
}

/**
* Returns the minimum compatible shard version the search request needs to run on. If the version is null, then there are no
* restrictions imposed on shards versions part of this search.
*/
@Nullable
public Version minCompatibleShardNode() {
return minCompatibleShardNode;
}

/**
* Sets the indices the search will be executed on.
*/
Expand Down Expand Up @@ -384,7 +421,7 @@ public boolean includeDataStreams() {

/**
* Returns whether network round-trips should be minimized when executing cross-cluster search requests.
* Defaults to <code>true</code>.
* Defaults to <code>true</code>, unless <code>minCompatibleShardNode</code> is set in which case it's set to <code>false</code>.
*/
public boolean isCcsMinimizeRoundtrips() {
return ccsMinimizeRoundtrips;
Expand Down Expand Up @@ -720,14 +757,15 @@ public boolean equals(Object o) {
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
Objects.equals(localClusterAlias, that.localClusterAlias) &&
absoluteStartMillis == that.absoluteStartMillis &&
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips;
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips &&
Objects.equals(minCompatibleShardNode, that.minCompatibleShardNode);
}

@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips);
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, minCompatibleShardNode);
}

@Override
Expand Down
Loading