diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 2093139e115a3..d015643c1179d 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.azure; +import com.microsoft.azure.storage.Constants; import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.StorageException; import org.apache.logging.log4j.LogManager; @@ -68,10 +69,8 @@ private boolean blobExists(String blobName) { return false; } - @Override - public InputStream readBlob(String blobName) throws IOException { - logger.trace("readBlob({})", blobName); - + private InputStream openInputStream(String blobName, long position, @Nullable Long length) throws IOException { + logger.trace("readBlob({}) from position [{}] with length [{}]", blobName, position, length != null ? length : "unlimited"); if (blobStore.getLocationMode() == LocationMode.SECONDARY_ONLY && !blobExists(blobName)) { // On Azure, if the location path is a secondary location, and the blob does not // exist, instead of returning immediately from the getInputStream call below @@ -81,9 +80,8 @@ public InputStream readBlob(String blobName) throws IOException { // stream to it. throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); } - try { - return blobStore.getInputStream(buildKey(blobName)); + return blobStore.getInputStream(buildKey(blobName), position, length); } catch (StorageException e) { if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new NoSuchFileException(e.getMessage()); @@ -94,6 +92,21 @@ public InputStream readBlob(String blobName) throws IOException { } } + @Override + public InputStream readBlob(String blobName) throws IOException { + return openInputStream(blobName, 0L, null); + } + + @Override + public InputStream readBlob(String blobName, long position, long length) throws IOException { + return openInputStream(blobName, position, length); + } + + @Override + public long readBlobPreferredLength() { + return Constants.DEFAULT_MINIMUM_READ_SIZE_IN_BYTES; + } + @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 714e29edea29d..f1385e3fe43c2 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -22,6 +22,7 @@ import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.StorageException; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; @@ -100,8 +101,8 @@ public DeleteResult deleteBlobDirectory(String path, Executor executor) return service.deleteBlobDirectory(clientName, container, path, executor); } - public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException { - return service.getInputStream(clientName, container, blob); + public InputStream getInputStream(String blob, long position, @Nullable Long length) throws URISyntaxException, StorageException { + return service.getInputStream(clientName, container, blob, position, length); } public Map listBlobsByPrefix(String keyPath, String prefix) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 26ade5bdec624..3207b28977e0c 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -42,6 +42,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; @@ -256,13 +257,13 @@ public void onAfter() { return new DeleteResult(blobsDeleted.get(), bytesDeleted.get()); } - public InputStream getInputStream(String account, String container, String blob) - throws URISyntaxException, StorageException, IOException { + public InputStream getInputStream(String account, String container, String blob, long position, @Nullable Long length) + throws URISyntaxException, StorageException { final Tuple> client = client(account); final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob); logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob)); final BlobInputStream is = SocketAccess.doPrivilegedException(() -> - blockBlobReference.openInputStream(null, null, client.v2().get())); + blockBlobReference.openInputStream(position, length, null, null, client.v2().get())); return giveSocketPermissionsToStream(is); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index ce3cba065c35b..7ac1b6050139b 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -25,12 +25,14 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; import fixture.azure.AzureHttpHandler; +import org.apache.http.HttpStatus; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; @@ -63,6 +65,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -81,8 +84,10 @@ import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; /** * This class tests how a {@link AzureBlobContainer} and its underlying SDK client are retrying requests when reading or writing blobs. @@ -90,6 +95,8 @@ @SuppressForbidden(reason = "use a http server") public class AzureBlobContainerRetriesTests extends ESTestCase { + private static final long MAX_RANGE_VAL = Long.MAX_VALUE - 1L; + private HttpServer httpServer; private ThreadPool threadPool; @@ -128,7 +135,7 @@ private BlobContainer createBlobContainer(final int maxRetries) { final AzureStorageService service = new AzureStorageService(clientSettings.build()) { @Override RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) { - return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries()); + return new RetryExponentialRetry(1, 10, 100, azureStorageSettings.getMaxRetries()); } @Override @@ -150,7 +157,16 @@ BlobRequestOptions getBlobRequestOptionsForWriteBlob() { public void testReadNonexistentBlobThrowsNoSuchFileException() { final BlobContainer blobContainer = createBlobContainer(between(1, 5)); - final Exception exception = expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob("read_nonexistent_blob")); + final Exception exception = expectThrows(NoSuchFileException.class, + () -> { + if (randomBoolean()) { + blobContainer.readBlob("read_nonexistent_blob"); + } else { + final long position = randomLongBetween(0, MAX_RANGE_VAL - 1L); + final long length = randomLongBetween(1, MAX_RANGE_VAL - position); + blobContainer.readBlob("read_nonexistent_blob", position, length); + } + }); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("not found")); } @@ -160,34 +176,35 @@ public void testReadBlobWithRetries() throws Exception { final CountDown countDownGet = new CountDown(maxRetries); final byte[] bytes = randomBlobContent(); httpServer.createContext("/container/read_blob_max_retries", exchange -> { - Streams.readFully(exchange.getRequestBody()); - if ("HEAD".equals(exchange.getRequestMethod())) { - if (countDownHead.countDown()) { - exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); - exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length)); - exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); - exchange.close(); - return; + try { + Streams.readFully(exchange.getRequestBody()); + if ("HEAD".equals(exchange.getRequestMethod())) { + if (countDownHead.countDown()) { + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length)); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + return; + } + } else if ("GET".equals(exchange.getRequestMethod())) { + if (countDownGet.countDown()) { + final int rangeStart = getRangeStart(exchange); + assertThat(rangeStart, lessThan(bytes.length)); + final int length = bytes.length - rangeStart; + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length)); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length); + exchange.getResponseBody().write(bytes, rangeStart, length); + return; + } } - } else if ("GET".equals(exchange.getRequestMethod())) { - if (countDownGet.countDown()) { - final int rangeStart = getRangeStart(exchange); - assertThat(rangeStart, lessThan(bytes.length)); - final int length = bytes.length - rangeStart; - exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); - exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length)); - exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length); - exchange.getResponseBody().write(bytes, rangeStart, length); - exchange.close(); - return; + if (randomBoolean()) { + AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); } + } finally { + exchange.close(); } - if (randomBoolean()) { - AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); - } - exchange.close(); }); final BlobContainer blobContainer = createBlobContainer(maxRetries); @@ -198,6 +215,58 @@ public void testReadBlobWithRetries() throws Exception { } } + public void testReadRangeBlobWithRetries() throws Exception { + final int maxRetries = randomIntBetween(1, 5); + final CountDown countDownHead = new CountDown(maxRetries); + final CountDown countDownGet = new CountDown(maxRetries); + final byte[] bytes = randomBlobContent(); + httpServer.createContext("/container/read_range_blob_max_retries", exchange -> { + try { + Streams.readFully(exchange.getRequestBody()); + if ("HEAD".equals(exchange.getRequestMethod())) { + if (countDownHead.countDown()) { + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length)); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + return; + } + } else if ("GET".equals(exchange.getRequestMethod())) { + if (countDownGet.countDown()) { + final int rangeStart = getRangeStart(exchange); + assertThat(rangeStart, lessThan(bytes.length)); + final Optional rangeEnd = getRangeEnd(exchange); + assertThat(rangeEnd.isPresent(), is(true)); + assertThat(rangeEnd.get(), greaterThanOrEqualTo(rangeStart)); + final int length = (rangeEnd.get() - rangeStart) + 1; + assertThat(length, lessThanOrEqualTo(bytes.length - rangeStart)); + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length)); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length); + exchange.getResponseBody().write(bytes, rangeStart, length); + return; + } + } + if (randomBoolean()) { + AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); + } + } finally { + exchange.close(); + } + }); + + final BlobContainer blobContainer = createBlobContainer(maxRetries); + final int position = randomIntBetween(0, bytes.length - 1); + final int length = randomIntBetween(1, bytes.length - position); + try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) { + final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream)); + assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead); + assertThat(countDownHead.isCountedDown(), is(true)); + assertThat(countDownGet.isCountedDown(), is(true)); + } + } + public void testWriteBlobWithRetries() throws Exception { final int maxRetries = randomIntBetween(1, 5); final CountDown countDown = new CountDown(maxRetries); @@ -339,14 +408,56 @@ private static byte[] randomBlobContent() { return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb } - private static int getRangeStart(final HttpExchange exchange) { + private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$"); + + private static Tuple getRanges(HttpExchange exchange) { final String rangeHeader = exchange.getRequestHeaders().getFirst("X-ms-range"); if (rangeHeader == null) { - return 0; + return Tuple.tuple(0L, MAX_RANGE_VAL); } - final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(rangeHeader); + final Matcher matcher = RANGE_PATTERN.matcher(rangeHeader); assertTrue(rangeHeader + " matches expected pattern", matcher.matches()); - return Math.toIntExact(Long.parseLong(matcher.group(1))); + final long rangeStart = Long.parseLong(matcher.group(1)); + final long rangeEnd = Long.parseLong(matcher.group(2)); + assertThat(rangeStart, lessThanOrEqualTo(rangeEnd)); + return Tuple.tuple(rangeStart, rangeEnd); + } + + private static int getRangeStart(HttpExchange exchange) { + return Math.toIntExact(getRanges(exchange).v1()); + } + + private static Optional getRangeEnd(HttpExchange exchange) { + final long rangeEnd = getRanges(exchange).v2(); + if (rangeEnd == MAX_RANGE_VAL) { + return Optional.empty(); + } + return Optional.of(Math.toIntExact(rangeEnd)); + } + + private static void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException { + final int rangeStart = getRangeStart(exchange); + assertThat(rangeStart, lessThan(bytes.length)); + final Optional rangeEnd = getRangeEnd(exchange); + final int length; + if (rangeEnd.isPresent()) { + // adapt range end to be compliant to https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 + final int effectiveRangeEnd = Math.min(rangeEnd.get(), bytes.length - 1); + length = effectiveRangeEnd - rangeStart; + } else { + length = bytes.length - rangeStart - 1; + } + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length)); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.sendResponseHeaders(HttpStatus.SC_OK, length); + final int bytesToSend = randomIntBetween(0, length - 1); + if (bytesToSend > 0) { + exchange.getResponseBody().write(bytes, rangeStart, bytesToSend); + } + if (randomBoolean()) { + exchange.getResponseBody().flush(); + } } } diff --git a/test/fixtures/azure-fixture/docker-compose.yml b/test/fixtures/azure-fixture/docker-compose.yml index ff328c52f3428..61ea9d28a560a 100644 --- a/test/fixtures/azure-fixture/docker-compose.yml +++ b/test/fixtures/azure-fixture/docker-compose.yml @@ -8,3 +8,12 @@ services: - ./testfixtures_shared/shared:/fixture/shared ports: - "8091" + + azure-fixture-other: + build: + context: . + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "8091" diff --git a/x-pack/plugin/searchable-snapshots/qa/azure/build.gradle b/x-pack/plugin/searchable-snapshots/qa/azure/build.gradle new file mode 100644 index 0000000000000..2b5ccfbe220f0 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/qa/azure/build.gradle @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE + +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' + +final Project fixture = project(':test:fixtures:azure-fixture') +final Project repositoryPlugin = project(':plugins:repository-azure') + +dependencies { + testCompile project(path: xpackModule('searchable-snapshots'), configuration: 'testArtifacts') + testCompile repositoryPlugin +} + +restResources { + restApi { + includeCore 'indices', 'search', 'bulk', 'snapshot', 'nodes', '_common' + includeXpack 'searchable_snapshots' + } +} + +boolean useFixture = false +String azureAccount = System.getenv("azure_storage_account") +String azureKey = System.getenv("azure_storage_key") +String azureContainer = System.getenv("azure_storage_container") +String azureBasePath = System.getenv("azure_storage_base_path") +String azureSasToken = System.getenv("azure_storage_sas_token") + +if (!azureAccount && !azureKey && !azureContainer && !azureBasePath && !azureSasToken) { + azureAccount = 'azure_integration_test_account' + azureKey = 'YXp1cmVfaW50ZWdyYXRpb25fdGVzdF9rZXk=' // The key is "azure_integration_test_key" encoded using base64 + azureContainer = 'container' + azureBasePath = '' + azureSasToken = '' + useFixture = true + +} else if (!azureAccount || !azureKey || !azureContainer || !azureBasePath || !azureSasToken) { + throw new IllegalArgumentException("not all options specified to run against external Azure service are present") +} + +if (useFixture) { + apply plugin: 'elasticsearch.test.fixtures' + testFixtures.useFixture(fixture.path, 'azure-fixture-other') +} + +integTest { + dependsOn repositoryPlugin.bundlePlugin + runner { + systemProperty 'test.azure.container', azureContainer + systemProperty 'test.azure.base_path', azureBasePath + "/searchable_snapshots_tests" + } +} + +testClusters.integTest { + testDistribution = 'DEFAULT' + plugin file(repositoryPlugin.bundlePlugin.archiveFile) + + keystore 'azure.client.searchable_snapshots.account', azureAccount + keystore 'azure.client.searchable_snapshots.key', azureKey + keystore 'azure.client.searchable_snapshots.sas_token', azureSasToken + + if (useFixture) { + def fixtureAddress = { fixtureName -> + assert useFixture: 'closure should not be used without a fixture' + int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.${fixtureName}.tcp.8091" + assert ephemeralPort > 0 + '127.0.0.1:' + ephemeralPort + } + setting 'azure.client.searchable_snapshots.endpoint_suffix', + { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${-> fixtureAddress('azure-fixture-other')}" }, IGNORE_VALUE + + } else { + println "Using an external service to test " + project.name + } +} + diff --git a/x-pack/plugin/searchable-snapshots/qa/azure/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java b/x-pack/plugin/searchable-snapshots/qa/azure/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java new file mode 100644 index 0000000000000..dc646c0e9e132 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/qa/azure/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AzureSearchableSnapshotsIT.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots; + +import org.elasticsearch.common.settings.Settings; + +import static org.hamcrest.Matchers.blankOrNullString; +import static org.hamcrest.Matchers.not; + +public class AzureSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestTestCase { + + @Override + protected String repositoryType() { + return "azure"; + } + + @Override + protected Settings repositorySettings() { + final String container = System.getProperty("test.azure.container"); + assertThat(container, not(blankOrNullString())); + + final String basePath = System.getProperty("test.azure.base_path"); + assertThat(basePath, not(blankOrNullString())); + + return Settings.builder() + .put("client", "searchable_snapshots") + .put("container", container) + .put("base_path", basePath) + .build(); + } +} +