From 2145444d91ea32563d95c6bf5195987537291740 Mon Sep 17 00:00:00 2001 From: Sylvain Wallez Date: Thu, 17 Jun 2021 23:29:07 +0200 Subject: [PATCH 1/2] Verify that main info response returns correct product headers (#73910) Follow-up to #73434 Ensures that High Level Rest Client is running against a verified Elasticsearch. When the first request is send on HLRC, a request to the info endpoint is made first to verify the product identification and version. --- .../client/RestHighLevelClient.java | 220 +++++++++++++- .../CustomRestHighLevelClientTests.java | 7 +- .../client/MockRestHighLevelTests.java | 1 + .../client/RestHighLevelClientTests.java | 277 +++++++++++++++++- .../elasticsearch/test/RequestMatcher.java | 45 +++ .../org/elasticsearch/client/Cancellable.java | 71 +++-- 6 files changed, 584 insertions(+), 37 deletions(-) create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/test/RequestMatcher.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 67c53932dd780..a44d6e83aab1f 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -9,6 +9,9 @@ package org.elasticsearch.client; import org.apache.http.HttpEntity; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; @@ -64,9 +67,12 @@ import org.elasticsearch.client.core.TermVectorsRequest; import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.client.tasks.TaskSubmissionResponse; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.common.xcontent.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -205,6 +211,8 @@ import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -244,10 +252,16 @@ */ public class RestHighLevelClient implements Closeable { + private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class); + + // To be called using performClientRequest and performClientRequestAsync to ensure version compatibility check private final RestClient client; private final NamedXContentRegistry registry; private final CheckedConsumer doClose; + /** Do not access directly but through getVersionValidationFuture() */ + private volatile ListenableFuture> versionValidationFuture; + private final IndicesClient indicesClient = new IndicesClient(this); private final ClusterClient clusterClient = new ClusterClient(this); private final IngestClient ingestClient = new IngestClient(this); @@ -1715,7 +1729,7 @@ private Resp internalPerformRequest(Req request, req.setOptions(options); Response response; try { - response = client.performRequest(req); + response = performClientRequest(req); } catch (ResponseException e) { if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) { try { @@ -1755,7 +1769,7 @@ protected final Optional performRequestAnd req.setOptions(options); Response response; try { - response = client.performRequest(req); + response = performClientRequest(req); } catch (ResponseException e) { if (RestStatus.NOT_FOUND.getStatus() == e.getResponse().getStatusLine().getStatusCode()) { return Optional.empty(); @@ -1854,7 +1868,7 @@ private Cancellable internalPerformRequestAsync(Req request, req.setOptions(options); ResponseListener responseListener = wrapResponseListener(responseConverter, listener, ignores); - return client.performRequestAsync(req, responseListener); + return performClientRequestAsync(req, responseListener); } @@ -1920,7 +1934,7 @@ protected final Cancellable performRequestAsyncA req.setOptions(options); ResponseListener responseListener = wrapResponseListener404sOptional(response -> parseEntity(response.getEntity(), entityParser), listener); - return client.performRequestAsync(req, responseListener); + return performClientRequestAsync(req, responseListener); } final ResponseListener wrapResponseListener404sOptional(CheckedFunction responseConverter, @@ -2002,6 +2016,204 @@ protected static boolean convertExistsResponse(Response response) { return response.getStatusLine().getStatusCode() == 200; } + private Cancellable performClientRequestAsync(Request request, ResponseListener listener) { + + ListenableFuture> versionCheck = getVersionValidationFuture(); + + // Create a future that tracks cancellation of this method's result and forwards cancellation to the actual LLRC request. + CompletableFuture cancellationForwarder = new CompletableFuture(); + Cancellable result = new Cancellable() { + @Override + public void cancel() { + // Raise the flag by completing the future + FutureUtils.cancel(cancellationForwarder); + } + + @Override + void runIfNotCancelled(Runnable runnable) { + if (cancellationForwarder.isCancelled()) { + throw newCancellationException(); + } + runnable.run(); + } + }; + + // Send the request after we have done the version compatibility check. Note that if it has already happened, the listener will + // be called immediately on the same thread with no asynchronous scheduling overhead. + versionCheck.addListener(new ActionListener>() { + @Override + public void onResponse(Optional validation) { + if (validation.isEmpty()) { + // Send the request and propagate cancellation + Cancellable call = client.performRequestAsync(request, listener); + cancellationForwarder.whenComplete((r, t) -> + // Forward cancellation to the actual request (no need to check parameters as the + // only way for cancellationForwarder to be completed is by being cancelled). + call.cancel() + ); + } else { + // Version validation wasn't successful, fail the request with the validation result. + listener.onFailure(new ElasticsearchException(validation.get())); + } + } + + @Override + public void onFailure(Exception e) { + // Propagate validation request failure. This will be transient since `getVersionValidationFuture` clears the validation + // future if the request fails, leading to retries at the next HLRC request (see comments below). + listener.onFailure(e); + } + }); + + return result; + }; + + private Response performClientRequest(Request request) throws IOException { + + Optional versionValidation; + try { + versionValidation = getVersionValidationFuture().get(); + } catch (InterruptedException | ExecutionException e) { + // Unlikely to happen + throw new ElasticsearchException(e); + } + + if (versionValidation.isEmpty()) { + return client.performRequest(request); + } else { + throw new ElasticsearchException(versionValidation.get()); + } + } + + /** + * Returns a future that asynchronously validates the Elasticsearch product version. Its result is an optional string: if empty then + * validation was successful, if present it contains the validation error. API requests should be chained to this future and check + * the validation result before going further. + *

+ * This future is a memoization of the first successful request to the "/" endpoint and the subsequent compatibility check + * ({@see #versionValidationFuture}). Further client requests reuse its result. + *

+ * If the version check request fails (e.g. network error), {@link #versionValidationFuture} is cleared so that a new validation + * request is sent at the next HLRC request. This allows retries to happen while avoiding a busy retry loop (LLRC retries on the node + * pool still happen). + */ + private ListenableFuture> getVersionValidationFuture() { + ListenableFuture> currentFuture = this.versionValidationFuture; + if (currentFuture != null) { + return currentFuture; + } else { + synchronized (this) { + // Re-check in synchronized block + currentFuture = this.versionValidationFuture; + if (currentFuture != null) { + return currentFuture; + } + ListenableFuture> future = new ListenableFuture<>(); + this.versionValidationFuture = future; + + // Asynchronously call the info endpoint and complete the future with the version validation result. + Request req = new Request("GET", "/"); + // These status codes are nominal in the context of product version verification + req.addParameter("ignore", "401,403"); + client.performRequestAsync(req, new ResponseListener() { + @Override + public void onSuccess(Response response) { + Optional validation; + try { + validation = getVersionValidation(response); + } catch (Exception e) { + logger.error("Failed to parse info response", e); + validation = Optional.of("Failed to parse info response. Check logs for detailed information - " + + e.getMessage()); + } + future.onResponse(validation); + } + + @Override + public void onFailure(Exception exception) { + + // Fail the requests (this one and the ones waiting for it) and clear the future + // so that we retry the next time the client executes a request. + versionValidationFuture = null; + future.onFailure(exception); + } + }); + + return future; + } + } + } + + /** + * Validates that the response info() is a compatible Elasticsearch version. + * + * @return an optional string. If empty, version is compatible. Otherwise, it's the message to return to the application. + */ + private Optional getVersionValidation(Response response) throws IOException { + // Let requests go through if the client doesn't have permissions for the info endpoint. + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 401 || statusCode == 403) { + return Optional.empty(); + } + + MainResponse mainResponse; + try { + mainResponse = parseEntity(response.getEntity(), MainResponse::fromXContent); + } catch (ResponseException e) { + throw parseResponseException(e); + } + + String version = mainResponse.getVersion().getNumber(); + if (Strings.hasLength(version) == false) { + return Optional.of("Missing version.number in info response"); + } + + String[] parts = version.split("\\."); + if (parts.length < 2) { + return Optional.of("Wrong version.number format in info response"); + } + + int major = Integer.parseInt(parts[0]); + int minor = Integer.parseInt(parts[1]); + + if (major < 6) { + return Optional.of("Elasticsearch version 6 or more is required"); + } + + if (major == 6 || (major == 7 && minor < 14)) { + if ("You Know, for Search".equalsIgnoreCase(mainResponse.getTagline()) == false) { + return Optional.of("Invalid or missing tagline [" + mainResponse.getTagline() + "]"); + } + + if (major == 7) { + // >= 7.0 and < 7.14 + String responseFlavor = mainResponse.getVersion().getBuildFlavor(); + if ("default".equals(responseFlavor) == false) { + // Flavor is unknown when running tests, and non-mocked responses will return an unknown flavor + if (Build.CURRENT.flavor() != Build.Flavor.UNKNOWN || "unknown".equals(responseFlavor) == false) { + return Optional.of("Invalid or missing build flavor [" + responseFlavor + "]"); + } + } + } + + return Optional.empty(); + } + + String header = response.getHeader("X-Elastic-Product"); + if (header == null) { + return Optional.of( + "Missing [X-Elastic-Product] header. Please check that you are connecting to an Elasticsearch " + + "instance, and that any networking filters are preserving that header." + ); + } + + if ("Elasticsearch".equals(header) == false) { + return Optional.of("Invalid value [" + header + "] for [X-Elastic-Product] header."); + } + + return Optional.empty(); + } + /** * Ignores deprecation warnings. This is appropriate because it is only * used to parse responses from Elasticsearch. Any deprecation warnings diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java index 44c7627670aed..e960a696672df 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.RequestMatcher; import org.junit.Before; import java.io.IOException; @@ -44,6 +45,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.hasSize; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -61,17 +63,18 @@ public class CustomRestHighLevelClientTests extends ESTestCase { public void initClients() throws IOException { if (restHighLevelClient == null) { final RestClient restClient = mock(RestClient.class); + RestHighLevelClientTests.mockGetRoot(restClient); restHighLevelClient = new CustomRestClient(restClient); doAnswer(inv -> mockPerformRequest((Request) inv.getArguments()[0])) .when(restClient) - .performRequest(any(Request.class)); + .performRequest(argThat(new RequestMatcher("GET", ENDPOINT))); doAnswer(inv -> mockPerformRequestAsync( ((Request) inv.getArguments()[0]), (ResponseListener) inv.getArguments()[1])) .when(restClient) - .performRequestAsync(any(Request.class), any(ResponseListener.class)); + .performRequestAsync(argThat(new RequestMatcher("GET", ENDPOINT)), any(ResponseListener.class)); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MockRestHighLevelTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MockRestHighLevelTests.java index d04bd9268520a..e9812ac68dbeb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MockRestHighLevelTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MockRestHighLevelTests.java @@ -34,6 +34,7 @@ public class MockRestHighLevelTests extends ESTestCase { @Before private void setupClient() throws IOException { final RestClient mockClient = mock(RestClient.class); + RestHighLevelClientTests.mockGetRoot(mockClient); final Response mockResponse = mock(Response.class); when(mockResponse.getHost()).thenReturn(new HttpHost("localhost", 9200)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 0b7098fe0ff9d..8ffcc8056a5d2 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -13,6 +13,7 @@ import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; import org.apache.http.ProtocolVersion; import org.apache.http.RequestLine; import org.apache.http.StatusLine; @@ -23,7 +24,9 @@ import org.apache.http.message.BasicStatusLine; import org.apache.http.nio.entity.NByteArrayEntity; import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; @@ -33,6 +36,7 @@ import org.elasticsearch.action.search.SearchResponseSections; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.core.GetSourceRequest; import org.elasticsearch.client.core.MainRequest; import org.elasticsearch.client.core.MainResponse; import org.elasticsearch.client.indexlifecycle.AllocateAction; @@ -83,6 +87,7 @@ import org.elasticsearch.client.transform.transforms.SyncConfig; import org.elasticsearch.client.transform.transforms.TimeRetentionPolicyConfig; import org.elasticsearch.client.transform.transforms.TimeSyncConfig; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.Tuple; @@ -92,6 +97,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.cbor.CborXContent; import org.elasticsearch.common.xcontent.smile.SmileXContent; import org.elasticsearch.index.rankeval.DiscountedCumulativeGain; @@ -110,15 +116,18 @@ import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; +import org.elasticsearch.test.RequestMatcher; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestApi; import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec; import org.hamcrest.Matchers; import org.junit.Before; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.net.SocketTimeoutException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -140,6 +149,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.hasItems; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -171,9 +181,62 @@ public class RestHighLevelClientTests extends ESTestCase { private RestHighLevelClient restHighLevelClient; @Before - public void initClient() { + public void initClient() throws IOException { restClient = mock(RestClient.class); restHighLevelClient = new RestHighLevelClient(restClient, RestClient::close, Collections.emptyList()); + mockGetRoot(restClient); + } + + /** + * Mock rest client to return a valid response to async GET with the current build "/" + */ + static void mockGetRoot(RestClient restClient) throws IOException{ + Build build = new Build( + Build.Flavor.DEFAULT, Build.CURRENT.type(), Build.CURRENT.hash(), + Build.CURRENT.date(), false, Build.CURRENT.getQualifiedVersion() + ); + + mockGetRoot(restClient, build, true); + } + + /** + * Mock rest client to return a valid response to async GET with a specific build version "/" + */ + public static void mockGetRoot(RestClient restClient, Build build, boolean setProductHeader) throws IOException { + org.elasticsearch.action.main.MainResponse mainResp = new org.elasticsearch.action.main.MainResponse( + "node", + Version.fromString(build.getQualifiedVersion().replace("-SNAPSHOT", "")), + new ClusterName("cluster"), + "uuid", + build + ); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), baos); + mainResp.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.close(); + mockGetRoot(restClient, baos.toByteArray(), setProductHeader); + } + + /** + * Mock rest client to return a valid response to async GET with an arbitrary binary payload "/" + */ + public static void mockGetRoot(RestClient restClient, byte[] responseBody, boolean setProductHeader) throws IOException { + NByteArrayEntity entity = new NByteArrayEntity(responseBody, ContentType.APPLICATION_JSON); + Response response = mock(Response.class); + when(response.getStatusLine()).thenReturn(newStatusLine(RestStatus.OK)); + when(response.getEntity()).thenReturn(entity); + if (setProductHeader) { + when(response.getHeader("X-Elastic-Product")).thenReturn("Elasticsearch"); + } + + when(restClient + .performRequestAsync(argThat(new RequestMatcher("GET", "/")), any())) + .thenAnswer(i -> { + ((ResponseListener)i.getArguments()[1]).onSuccess(response); + return Cancellable.NO_OP; + } + ); } public void testCloseIsIdempotent() throws IOException { @@ -955,6 +1018,218 @@ public void testApiNamingConventions() throws Exception { assertThat("Some API are not supported but they should be: " + apiUnsupported, apiUnsupported.size(), equalTo(0)); } + private static void doTestProductCompatibilityCheck( + boolean shouldBeAccepted, String version, boolean setProductHeader) throws Exception { + + // An endpoint different from "/" that returns a boolean + GetSourceRequest apiRequest = new GetSourceRequest("foo", "bar"); + + StatusLine apiStatus = mock(StatusLine.class); + when(apiStatus.getStatusCode()).thenReturn(200); + + Response apiResponse = mock(Response.class); + when(apiResponse.getStatusLine()).thenReturn(apiStatus); + + RestClient restClient = mock(RestClient.class); + + Build build = new Build(Build.Flavor.DEFAULT, Build.Type.UNKNOWN, "hash", "date", false, version); + mockGetRoot(restClient, build, setProductHeader); + when(restClient.performRequest(argThat(new RequestMatcher("HEAD", "/foo/_source/bar")))).thenReturn(apiResponse); + + RestHighLevelClient highLevelClient = new RestHighLevelClient(restClient, RestClient::close, Collections.emptyList()); + + if (shouldBeAccepted) { + assertTrue(highLevelClient.existsSource(apiRequest, RequestOptions.DEFAULT)); + } else { + expectThrows(ElasticsearchException.class, () -> + highLevelClient.existsSource(apiRequest, RequestOptions.DEFAULT) + ); + } + } + + public void testProductCompatibilityCheck() throws Exception { + // Version < 6.0.0 + doTestProductCompatibilityCheck(false, "5.0.0", false); + + // Version < 6.0.0, product header + doTestProductCompatibilityCheck(false, "5.0.0", true); + + // Version 6.x - + doTestProductCompatibilityCheck(true, "6.0.0", false); + + // Version 7.x, x < 14 + doTestProductCompatibilityCheck(true, "7.0.0", false); + + // Version 7.14, no product header + doTestProductCompatibilityCheck(false, "7.14.0", false); + + // Version 7.14, product header + doTestProductCompatibilityCheck(true, "7.14.0", true); + + // Version 8.x, no product header + doTestProductCompatibilityCheck(false, "8.0.0", false); + + // Version 8.x, product header + doTestProductCompatibilityCheck(true, "8.0.0", true); + } + + public void testProductCompatibilityTagline() throws Exception { + + // An endpoint different from "/" that returns a boolean + GetSourceRequest apiRequest = new GetSourceRequest("foo", "bar"); + StatusLine apiStatus = mock(StatusLine.class); + when(apiStatus.getStatusCode()).thenReturn(200); + Response apiResponse = mock(Response.class); + when(apiResponse.getStatusLine()).thenReturn(apiStatus); + when(restClient.performRequest(argThat(new RequestMatcher("HEAD", "/foo/_source/bar")))).thenReturn(apiResponse); + + RestHighLevelClient highLevelClient = new RestHighLevelClient(restClient, RestClient::close, Collections.emptyList()); + + byte[] bytes = ("{" + + " 'cluster_name': '97b2b946a8494276822c3876d78d4f9c', " + + " 'cluster_uuid': 'SUXRYY1fQ5uMKEiykuR5ZA', " + + " 'version': { " + + " 'build_date': '2021-03-18T06:17:15.410153305Z', " + + " 'minimum_wire_compatibility_version': '6.8.0', " + + " 'build_hash': '78722783c38caa25a70982b5b042074cde5d3b3a', " + + " 'number': '7.12.0', " + + " 'lucene_version': '8.8.0', " + + " 'minimum_index_compatibility_version': '6.0.0-beta1', " + + " 'build_flavor': 'default', " + + " 'build_snapshot': false, " + + " 'build_type': 'docker' " + + " }, " + + " 'name': 'instance-0000000000', " + + " 'tagline': 'hello world'" + + "}" + ).replace('\'', '"').getBytes(StandardCharsets.UTF_8); + + mockGetRoot(restClient, bytes, true); + + expectThrows(ElasticsearchException.class, () -> + highLevelClient.existsSource(apiRequest, RequestOptions.DEFAULT) + ); + } + + public void testProductCompatibilityFlavor() throws Exception { + + // An endpoint different from "/" that returns a boolean + GetSourceRequest apiRequest = new GetSourceRequest("foo", "bar"); + StatusLine apiStatus = mock(StatusLine.class); + when(apiStatus.getStatusCode()).thenReturn(200); + Response apiResponse = mock(Response.class); + when(apiResponse.getStatusLine()).thenReturn(apiStatus); + when(restClient.performRequest(argThat(new RequestMatcher("HEAD", "/foo/_source/bar")))).thenReturn(apiResponse); + + RestHighLevelClient highLevelClient = new RestHighLevelClient(restClient, RestClient::close, Collections.emptyList()); + + byte[] + bytes = ("{" + + " 'cluster_name': '97b2b946a8494276822c3876d78d4f9c', " + + " 'cluster_uuid': 'SUXRYY1fQ5uMKEiykuR5ZA', " + + " 'version': { " + + " 'build_date': '2021-03-18T06:17:15.410153305Z', " + + " 'minimum_wire_compatibility_version': '6.8.0', " + + " 'build_hash': '78722783c38caa25a70982b5b042074cde5d3b3a', " + + " 'number': '7.12.0', " + + " 'lucene_version': '8.8.0', " + + " 'minimum_index_compatibility_version': '6.0.0-beta1', " + + // Invalid flavor + " 'build_flavor': 'foo', " + + " 'build_snapshot': false, " + + " 'build_type': 'docker' " + + " }, " + + " 'name': 'instance-0000000000', " + + " 'tagline': 'You Know, for Search'" + + "}" + ).replace('\'', '"').getBytes(StandardCharsets.UTF_8); + + mockGetRoot(restClient, bytes, true); + + expectThrows(ElasticsearchException.class, () -> + highLevelClient.existsSource(apiRequest, RequestOptions.DEFAULT) + ); + } + + public void testProductCompatibilityRequestFailure() throws Exception { + + RestClient restClient = mock(RestClient.class); + + // An endpoint different from "/" that returns a boolean + GetSourceRequest apiRequest = new GetSourceRequest("foo", "bar"); + StatusLine apiStatus = mock(StatusLine.class); + when(apiStatus.getStatusCode()).thenReturn(200); + Response apiResponse = mock(Response.class); + when(apiResponse.getStatusLine()).thenReturn(apiStatus); + when(restClient.performRequest(argThat(new RequestMatcher("HEAD", "/foo/_source/bar")))).thenReturn(apiResponse); + + // Have the verification request fail + when(restClient.performRequestAsync(argThat(new RequestMatcher("GET", "/")), any())) + .thenAnswer(i -> { + ((ResponseListener)i.getArguments()[1]).onFailure(new IOException("Something bad happened")); + return Cancellable.NO_OP; + }); + + RestHighLevelClient highLevelClient = new RestHighLevelClient(restClient, RestClient::close, Collections.emptyList()); + + expectThrows(ElasticsearchException.class, () -> { + highLevelClient.existsSource(apiRequest, RequestOptions.DEFAULT); + }); + + // Now have the validation request succeed + Build build = new Build(Build.Flavor.DEFAULT, Build.Type.UNKNOWN, "hash", "date", false, "7.14.0"); + mockGetRoot(restClient, build, true); + + // API request should now succeed as validation has been retried + assertTrue(highLevelClient.existsSource(apiRequest, RequestOptions.DEFAULT)); + } + + public void testProductCompatibilityWithForbiddenInfoEndpoint() throws Exception { + RestClient restClient = mock(RestClient.class); + + // An endpoint different from "/" that returns a boolean + GetSourceRequest apiRequest = new GetSourceRequest("foo", "bar"); + StatusLine apiStatus = mock(StatusLine.class); + when(apiStatus.getStatusCode()).thenReturn(200); + Response apiResponse = mock(Response.class); + when(apiResponse.getStatusLine()).thenReturn(apiStatus); + when(restClient.performRequest(argThat(new RequestMatcher("HEAD", "/foo/_source/bar")))).thenReturn(apiResponse); + + // Have the info endpoint used for verification return a 403 (forbidden) + when(restClient.performRequestAsync(argThat(new RequestMatcher("GET", "/")), any())) + .thenAnswer(i -> { + StatusLine infoStatus = mock(StatusLine.class); + when(apiStatus.getStatusCode()).thenReturn(HttpStatus.SC_FORBIDDEN); + Response infoResponse = mock(Response.class); + when(apiResponse.getStatusLine()).thenReturn(infoStatus); + ((ResponseListener)i.getArguments()[1]).onSuccess(infoResponse); + return Cancellable.NO_OP; + }); + + RestHighLevelClient highLevelClient = new RestHighLevelClient(restClient, RestClient::close, Collections.emptyList()); + + // API request should succeed + Build build = new Build(Build.Flavor.DEFAULT, Build.Type.UNKNOWN, "hash", "date", false, "7.14.0"); + mockGetRoot(restClient, build, true); + + assertTrue(highLevelClient.existsSource(apiRequest, RequestOptions.DEFAULT)); + } + + public void testCancellationForwarding() throws Exception { + + mockGetRoot(restClient); + Cancellable cancellable = mock(Cancellable.class); + when(restClient.performRequestAsync(argThat(new RequestMatcher("HEAD", "/foo/_source/bar")), any())).thenReturn(cancellable); + + Cancellable result = restHighLevelClient.existsSourceAsync( + new GetSourceRequest("foo", "bar"), + RequestOptions.DEFAULT, ActionListener.wrap(() -> {}) + ); + + result.cancel(); + verify(cancellable, times(1)).cancel(); + } + private static void assertSyncMethod(Method method, String apiName, List booleanReturnMethods) { //A few methods return a boolean rather than a response object if (apiName.equals("ping") || apiName.contains("exist") || booleanReturnMethods.contains(apiName)) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/test/RequestMatcher.java b/client/rest-high-level/src/test/java/org/elasticsearch/test/RequestMatcher.java new file mode 100644 index 0000000000000..62f217321ba2a --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/test/RequestMatcher.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.test; + +import org.elasticsearch.client.Request; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; + +/** + * A Harmcrest matcher for request method and endpoint + */ +public class RequestMatcher extends BaseMatcher { + + private final String method; + private final String endpoint; + + public RequestMatcher(String method, String endpoint) { + this.method = method; + this.endpoint = endpoint; + } + + @Override + public boolean matches(Object actual) { + if (actual instanceof Request) { + Request req = (Request) actual; + return method.equals(req.getMethod()) && endpoint.equals(req.getEndpoint()); + } + return false; + } + + @Override + public void describeTo(Description description) { + description + .appendText("request to ") + .appendText(method) + .appendText(" ") + .appendText(endpoint); + } +} diff --git a/client/rest/src/main/java/org/elasticsearch/client/Cancellable.java b/client/rest/src/main/java/org/elasticsearch/client/Cancellable.java index 2c60db99f6df5..4fafc4ba124da 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/Cancellable.java +++ b/client/rest/src/main/java/org/elasticsearch/client/Cancellable.java @@ -31,9 +31,19 @@ * Note that cancelling a request does not automatically translate to aborting its execution on the server side, which needs to be * specifically implemented in each API. */ -public class Cancellable { +public abstract class Cancellable { - static final Cancellable NO_OP = new Cancellable(null) { + /** + * Cancels the on-going request that is associated with the current instance of {@link Cancellable}. + */ + public abstract void cancel(); + + /** + * Executes some arbitrary code iff the on-going request has not been cancelled, otherwise throws {@link CancellationException}. + */ + abstract void runIfNotCancelled(Runnable runnable); + + static final Cancellable NO_OP = new Cancellable() { @Override public void cancel() { } @@ -45,40 +55,41 @@ void runIfNotCancelled(Runnable runnable) { }; static Cancellable fromRequest(HttpRequestBase httpRequest) { - return new Cancellable(httpRequest); + return new RequestCancellable(httpRequest); } - private final HttpRequestBase httpRequest; + private static class RequestCancellable extends Cancellable { - private Cancellable(HttpRequestBase httpRequest) { - this.httpRequest = httpRequest; - } + private final HttpRequestBase httpRequest; - /** - * Cancels the on-going request that is associated with the current instance of {@link Cancellable}. - * - */ - public synchronized void cancel() { - this.httpRequest.abort(); - } + private RequestCancellable(HttpRequestBase httpRequest) { + this.httpRequest = httpRequest; + } - /** - * Executes some arbitrary code iff the on-going request has not been cancelled, otherwise throws {@link CancellationException}. - * This is needed to guarantee that cancelling a request works correctly even in case {@link #cancel()} is called between different - * attempts of the same request. The low-level client reuses the same instance of the {@link AbstractExecutionAwareRequest} by calling - * {@link AbstractExecutionAwareRequest#reset()} between subsequent retries. The {@link #cancel()} method can be called at anytime, - * and we need to handle the case where it gets called while there is no request being executed as one attempt may have failed and - * the subsequent attempt has not been started yet. - * If the request has already been cancelled we don't go ahead with the next attempt, and artificially raise the - * {@link CancellationException}, otherwise we run the provided {@link Runnable} which will reset the request and send the next attempt. - * Note that this method must be synchronized as well as the {@link #cancel()} method, to prevent a request from being cancelled - * when there is no future to cancel, which would make cancelling the request a no-op. - */ - synchronized void runIfNotCancelled(Runnable runnable) { - if (this.httpRequest.isAborted()) { - throw newCancellationException(); + public synchronized void cancel() { + this.httpRequest.abort(); + } + + /** + * Executes some arbitrary code iff the on-going request has not been cancelled, otherwise throws {@link CancellationException}. + * This is needed to guarantee that cancelling a request works correctly even in case {@link #cancel()} is called between different + * attempts of the same request. The low-level client reuses the same instance of the {@link AbstractExecutionAwareRequest} by + * calling + * {@link AbstractExecutionAwareRequest#reset()} between subsequent retries. The {@link #cancel()} method can be called at anytime, + * and we need to handle the case where it gets called while there is no request being executed as one attempt may have failed and + * the subsequent attempt has not been started yet. + * If the request has already been cancelled we don't go ahead with the next attempt, and artificially raise the + * {@link CancellationException}, otherwise we run the provided {@link Runnable} which will reset the request and send the next + * attempt. + * Note that this method must be synchronized as well as the {@link #cancel()} method, to prevent a request from being cancelled + * when there is no future to cancel, which would make cancelling the request a no-op. + */ + synchronized void runIfNotCancelled(Runnable runnable) { + if (this.httpRequest.isAborted()) { + throw newCancellationException(); + } + runnable.run(); } - runnable.run(); } static CancellationException newCancellationException() { From 1345a05c06f71c351c57ae7a62cd6d710dedd1a2 Mon Sep 17 00:00:00 2001 From: Sylvain Wallez Date: Thu, 17 Jun 2021 23:59:36 +0200 Subject: [PATCH 2/2] Fix Java 8 compatibility --- .../java/org/elasticsearch/client/RestHighLevelClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index a44d6e83aab1f..02e21267d3cf0 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -2043,7 +2043,7 @@ void runIfNotCancelled(Runnable runnable) { versionCheck.addListener(new ActionListener>() { @Override public void onResponse(Optional validation) { - if (validation.isEmpty()) { + if (validation.isPresent() == false) { // Send the request and propagate cancellation Cancellable call = client.performRequestAsync(request, listener); cancellationForwarder.whenComplete((r, t) -> @@ -2078,7 +2078,7 @@ private Response performClientRequest(Request request) throws IOException { throw new ElasticsearchException(e); } - if (versionValidation.isEmpty()) { + if (versionValidation.isPresent() == false) { return client.performRequest(request); } else { throw new ElasticsearchException(versionValidation.get());