Skip to content

Check product compatibility 7.x #74272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package org.elasticsearch.client;

import org.apache.http.HttpEntity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -64,9 +67,12 @@
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.client.core.TermVectorsResponse;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -205,6 +211,8 @@
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -244,10 +252,16 @@
*/
public class RestHighLevelClient implements Closeable {

private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class);

// To be called using performClientRequest and performClientRequestAsync to ensure version compatibility check
private final RestClient client;
private final NamedXContentRegistry registry;
private final CheckedConsumer<RestClient, IOException> doClose;

/** Do not access directly but through getVersionValidationFuture() */
private volatile ListenableFuture<Optional<String>> versionValidationFuture;

private final IndicesClient indicesClient = new IndicesClient(this);
private final ClusterClient clusterClient = new ClusterClient(this);
private final IngestClient ingestClient = new IngestClient(this);
Expand Down Expand Up @@ -1715,7 +1729,7 @@ private <Req, Resp> Resp internalPerformRequest(Req request,
req.setOptions(options);
Response response;
try {
response = client.performRequest(req);
response = performClientRequest(req);
} catch (ResponseException e) {
if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
try {
Expand Down Expand Up @@ -1755,7 +1769,7 @@ protected final <Req extends Validatable, Resp> Optional<Resp> performRequestAnd
req.setOptions(options);
Response response;
try {
response = client.performRequest(req);
response = performClientRequest(req);
} catch (ResponseException e) {
if (RestStatus.NOT_FOUND.getStatus() == e.getResponse().getStatusLine().getStatusCode()) {
return Optional.empty();
Expand Down Expand Up @@ -1854,7 +1868,7 @@ private <Req, Resp> Cancellable internalPerformRequestAsync(Req request,
req.setOptions(options);

ResponseListener responseListener = wrapResponseListener(responseConverter, listener, ignores);
return client.performRequestAsync(req, responseListener);
return performClientRequestAsync(req, responseListener);
}


Expand Down Expand Up @@ -1920,7 +1934,7 @@ protected final <Req extends Validatable, Resp> Cancellable performRequestAsyncA
req.setOptions(options);
ResponseListener responseListener = wrapResponseListener404sOptional(response -> parseEntity(response.getEntity(),
entityParser), listener);
return client.performRequestAsync(req, responseListener);
return performClientRequestAsync(req, responseListener);
}

final <Resp> ResponseListener wrapResponseListener404sOptional(CheckedFunction<Response, Resp, IOException> responseConverter,
Expand Down Expand Up @@ -2002,6 +2016,204 @@ protected static boolean convertExistsResponse(Response response) {
return response.getStatusLine().getStatusCode() == 200;
}

private Cancellable performClientRequestAsync(Request request, ResponseListener listener) {

ListenableFuture<Optional<String>> versionCheck = getVersionValidationFuture();

// Create a future that tracks cancellation of this method's result and forwards cancellation to the actual LLRC request.
CompletableFuture<Void> cancellationForwarder = new CompletableFuture<Void>();
Cancellable result = new Cancellable() {
@Override
public void cancel() {
// Raise the flag by completing the future
FutureUtils.cancel(cancellationForwarder);
}

@Override
void runIfNotCancelled(Runnable runnable) {
if (cancellationForwarder.isCancelled()) {
throw newCancellationException();
}
runnable.run();
}
};

// Send the request after we have done the version compatibility check. Note that if it has already happened, the listener will
// be called immediately on the same thread with no asynchronous scheduling overhead.
versionCheck.addListener(new ActionListener<Optional<String>>() {
@Override
public void onResponse(Optional<String> validation) {
if (validation.isPresent() == false) {
// Send the request and propagate cancellation
Cancellable call = client.performRequestAsync(request, listener);
cancellationForwarder.whenComplete((r, t) ->
// Forward cancellation to the actual request (no need to check parameters as the
// only way for cancellationForwarder to be completed is by being cancelled).
call.cancel()
);
} else {
// Version validation wasn't successful, fail the request with the validation result.
listener.onFailure(new ElasticsearchException(validation.get()));
}
}

@Override
public void onFailure(Exception e) {
// Propagate validation request failure. This will be transient since `getVersionValidationFuture` clears the validation
// future if the request fails, leading to retries at the next HLRC request (see comments below).
listener.onFailure(e);
}
});

return result;
};

private Response performClientRequest(Request request) throws IOException {

Optional<String> versionValidation;
try {
versionValidation = getVersionValidationFuture().get();
} catch (InterruptedException | ExecutionException e) {
// Unlikely to happen
throw new ElasticsearchException(e);
}

if (versionValidation.isPresent() == false) {
return client.performRequest(request);
} else {
throw new ElasticsearchException(versionValidation.get());
}
}

/**
* Returns a future that asynchronously validates the Elasticsearch product version. Its result is an optional string: if empty then
* validation was successful, if present it contains the validation error. API requests should be chained to this future and check
* the validation result before going further.
* <p>
* This future is a memoization of the first successful request to the "/" endpoint and the subsequent compatibility check
* ({@see #versionValidationFuture}). Further client requests reuse its result.
* <p>
* If the version check request fails (e.g. network error), {@link #versionValidationFuture} is cleared so that a new validation
* request is sent at the next HLRC request. This allows retries to happen while avoiding a busy retry loop (LLRC retries on the node
* pool still happen).
*/
private ListenableFuture<Optional<String>> getVersionValidationFuture() {
ListenableFuture<Optional<String>> currentFuture = this.versionValidationFuture;
if (currentFuture != null) {
return currentFuture;
} else {
synchronized (this) {
// Re-check in synchronized block
currentFuture = this.versionValidationFuture;
if (currentFuture != null) {
return currentFuture;
}
ListenableFuture<Optional<String>> future = new ListenableFuture<>();
this.versionValidationFuture = future;

// Asynchronously call the info endpoint and complete the future with the version validation result.
Request req = new Request("GET", "/");
// These status codes are nominal in the context of product version verification
req.addParameter("ignore", "401,403");
client.performRequestAsync(req, new ResponseListener() {
@Override
public void onSuccess(Response response) {
Optional<String> validation;
try {
validation = getVersionValidation(response);
} catch (Exception e) {
logger.error("Failed to parse info response", e);
validation = Optional.of("Failed to parse info response. Check logs for detailed information - " +
e.getMessage());
}
future.onResponse(validation);
}

@Override
public void onFailure(Exception exception) {

// Fail the requests (this one and the ones waiting for it) and clear the future
// so that we retry the next time the client executes a request.
versionValidationFuture = null;
future.onFailure(exception);
}
});

return future;
}
}
}

/**
* Validates that the response info() is a compatible Elasticsearch version.
*
* @return an optional string. If empty, version is compatible. Otherwise, it's the message to return to the application.
*/
private Optional<String> getVersionValidation(Response response) throws IOException {
// Let requests go through if the client doesn't have permissions for the info endpoint.
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 401 || statusCode == 403) {
return Optional.empty();
}

MainResponse mainResponse;
try {
mainResponse = parseEntity(response.getEntity(), MainResponse::fromXContent);
} catch (ResponseException e) {
throw parseResponseException(e);
}

String version = mainResponse.getVersion().getNumber();
if (Strings.hasLength(version) == false) {
return Optional.of("Missing version.number in info response");
}

String[] parts = version.split("\\.");
if (parts.length < 2) {
return Optional.of("Wrong version.number format in info response");
}

int major = Integer.parseInt(parts[0]);
int minor = Integer.parseInt(parts[1]);

if (major < 6) {
return Optional.of("Elasticsearch version 6 or more is required");
}

if (major == 6 || (major == 7 && minor < 14)) {
if ("You Know, for Search".equalsIgnoreCase(mainResponse.getTagline()) == false) {
return Optional.of("Invalid or missing tagline [" + mainResponse.getTagline() + "]");
}

if (major == 7) {
// >= 7.0 and < 7.14
String responseFlavor = mainResponse.getVersion().getBuildFlavor();
if ("default".equals(responseFlavor) == false) {
// Flavor is unknown when running tests, and non-mocked responses will return an unknown flavor
if (Build.CURRENT.flavor() != Build.Flavor.UNKNOWN || "unknown".equals(responseFlavor) == false) {
return Optional.of("Invalid or missing build flavor [" + responseFlavor + "]");
}
}
}

return Optional.empty();
}

String header = response.getHeader("X-Elastic-Product");
if (header == null) {
return Optional.of(
"Missing [X-Elastic-Product] header. Please check that you are connecting to an Elasticsearch " +
"instance, and that any networking filters are preserving that header."
);
}

if ("Elasticsearch".equals(header) == false) {
return Optional.of("Invalid value [" + header + "] for [X-Elastic-Product] header.");
}

return Optional.empty();
}

/**
* Ignores deprecation warnings. This is appropriate because it is only
* used to parse responses from Elasticsearch. Any deprecation warnings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RequestMatcher;
import org.junit.Before;

import java.io.IOException;
Expand All @@ -44,6 +45,7 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -61,17 +63,18 @@ public class CustomRestHighLevelClientTests extends ESTestCase {
public void initClients() throws IOException {
if (restHighLevelClient == null) {
final RestClient restClient = mock(RestClient.class);
RestHighLevelClientTests.mockGetRoot(restClient);
restHighLevelClient = new CustomRestClient(restClient);

doAnswer(inv -> mockPerformRequest((Request) inv.getArguments()[0]))
.when(restClient)
.performRequest(any(Request.class));
.performRequest(argThat(new RequestMatcher("GET", ENDPOINT)));

doAnswer(inv -> mockPerformRequestAsync(
((Request) inv.getArguments()[0]),
(ResponseListener) inv.getArguments()[1]))
.when(restClient)
.performRequestAsync(any(Request.class), any(ResponseListener.class));
.performRequestAsync(argThat(new RequestMatcher("GET", ENDPOINT)), any(ResponseListener.class));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class MockRestHighLevelTests extends ESTestCase {
@Before
private void setupClient() throws IOException {
final RestClient mockClient = mock(RestClient.class);
RestHighLevelClientTests.mockGetRoot(mockClient);
final Response mockResponse = mock(Response.class);

when(mockResponse.getHost()).thenReturn(new HttpHost("localhost", 9200));
Expand Down
Loading