diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java
index 0d8e6806a43e3..db0b92a894053 100644
--- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java
+++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java
@@ -8,6 +8,7 @@
 
 package org.elasticsearch.common.blobstore.url;
 
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -20,6 +21,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URL;
 import java.nio.file.NoSuchFileException;
 import java.security.AccessController;
@@ -121,6 +123,14 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
         throw new UnsupportedOperationException("URL repository doesn't support this operation");
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        throw new UnsupportedOperationException("URL repository doesn't support this operation");
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         throw new UnsupportedOperationException("URL repository doesn't support this operation");
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
index 6365b248470b2..005c1027c9310 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
@@ -12,6 +12,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -22,6 +23,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.NoSuchFileException;
 import java.util.Iterator;
 import java.util.Map;
@@ -99,6 +101,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists)
         blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
+    }
+
     @Override
     public DeleteResult delete() throws IOException {
         return blobStore.deleteBlobDirectory(keyPath);
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
index e7683fb959e25..35482bbe93fa0 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
@@ -33,6 +33,8 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -46,6 +48,7 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
+import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -53,6 +56,7 @@
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -82,6 +86,8 @@ public class AzureBlobStore implements BlobStore {
 
     private final AzureStorageService service;
 
+    private final BigArrays bigArrays;
+
     private final String clientName;
     private final String container;
     private final LocationMode locationMode;
@@ -90,10 +96,11 @@ public class AzureBlobStore implements BlobStore {
     private final Stats stats = new Stats();
     private final BiConsumer<String, URL> statsConsumer;
 
-    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service) {
+    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays) {
         this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
         this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
         this.service = service;
+        this.bigArrays = bigArrays;
         // locationMode is set per repository, not per client
         this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
         this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
@@ -383,6 +390,49 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists)
         executeSingleUpload(blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists);
     }
 
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient().getBlobContainerAsyncClient(container)
+            .getBlobAsyncClient(blobName).getBlockBlobAsyncClient();
+        try (ChunkedBlobOutputStream<String> out = new ChunkedBlobOutputStream<>(bigArrays, getUploadBlockSize()) {
+
+            @Override
+            protected void flushBuffer() {
+                if (buffer.size() == 0) {
+                    return;
+                }
+                final String blockId = makeMultipartBlockId();
+                SocketAccess.doPrivilegedVoidException(() -> blockBlobAsyncClient.stageBlock(
+                    blockId,
+                    Flux.fromArray(BytesReference.toByteBuffers(buffer.bytes())),
+                    buffer.size()
+                ).block());
+                finishPart(blockId);
+            }
+
+            @Override
+            protected void onCompletion() {
+                if (flushedBytes == 0L) {
+                    writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
+                } else {
+                    flushBuffer();
+                    SocketAccess.doPrivilegedVoidException(
+                        () -> blockBlobAsyncClient.commitBlockList(parts, failIfAlreadyExists == false).block());
+                }
+            }
+
+            @Override
+            protected void onFailure() {
+                // Nothing to do here, already uploaded blocks will be GCed by Azure after a week.
+                // see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks
+            }
+        }) {
+            writer.accept(out);
+            out.markSuccess();
+        }
+    }
+
     public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
         assert inputStream.markSupported()
             : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
@@ -439,13 +489,11 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, long blobSize,
         assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
 
         final List<String> blockIds = new ArrayList<>(nbParts);
-        final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
-        final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
         for (int i = 0; i < nbParts; i++) {
             final long length = i < nbParts - 1 ? partSize : lastPartSize;
             Flux<ByteBuffer> byteBufferFlux = convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);
-            final String blockId = base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
+            final String blockId = makeMultipartBlockId();
             blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
             blockIds.add(blockId);
         }
@@ -454,6 +502,13 @@ private void executeMultipartUpload(String blobName, InputStream inputStream, long blobSize,
         });
     }
 
+    private static final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
+    private static final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();
+
+    private String makeMultipartBlockId() {
+        return base64Encoder.encodeToString(base64UrlDecoder.decode(UUIDs.base64UUID()));
+    }
+
     /**
      * Converts the provided input stream into a Flux of ByteBuffer. To avoid having large amounts of outstanding
      * memory this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size.
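Note: the Azure implementation above (and the S3 one later in this patch) follow the same ChunkedBlobOutputStream pattern: buffer up to the chunk size, flush each full chunk as one part via flushBuffer()/finishPart(), and commit or abort in onCompletion()/onFailure(). The following is a minimal sketch of that contract, assuming only the members visible in this diff (buffer, flushedBytes, parts, finishPart, markSuccess) and that onCompletion/onFailure are dispatched on close() depending on whether markSuccess() was called, as the call sites above suggest; the class name and in-memory part list are hypothetical.

import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;

final class InMemoryChunkedOutput extends ChunkedBlobOutputStream<Integer> {

    private final List<byte[]> uploadedParts = new ArrayList<>();

    InMemoryChunkedOutput(long chunkSize) {
        super(BigArrays.NON_RECYCLING_INSTANCE, chunkSize);
    }

    @Override
    protected void flushBuffer() {
        if (buffer.size() == 0) {
            return; // nothing buffered since the last finished part
        }
        // copy the buffered chunk out and record its part token; finishPart resets the buffer
        uploadedParts.add(BytesReference.toBytes(buffer.bytes()));
        finishPart(uploadedParts.size());
    }

    @Override
    protected void onCompletion() {
        // flushedBytes == 0 means the whole blob still fits in the buffer, which is why the
        // real implementations fall back to a single-part upload there; flush the final chunk
        flushBuffer();
    }

    @Override
    protected void onFailure() {
        // reached when close() happens without markSuccess(), e.g. the writer threw
        uploadedParts.clear();
    }
}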
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java
index d40f76b7bb138..777d0c14ac50a 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java
@@ -124,7 +124,7 @@ protected BlobStore getBlobStore() {
 
     @Override
     protected AzureBlobStore createBlobStore() {
-        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
+        final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, bigArrays);
 
         logger.debug(() -> new ParameterizedMessage(
             "using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
index 29dde60d590e7..0909fc7a5e237 100644
--- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
+++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
@@ -15,6 +15,7 @@
 import fixture.azure.AzureHttpHandler;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -58,6 +59,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -172,7 +174,7 @@ int getMaxReadRetries(String clientName) {
             .put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
             .build());
 
-        return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service));
+        return new AzureBlobContainer(BlobPath.EMPTY, new AzureBlobStore(repositoryMetadata, service, BigArrays.NON_RECYCLING_INSTANCE));
     }
 
     public void testReadNonexistentBlobThrowsNoSuchFileException() {
@@ -391,6 +393,82 @@ public void testWriteLargeBlob() throws Exception {
         assertThat(blocks.isEmpty(), is(true));
     }
 
+    public void testWriteLargeBlobStreaming() throws Exception {
+        final int maxRetries = randomIntBetween(2, 5);
+
+        final int blobSize = (int) ByteSizeUnit.MB.toBytes(10);
+        final byte[] data = randomBytes(blobSize);
+
+        final int nbErrors = 2; // we want all requests to fail at least once
+        final AtomicInteger counterUploads = new AtomicInteger(0);
+        final AtomicLong bytesReceived = new AtomicLong(0L);
+        final CountDown countDownComplete = new CountDown(nbErrors);
+
+        final Map<String, BytesReference> blocks = new ConcurrentHashMap<>();
+        httpServer.createContext("/account/container/write_large_blob_streaming", exchange -> {
+
+            if ("PUT".equals(exchange.getRequestMethod())) {
+                final Map<String, String> params = new HashMap<>();
+                RestUtils.decodeQueryString(exchange.getRequestURI().getRawQuery(), 0, params);
+
+                final String blockId = params.get("blockid");
+                assert Strings.hasText(blockId) == false || AzureFixtureHelper.assertValidBlockId(blockId);
+
+                if (Strings.hasText(blockId) && (counterUploads.incrementAndGet() % 2 == 0)) {
+                    final BytesReference blockData = Streams.readFully(exchange.getRequestBody());
+                    blocks.put(blockId, blockData);
+                    bytesReceived.addAndGet(blockData.length());
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                    exchange.close();
+                    return;
+                }
+
+                final String complete = params.get("comp");
+                if ("blocklist".equals(complete) && (countDownComplete.countDown())) {
+                    final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
+                    final List<String> blockUids = Arrays.stream(blockList.split("<Latest>"))
+                        .filter(line -> line.contains("</Latest>"))
+                        .map(line -> line.substring(0, line.indexOf("</Latest>")))
+                        .collect(Collectors.toList());
+
+                    final ByteArrayOutputStream blob = new ByteArrayOutputStream();
+                    for (String blockUid : blockUids) {
+                        BytesReference block = blocks.remove(blockUid);
+                        assert block != null;
+                        block.writeTo(blob);
+                    }
+                    assertArrayEquals(data, blob.toByteArray());
+                    exchange.getResponseHeaders().add("x-ms-request-server-encrypted", "false");
+                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
+                    exchange.close();
+                    return;
+                }
+            }
+
+            if (randomBoolean()) {
+                Streams.readFully(exchange.getRequestBody());
+                AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
+            }
+            exchange.close();
+        });
+
+        final BlobContainer blobContainer = createBlobContainer(maxRetries);
+        blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> {
+            int outstanding = data.length;
+            while (outstanding > 0) {
+                if (randomBoolean()) {
+                    int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, data.length), outstanding));
+                    out.write(data, data.length - outstanding, toWrite);
+                    outstanding -= toWrite;
+                } else {
+                    out.write(data[data.length - outstanding]);
+                    outstanding--;
+                }
+            }
+        });
+        assertEquals(blobSize, bytesReceived.get());
+    }
+
     public void testRetryUntilFail() throws Exception {
         final int maxRetries = randomIntBetween(2, 5);
         final AtomicInteger requestsReceived = new AtomicInteger(0);
diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
index 74c697a167189..ce2eae99b59f3 100644
--- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
+++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java
@@ -15,9 +15,11 @@
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.CheckedConsumer;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -76,6 +78,14 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists)
         blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         writeBlob(blobName, bytes, failIfAlreadyExists);
diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
index a49a17699ffe4..3a546cdd0f13e 100644
--- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
+++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
@@ -22,6 +22,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -37,8 +38,10 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 
 import java.io.ByteArrayInputStream;
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
@@ -265,6 +268,52 @@ long getLargeBlobThresholdInBytes() {
         {Storage.BlobWriteOption.doesNotExist(), Storage.BlobWriteOption.md5Match()};
     private static final Storage.BlobWriteOption[] OVERWRITE_CHECK_MD5 = {Storage.BlobWriteOption.md5Match()};
 
+    void writeBlob(String blobName, boolean failIfAlreadyExists, CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
+        final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? NO_OVERWRITE_NO_MD5 : OVERWRITE_NO_MD5;
+
+        StorageException storageException = null;
+
+        for (int retry = 0; retry < 3; ++retry) {
+            try {
+                final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
+                try (OutputStream out = new FilterOutputStream(Channels.newOutputStream(new WritableBlobChannel(writeChannel))) {
+                    @Override
+                    public void write(byte[] b, int off, int len) throws IOException {
+                        int written = 0;
+                        while (written < len) {
+                            // at most write the default chunk size in one go to prevent allocating huge buffers in the SDK
+                            // see com.google.cloud.BaseWriteChannel#DEFAULT_CHUNK_SIZE
+                            final int toWrite = Math.min(len - written, 60 * 256 * 1024);
+                            out.write(b, off + written, toWrite);
+                            written += toWrite;
+                        }
+                    }
+                }) {
+                    writer.accept(out);
+                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                }
+                stats.trackPutOperation();
+                return;
+            } catch (final StorageException se) {
+                final int errorCode = se.getCode();
+                if (errorCode == HTTP_GONE) {
+                    logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se);
+                    storageException = ExceptionsHelper.useOrSuppress(storageException, se);
+                    continue;
+                } else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
+                    throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
+                }
+                if (storageException != null) {
+                    se.addSuppressed(storageException);
+                }
+                throw se;
+            }
+        }
+        assert storageException != null;
+        throw storageException;
+    }
+
     /**
      * Uploads a blob using the "resumable upload" method (multiple requests, which
      * can be independently retried in case of failure, see
@@ -297,24 +346,9 @@ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, long size, boolean failIfAlreadyExists)
      * It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy
      * is in the stacktrace and is not granted the permissions needed to close and write the channel.
      */
-        org.elasticsearch.core.internal.io.Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
-
-            @SuppressForbidden(reason = "channel is based on a socket")
-            @Override
-            public int write(final ByteBuffer src) throws IOException {
-                return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
-            }
-
-            @Override
-            public boolean isOpen() {
-                return writeChannel.isOpen();
-            }
-
-            @Override
-            public void close() throws IOException {
-                SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-            }
-        }), buffer);
+        org.elasticsearch.core.internal.io.Streams.copy(
+            inputStream, Channels.newOutputStream(new WritableBlobChannel(writeChannel)), buffer);
+        SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
         // We don't track this operation on the http layer as
         // we do with the GET/LIST operations since this operations
         // can trigger multiple underlying http requests but only one
@@ -478,4 +512,29 @@ private static String buildKey(String keyPath, String s) {
     public Map<String, Long> stats() {
         return stats.toMap();
     }
+
+    private static final class WritableBlobChannel implements WritableByteChannel {
+
+        private final WriteChannel channel;
+
+        WritableBlobChannel(WriteChannel writeChannel) {
+            this.channel = writeChannel;
+        }
+
+        @SuppressForbidden(reason = "channel is based on a socket")
+        @Override
+        public int write(final ByteBuffer src) throws IOException {
+            return SocketAccess.doPrivilegedIOException(() -> channel.write(src));
+        }
+
+        @Override
+        public boolean isOpen() {
+            return channel.isOpen();
+        }
+
+        @Override
+        public void close() {
+            // we manually close the channel later to have control over whether or not we want to finalize a blob
+        }
+    }
 }
diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
index 1848e9abb1189..6e2c149793f32 100644
--- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
+++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
@@ -343,8 +343,12 @@ public void testWriteLargeBlob() throws IOException {
         final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
 
         final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);
-        try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
-            blobContainer.writeBlob("write_large_blob", stream, data.length, false);
+        if (randomBoolean()) {
+            try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
+                blobContainer.writeBlob("write_large_blob", stream, data.length, false);
+            }
+        } else {
+            blobContainer.writeBlob("write_large_blob", false, randomBoolean(), out -> out.write(data));
         }
 
         assertThat(countInits.get(), equalTo(0));
diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
index 52e951e258c86..b60b0a969429d 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
@@ -15,6 +15,7 @@
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
@@ -31,6 +32,7 @@
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.Collections;
@@ -156,6 +158,38 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists)
         });
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        Path blob = new Path(path, blobName);
+        if (atomic) {
+            final Path tempBlobPath = new Path(path, FsBlobContainer.tempBlobName(blobName));
+            store.execute((Operation<Void>) fileContext -> {
+                try (FSDataOutputStream stream = fileContext.create(tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK))) {
+                    writer.accept(stream);
+                    fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
+                } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
+                    throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
+                }
+                return null;
+            });
+        } else {
+            // we pass CREATE, which means it fails if a blob already exists.
+            final EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
+                : EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
+            store.execute((Operation<Void>) fileContext -> {
+                try (FSDataOutputStream stream = fileContext.create(blob, flags)) {
+                    writer.accept(stream);
+                } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
+                    throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
+                }
+                return null;
+            });
+        }
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         final String tempBlob = FsBlobContainer.tempBlobName(blobName);
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
index 6c096c835d132..c5d9cc65531a9 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
@@ -27,6 +27,7 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -41,10 +42,12 @@
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -129,6 +132,105 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
         });
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        try (AmazonS3Reference clientReference = blobStore.clientReference();
+             ChunkedBlobOutputStream<PartETag> out = new ChunkedBlobOutputStream<>(blobStore.bigArrays(), blobStore.bufferSizeInBytes()) {
+
+                 private final SetOnce<String> uploadId = new SetOnce<>();
+
+                 @Override
+                 protected void flushBuffer() throws IOException {
+                     flushBuffer(false);
+                 }
+
+                 private void flushBuffer(boolean lastPart) throws IOException {
+                     if (buffer.size() == 0) {
+                         return;
+                     }
+                     if (flushedBytes == 0L) {
+                         assert lastPart == false : "use single part upload if there's only a single part";
+                         uploadId.set(SocketAccess.doPrivileged(() ->
+                             clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(blobName)).getUploadId()));
+                         if (Strings.isEmpty(uploadId.get())) {
+                             throw new IOException("Failed to initialize multipart upload " + blobName);
+                         }
+                     }
+                     assert lastPart == false || successful : "must only write last part if successful";
+                     final UploadPartRequest uploadRequest = createPartUploadRequest(
+                         buffer.bytes().streamInput(), uploadId.get(), parts.size() + 1, blobName, buffer.size(), lastPart);
+                     final UploadPartResult uploadResponse =
+                         SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
+                     finishPart(uploadResponse.getPartETag());
+                 }
+
+                 @Override
+                 protected void onCompletion() throws IOException {
+                     if (flushedBytes == 0L) {
+                         writeBlob(blobName, buffer.bytes(), failIfAlreadyExists);
+                     } else {
+                         flushBuffer(true);
+                         final CompleteMultipartUploadRequest complRequest =
+                             new CompleteMultipartUploadRequest(blobStore.bucket(), blobName, uploadId.get(), parts);
+                         complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
+                         SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
+                     }
+                 }
+
+                 @Override
+                 protected void onFailure() {
+                     if (Strings.hasText(uploadId.get())) {
+                         abortMultiPartUpload(uploadId.get(), blobName);
+                     }
+                 }
+             }) {
+            writer.accept(out);
+            out.markSuccess();
+        }
+    }
+
+    private UploadPartRequest createPartUploadRequest(InputStream stream,
+                                                      String uploadId,
+                                                      int number,
+                                                      String blobName,
+                                                      long size,
+                                                      boolean lastPart) {
+        final UploadPartRequest uploadRequest = new UploadPartRequest();
+        uploadRequest.setBucketName(blobStore.bucket());
+        uploadRequest.setKey(blobName);
+        uploadRequest.setUploadId(uploadId);
+        uploadRequest.setPartNumber(number);
+        uploadRequest.setInputStream(stream);
+        uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
+        uploadRequest.setPartSize(size);
+        uploadRequest.setLastPart(lastPart);
+        return uploadRequest;
+    }
+
+    private void abortMultiPartUpload(String uploadId, String blobName) {
+        final AbortMultipartUploadRequest abortRequest =
+            new AbortMultipartUploadRequest(blobStore.bucket(), blobName, uploadId);
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
+        }
+    }
+
+    private InitiateMultipartUploadRequest initiateMultiPartUpload(String blobName) {
+        final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(blobStore.bucket(), blobName);
+        initRequest.setStorageClass(blobStore.getStorageClass());
+        initRequest.setCannedACL(blobStore.getCannedACL());
+        initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
+        if (blobStore.serverSideEncryption()) {
+            final ObjectMetadata md = new ObjectMetadata();
+            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+            initRequest.setObjectMetadata(md);
+        }
+        return initRequest;
+    }
+
     // package private for testing
     long getLargeBlobThresholdInBytes() {
         return blobStore.bufferSizeInBytes();
@@ -389,19 +491,10 @@ void executeMultipartUpload(final S3BlobStore blobStore,
         final SetOnce<String> uploadId = new SetOnce<>();
         final String bucketName = blobStore.bucket();
         boolean success = false;
-
-        final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, blobName);
-        initRequest.setStorageClass(blobStore.getStorageClass());
-        initRequest.setCannedACL(blobStore.getCannedACL());
-        initRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
-        if (blobStore.serverSideEncryption()) {
-            final ObjectMetadata md = new ObjectMetadata();
-            md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
-            initRequest.setObjectMetadata(md);
-        }
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-            uploadId.set(SocketAccess.doPrivileged(() -> clientReference.client().initiateMultipartUpload(initRequest).getUploadId()));
+            uploadId.set(SocketAccess.doPrivileged(() ->
+                clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(blobName)).getUploadId()));
             if (Strings.isEmpty(uploadId.get())) {
                 throw new IOException("Failed to initialize multipart upload " + blobName);
             }
@@ -410,21 +503,9 @@ void executeMultipartUpload(final S3BlobStore blobStore,
 
             long bytesCount = 0;
             for (int i = 1; i <= nbParts; i++) {
-                final UploadPartRequest uploadRequest = new UploadPartRequest();
-                uploadRequest.setBucketName(bucketName);
-                uploadRequest.setKey(blobName);
-                uploadRequest.setUploadId(uploadId.get());
-                uploadRequest.setPartNumber(i);
-                uploadRequest.setInputStream(input);
-                uploadRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
-
-                if (i < nbParts) {
-                    uploadRequest.setPartSize(partSize);
-                    uploadRequest.setLastPart(false);
-                } else {
-                    uploadRequest.setPartSize(lastPartSize);
-                    uploadRequest.setLastPart(true);
-                }
+                final boolean lastPart = i == nbParts;
+                final UploadPartRequest uploadRequest =
+                    createPartUploadRequest(input, uploadId.get(), i, blobName, lastPart ? lastPartSize : partSize, lastPart);
 
                 bytesCount += uploadRequest.getPartSize();
                 final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
@@ -446,10 +527,7 @@ void executeMultipartUpload(final S3BlobStore blobStore,
             throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e);
         } finally {
             if ((success == false) && Strings.hasLength(uploadId.get())) {
-                final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, blobName, uploadId.get());
-                try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-                    SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
-                }
+                abortMultiPartUpload(uploadId.get(), blobName);
             }
         }
     }
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
index a3279c1ef4976..90a86c4910ba5 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.BlobStoreException;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.BigArrays;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -35,6 +36,8 @@ class S3BlobStore implements BlobStore {
 
     private final S3Service service;
 
+    private final BigArrays bigArrays;
+
     private final String bucket;
 
     private final ByteSizeValue bufferSize;
@@ -56,8 +59,9 @@ class S3BlobStore implements BlobStore {
 
     S3BlobStore(S3Service service, String bucket, boolean serverSideEncryption,
                 ByteSizeValue bufferSize, String cannedACL, String storageClass,
-                RepositoryMetadata repositoryMetadata) {
+                RepositoryMetadata repositoryMetadata, BigArrays bigArrays) {
         this.service = service;
+        this.bigArrays = bigArrays;
         this.bucket = bucket;
         this.serverSideEncryption = serverSideEncryption;
         this.bufferSize = bufferSize;
@@ -136,6 +140,10 @@ public String bucket() {
         return bucket;
     }
 
+    public BigArrays bigArrays() {
+        return bigArrays;
+    }
+
     public boolean serverSideEncryption() {
         return serverSideEncryption;
     }
diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
index c5324f8572648..5df64cc38ca0f 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
@@ -308,7 +308,7 @@ private static BlobPath buildBasePath(RepositoryMetadata metadata) {
 
     @Override
     protected S3BlobStore createBlobStore() {
-        return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, metadata);
+        return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, metadata, bigArrays);
     }
 
     // only use for testing
diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java
index 5505cd592a8e6..fb26d06a8a3ce 100644
--- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java
+++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java
@@ -12,6 +12,7 @@
 import com.amazonaws.util.Base16;
 import org.apache.http.HttpStatus;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -42,6 +43,7 @@
 import java.util.Locale;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
 import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING;
@@ -126,7 +128,7 @@ protected BlobContainer createBlobContainer(final @Nullable Integer maxRetries,
             bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
             S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
             S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
-            repositoryMetadata)) {
+            repositoryMetadata, BigArrays.NON_RECYCLING_INSTANCE)) {
                 @Override
                 public InputStream readBlob(String blobName) throws IOException {
                     return new AssertingInputStream(super.readBlob(blobName), blobName);
@@ -296,6 +298,105 @@ public void testWriteLargeBlob() throws Exception {
         assertThat(countDownComplete.isCountedDown(), is(true));
     }
 
+    public void testWriteLargeBlobStreaming() throws Exception {
+        final boolean useTimeout = rarely();
+        final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
+        final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
+        final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
+
+        final int parts = randomIntBetween(1, 5);
+        final long lastPartSize = randomLongBetween(10, 512);
+        final long blobSize = (parts * bufferSize.getBytes()) + lastPartSize;
+
+        final int nbErrors = 2; // we want all requests to fail at least once
+        final CountDown countDownInitiate = new CountDown(nbErrors);
+        final AtomicInteger counterUploads = new AtomicInteger(0);
+        final AtomicLong bytesReceived = new AtomicLong(0L);
+        final CountDown countDownComplete = new CountDown(nbErrors);
+
+        httpServer.createContext("/bucket/write_large_blob_streaming", exchange -> {
+            final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
+
+            if ("POST".equals(exchange.getRequestMethod())
+                && exchange.getRequestURI().getQuery().equals("uploads")) {
+                // initiate multipart upload request
+                if (countDownInitiate.countDown()) {
+                    byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+                        "<InitiateMultipartUploadResult>\n" +
+                        "  <Bucket>bucket</Bucket>\n" +
+                        "  <Key>write_large_blob_streaming</Key>\n" +
+                        "  <UploadId>TEST</UploadId>\n" +
+                        "</InitiateMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
+                    exchange.getResponseHeaders().add("Content-Type", "application/xml");
+                    exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
+                    exchange.getResponseBody().write(response);
+                    exchange.close();
+                    return;
+                }
+            } else if ("PUT".equals(exchange.getRequestMethod())
+                && exchange.getRequestURI().getQuery().contains("uploadId=TEST")
+                && exchange.getRequestURI().getQuery().contains("partNumber=")) {
+                // upload part request
+                MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
+                BytesReference bytes = Streams.readFully(md5);
+
+                if (counterUploads.incrementAndGet() % 2 == 0) {
+                    bytesReceived.addAndGet(bytes.length());
+                    exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
+                    exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
+                    exchange.close();
+                    return;
+                }
+
+            } else if ("POST".equals(exchange.getRequestMethod())
+                && exchange.getRequestURI().getQuery().equals("uploadId=TEST")) {
+                // complete multipart upload request
+                if (countDownComplete.countDown()) {
+                    Streams.readFully(exchange.getRequestBody());
+                    byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+                        "<CompleteMultipartUploadResult>\n" +
+                        "  <Bucket>bucket</Bucket>\n" +
+                        "  <Key>write_large_blob_streaming</Key>\n" +
+                        "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
+                    exchange.getResponseHeaders().add("Content-Type", "application/xml");
+                    exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
+                    exchange.getResponseBody().write(response);
+                    exchange.close();
+                    return;
+                }
+            }
+
+            // sends an error back or let the request time out
+            if (useTimeout == false) {
+                if (randomBoolean() && contentLength > 0) {
+                    Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]);
+                } else {
+                    Streams.readFully(exchange.getRequestBody());
+                    exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
+                        HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
+                }
+                exchange.close();
+            }
+        });
+
+        blobContainer.writeBlob("write_large_blob_streaming", false, randomBoolean(), out -> {
+            final byte[] buffer = new byte[16 * 1024];
+            long outstanding = blobSize;
+            while (outstanding > 0) {
+                if (randomBoolean()) {
+                    int toWrite = Math.toIntExact(Math.min(randomIntBetween(64, buffer.length), outstanding));
+                    out.write(buffer, 0, toWrite);
+                    outstanding -= toWrite;
+                } else {
+                    out.write(0);
+                    outstanding--;
+                }
+            }
+        });
+
+        assertEquals(blobSize, bytesReceived.get());
+    }
+
     /**
      * Asserts that an InputStream is fully consumed, or aborted, when it is closed
      */
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
index 266a1536274b2..90da41f654528 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
@@ -9,9 +9,11 @@
 package org.elasticsearch.common.blobstore;
 
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.CheckedConsumer;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.Iterator;
@@ -116,6 +118,18 @@ default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists)
         writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
     }
 
+    /**
+     * Write a blob by providing a consumer that will write its contents to an output stream. This method allows serializing a blob's
+     * contents directly to the blob store without having to materialize the serialized version in full before writing.
+     *
+     * @param blobName            the name of the blob to write
+     * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
+     * @param atomic              whether the write should be atomic in case the implementation supports it
+     * @param writer              consumer for an output stream that will write the blob contents to the stream
+     */
+    void writeBlob(String blobName, boolean failIfAlreadyExists, boolean atomic,
+                   CheckedConsumer<OutputStream, IOException> writer) throws IOException;
+
     /**
      * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name,
     * using an atomic write operation if the implementation supports it.
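Note: with this interface method in place, a caller serializes directly onto the repository's stream instead of buffering the whole blob first. A minimal usage sketch follows; the container, blob name, and chunk source are hypothetical.

import java.io.IOException;
import java.util.Iterator;

import org.elasticsearch.common.blobstore.BlobContainer;

class StreamingWriteExample {
    // failIfAlreadyExists=true, atomic=true: readers should never observe a half-written blob
    static void writeChunks(BlobContainer container, Iterator<byte[]> chunks) throws IOException {
        container.writeBlob("manifest-0", true, true, out -> {
            while (chunks.hasNext()) {
                out.write(chunks.next()); // each chunk is streamed straight to the store
            }
        });
    }
}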
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
index 2d1d75618ad45..e7f7fd0889136 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
@@ -18,9 +18,11 @@
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.internal.io.IOUtils;
 
 import java.io.FileNotFoundException;
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -243,6 +245,49 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists)
         IOUtils.fsync(path, true);
     }
 
+    @Override
+    public void writeBlob(String blobName,
+                          boolean failIfAlreadyExists,
+                          boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        if (atomic) {
+            final String tempBlob = tempBlobName(blobName);
+            try {
+                writeToPath(tempBlob, true, writer);
+                moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists);
+            } catch (IOException ex) {
+                try {
+                    deleteBlobsIgnoringIfNotExists(Iterators.single(tempBlob));
+                } catch (IOException e) {
+                    ex.addSuppressed(e);
+                }
+                throw ex;
+            }
+        } else {
+            writeToPath(blobName, failIfAlreadyExists, writer);
+        }
+        IOUtils.fsync(path, true);
+    }
+
+    private void writeToPath(String blobName, boolean failIfAlreadyExists, CheckedConsumer<OutputStream, IOException> writer)
+        throws IOException {
+        final Path file = path.resolve(blobName);
+        try {
+            try (OutputStream out = new BlobOutputStream(file)) {
+                writer.accept(out);
+            }
+        } catch (FileAlreadyExistsException faee) {
+            if (failIfAlreadyExists) {
+                throw faee;
+            }
+            deleteBlobsIgnoringIfNotExists(Iterators.single(blobName));
+            try (OutputStream out = new BlobOutputStream(file)) {
+                writer.accept(out);
+            }
+        }
+        IOUtils.fsync(file, false);
+    }
+
     @Override
     public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         final String tempBlob = tempBlobName(blobName);
@@ -306,4 +351,16 @@ public static String tempBlobName(final String blobName) {
     public static boolean isTempBlobName(final String blobName) {
         return blobName.startsWith(TEMP_FILE_PREFIX);
     }
+
+    private static class BlobOutputStream extends FilterOutputStream {
+
+        BlobOutputStream(Path file) throws IOException {
+            super(Files.newOutputStream(file, StandardOpenOption.CREATE_NEW));
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            out.write(b, off, len);
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java
index cb287531c8f7b..d2746fcb8d1f8 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java
@@ -13,9 +13,11 @@
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.core.CheckedConsumer;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
@@ -61,6 +63,12 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
         delegate.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
     }
 
+    @Override
+    public void writeBlob(String blobName, boolean failIfAlreadyExists, boolean atomic,
+                          CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+        delegate.writeBlob(blobName, failIfAlreadyExists, atomic, writer);
+    }
+
     @Override
     public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
         delegate.writeBlobAtomic(blobName, bytes, failIfAlreadyExists);
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index fa1caca1fd28c..cd4c18baa8105 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -53,11 +53,9 @@
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.NotXContentException;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.metrics.CounterMetric;
@@ -74,6 +72,7 @@
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Tuple;
@@ -118,6 +117,7 @@
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -553,15 +553,13 @@ public void cloneShardSnapshot(
                 sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime),
                 shardContainer,
                 target.getUUID(),
-                compress,
-                bigArrays
+                compress
             );
             INDEX_SHARD_SNAPSHOTS_FORMAT.write(
                 existingSnapshots.withClone(source.getName(), target.getName()),
                 shardContainer,
                 newGen,
-                compress,
-                bigArrays
+                compress
             );
             return new ShardSnapshotResult(
                 newGen,
@@ -1394,7 +1392,7 @@ public void finalizeSnapshot(
         executor.execute(
             ActionRunnable.run(
                 allMetaListener,
-                () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress, bigArrays)
+                () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
             )
         );
 
@@ -1408,7 +1406,7 @@
                 if (metaUUID == null) {
                     // We don't yet have this version of the metadata so we write it
                     metaUUID = UUIDs.base64UUID();
-                    INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress, bigArrays);
+                    INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
                     indexMetaIdentifiers.put(identifiers, metaUUID);
                 }
                 indexMetas.put(index, identifiers);
@@ -1417,8 +1415,7 @@
                     clusterMetadata.index(index.getName()),
                     indexContainer(index),
                     snapshotId.getUUID(),
-                    compress,
-                    bigArrays
+                    compress
                 );
             }
         }));
@@ -1426,7 +1423,7 @@ public void finalizeSnapshot(
         executor.execute(
             ActionRunnable.run(
                 allMetaListener,
-                () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress, bigArrays)
+                () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
             )
         );
     }, onUpdateFailure);
@@ -2250,13 +2247,11 @@ public void onFailure(Exception e) {
             }
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
             logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
-            try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) {
+            writeAtomic(blobContainer(), indexBlob, out -> {
                 try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(Streams.noCloseStream(out))) {
                     newRepositoryData.snapshotsToXContent(xContentBuilder, version);
                 }
-                final BytesReference serializedRepoData = out.bytes();
-                writeAtomic(blobContainer(), indexBlob, serializedRepoData, true);
-            }
+            }, true);
             maybeWriteIndexLatest(newGen);
 
             // Step 3: Update CS to reflect new repository generation.
@@ -2349,7 +2344,7 @@ private void maybeWriteIndexLatest(long newGen) {
         if (supportURLRepo) {
             logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
             try {
-                writeAtomic(blobContainer(), INDEX_LATEST_BLOB, new BytesArray(Numbers.longToBytes(newGen)), false);
+                writeAtomic(blobContainer(), INDEX_LATEST_BLOB, out -> out.write(Numbers.longToBytes(newGen)), false);
             } catch (Exception e) {
                 logger.warn(
                     () -> new ParameterizedMessage(
@@ -2530,10 +2525,14 @@ private long latestGeneration(Collection<String> rootBlobs) {
         return latest;
     }
 
-    private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists)
-        throws IOException {
+    private void writeAtomic(
+        BlobContainer container,
+        final String blobName,
+        CheckedConsumer<OutputStream, IOException> writer,
+        boolean failIfAlreadyExists
+    ) throws IOException {
         logger.trace(() -> new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
-        container.writeBlobAtomic(blobName, bytesRef, failIfAlreadyExists);
+        container.writeBlob(blobName, failIfAlreadyExists, true, writer);
     }
 
     @Override
@@ -2684,13 +2683,7 @@ public void snapshotShard(SnapshotShardContext context) {
                 // reference a generation that has not had all its files fully upload.
                 indexGeneration = UUIDs.randomBase64UUID();
                 try {
-                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(
-                        updatedBlobStoreIndexShardSnapshots,
-                        shardContainer,
-                        indexGeneration,
-                        compress,
-                        bigArrays
-                    );
+                    INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress);
                 } catch (IOException e) {
                     throw new IndexShardSnapshotFailedException(
                         shardId,
@@ -2767,7 +2760,7 @@ public void snapshotShard(SnapshotShardContext context) {
             );
             try {
                 final String snapshotUUID = snapshotId.getUUID();
-                INDEX_SHARD_SNAPSHOT_FORMAT.write(blobStoreIndexShardSnapshot, shardContainer, snapshotUUID, compress, bigArrays);
+                INDEX_SHARD_SNAPSHOT_FORMAT.write(blobStoreIndexShardSnapshot, shardContainer, snapshotUUID, compress);
             } catch (IOException e) {
                 throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
             }
@@ -3140,7 +3133,7 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
             final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
             if (indexGeneration < 0L) {
                 writtenGeneration = UUIDs.randomBase64UUID();
-                INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress, bigArrays);
+                INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
             } else {
                 writtenGeneration = String.valueOf(indexGeneration);
                 writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
@@ -3180,12 +3173,11 @@ private void writeShardIndexBlobAtomic(
             () -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path())
         );
         final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration));
-        INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(
-            updatedSnapshots,
+        writeAtomic(
+            shardContainer,
             blobName,
-            compress,
-            bigArrays,
-            bytes -> writeAtomic(shardContainer, blobName, bytes, true)
+            out -> INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress, out),
+            true
        );
     }
 
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
index 00f136b6413fa..0425213c9a650 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
@@ -19,12 +19,9 @@
 import org.elasticsearch.common.Numbers;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -32,7 +29,6 @@
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.gateway.CorruptStateException;
 
 import java.io.FilterInputStream;
@@ -272,47 +268,38 @@ private int getAvailable() throws IOException {
      * @param name     blob name
      * @param compress whether to use compression
      */
-    public void write(T obj, BlobContainer blobContainer, String name, boolean compress, BigArrays bigArrays) throws IOException {
+    public void write(T obj, BlobContainer blobContainer, String name, boolean compress) throws IOException {
         final String blobName = blobName(name);
-        serialize(obj, blobName, compress, bigArrays, bytes -> blobContainer.writeBlob(blobName, bytes, false));
+        blobContainer.writeBlob(blobName, false, false, out -> serialize(obj, blobName, compress, out));
     }
 
-    public void serialize(
-        final T obj,
-        final String blobName,
-        final boolean compress,
-        BigArrays bigArrays,
-        CheckedConsumer<BytesReference, IOException> consumer
-    ) throws IOException {
-        try (ReleasableBytesStreamOutput outputStream = new ReleasableBytesStreamOutput(bigArrays)) {
-            try (
-                OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
-                    "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")",
-                    blobName,
-                    org.elasticsearch.common.io.Streams.noCloseStream(outputStream),
-                    BUFFER_SIZE
+    public void serialize(final T obj, final String blobName, final boolean compress, OutputStream outputStream) throws IOException {
+        try (
+            OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
+                "ChecksumBlobStoreFormat.serialize(blob=\"" + blobName + "\")",
+                blobName,
+                org.elasticsearch.common.io.Streams.noCloseStream(outputStream),
+                BUFFER_SIZE
+            )
+        ) {
+            CodecUtil.writeHeader(indexOutput, codec, VERSION);
+            try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
+                @Override
+                public void close() {
+                    // this is important since some of the XContentBuilders write bytes on close.
+                    // in order to write the footer we need to prevent closing the actual index input.
+                }
+            };
+                XContentBuilder builder = XContentFactory.contentBuilder(
+                    XContentType.SMILE,
+                    compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream
                 )
             ) {
-                CodecUtil.writeHeader(indexOutput, codec, VERSION);
-                try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
-                    @Override
-                    public void close() {
-                        // this is important since some of the XContentBuilders write bytes on close.
-                        // in order to write the footer we need to prevent closing the actual index input.
-                    }
-                };
-                    XContentBuilder builder = XContentFactory.contentBuilder(
-                        XContentType.SMILE,
-                        compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream
diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java
index 57aba7691e8c0..2ff9d68ce8657 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java
@@ -17,7 +17,6 @@
 import org.elasticsearch.common.blobstore.fs.FsBlobStore;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -87,9 +86,9 @@ public void testBlobStoreOperations() throws IOException {
         // Write blobs in different formats
         final String randomText = randomAlphaOfLengthBetween(0, 1024 * 8 * 3);
         final String normalText = "checksum smile: " + randomText;
-        checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE);
+        checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false);
         final String compressedText = "checksum smile compressed: " + randomText;
-        checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE);
+        checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true);
 
         // Assert that all checksum blobs can be read
         assertEquals(normalText, checksumSMILE.read("repo", blobContainer, "check-smile", xContentRegistry()).getText());
@@ -109,8 +108,8 @@ public void testCompressionIsApplied() throws IOException {
             (repo, parser) -> BlobObj.fromXContent(parser)
         );
         BlobObj blobObj = new BlobObj(veryRedundantText.toString());
-        checksumFormat.write(blobObj, blobContainer, "blob-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE);
-        checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false, MockBigArrays.NON_RECYCLING_INSTANCE);
+        checksumFormat.write(blobObj, blobContainer, "blob-comp", true);
+        checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false);
         Map<String, BlobMetadata> blobs = blobContainer.listBlobsByPrefix("blob-");
         assertEquals(blobs.size(), 2);
         assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
@@ -121,12 +120,13 @@ public void testBlobCorruption() throws IOException {
         BlobContainer blobContainer = blobStore.blobContainer(BlobPath.EMPTY);
         String testString = randomAlphaOfLength(randomInt(10000));
         BlobObj blobObj = new BlobObj(testString);
+
         ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(
             BLOB_CODEC,
             "%s",
             (repo, parser) -> BlobObj.fromXContent(parser)
         );
-        checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean(), MockBigArrays.NON_RECYCLING_INSTANCE);
+        checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean());
         assertEquals(checksumFormat.read("repo", blobContainer, "test-path", xContentRegistry()).getText(), testString);
         randomCorruption(blobContainer, "test-path");
         try {
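
The corruption test in this file relies on the Lucene codec framing that serialize writes around every blob: a codec header, the (optionally compressed) SMILE body, and a CodecUtil footer whose CRC32 lets read() reject any flipped byte. Schematically (a sketch of the framing only; "blob-codec", the sink `out`, and the buffer size are placeholders):

    // Header + body + checksummed footer, written over an IndexOutput wrapper.
    try (OutputStreamIndexOutput indexOutput =
             new OutputStreamIndexOutput("sketch", "blob-name", Streams.noCloseStream(out), 4096)) {
        CodecUtil.writeHeader(indexOutput, "blob-codec", 1);
        // ... SMILE-encoded XContent body goes here ...
        CodecUtil.writeFooter(indexOutput); // footer magic + algorithm id + CRC32
    }
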
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java
index 6d1073e14dfb7..f8a7c1a1ac59b 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java
@@ -9,9 +9,7 @@
 package org.elasticsearch.snapshots;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
-import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.test.AbstractWireTestCase;
@@ -32,22 +30,13 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) throws IOException
 
     @Override
     protected SnapshotInfo copyInstance(SnapshotInfo instance, Version version) throws IOException {
-        final PlainActionFuture<SnapshotInfo> future = new PlainActionFuture<>();
-        BlobStoreRepository.SNAPSHOT_FORMAT.serialize(
-            instance,
-            "test",
-            randomBoolean(),
-            BigArrays.NON_RECYCLING_INSTANCE,
-            bytes -> ActionListener.completeWith(
-                future,
-                () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(
-                    instance.repository(),
-                    NamedXContentRegistry.EMPTY,
-                    bytes.streamInput()
-                )
-            )
-        );
-        return future.actionGet();
+        final BytesStreamOutput out = new BytesStreamOutput();
+        BlobStoreRepository.SNAPSHOT_FORMAT.serialize(instance, "test", randomBoolean(), out);
+        return BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(
+            instance.repository(),
+            NamedXContentRegistry.EMPTY,
+            out.bytes().streamInput()
+        );
     }
 }
diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java
index 2258dbd4f4420..aee7e0a760478 100644
--- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java
+++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java
@@ -20,11 +20,8 @@
 import org.elasticsearch.common.bytes.CompositeBytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.Tuple;
-import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestUtils;
 
@@ -33,7 +30,6 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URLDecoder;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -211,11 +207,6 @@ public void handle(final HttpExchange exchange) throws IOException {
             blobs.put(blobName, BytesArray.EMPTY);
 
             byte[] response = requestBody.utf8ToString().getBytes(UTF_8);
-            if (Paths.get(blobName).getFileName().toString().startsWith(BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX) == false) {
-                final Map<String, Object> parsedBody = XContentHelper.convertToMap(requestBody, false, XContentType.JSON).v2();
-                assert parsedBody.get("md5Hash") != null
-                    : "file [" + blobName + "] is not a data blob but did not come with a md5 checksum";
-            }
             exchange.getResponseHeaders().add("Content-Type", "application/json");
             exchange.getResponseHeaders()
                 .add(
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index 80eb6279c92df..11ab641d21531 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -37,7 +37,6 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -382,7 +381,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version
         PlainActionFuture.get(f -> blobStoreRepository.threadPool().generic().execute(ActionRunnable.run(f, () ->
             BlobStoreRepository.SNAPSHOT_FORMAT.write(downgradedSnapshotInfo,
                 blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath()), snapshotInfo.snapshotId().getUUID(),
-                randomBoolean(), internalCluster().getCurrentMasterNodeInstance(BigArrays.class)))));
+                randomBoolean()))));
         return oldVersionSnapshot;
     }
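
Note that initWithSnapshotVersion still hops onto the repository's generic thread pool before writing: with the streaming API the write call may block on a real upload, so the test keeps it off the calling thread. The dispatch idiom, condensed from the call site above (names as in the test framework):

    // Run the throwing write on the generic pool and block until it completes.
    PlainActionFuture.get(f -> repository.threadPool().generic().execute(
        ActionRunnable.run(f, () -> BlobStoreRepository.SNAPSHOT_FORMAT.write(
            downgradedSnapshotInfo, blobContainer, snapshotUUID, randomBoolean()))));
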
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index 639a9e7b08d19..0aa6e8d13ca8a 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -40,6 +41,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.file.Path;
 import java.security.MessageDigest;
@@ -488,11 +490,7 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) throws
 
         @Override
         public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
-            maybeIOExceptionOrBlock(blobName);
-            if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
-                && path().equals(basePath()) == false) {
-                blockExecutionAndMaybeWait(blobName);
-            }
+            beforeWrite(blobName);
             super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
             if (RandomizedContext.current().getRandom().nextBoolean()) {
                 // for network based repositories, the blob may have been written but we may still
@@ -502,19 +500,35 @@ && path().equals(basePath()) == false) {
         }
 
         @Override
-        public void writeBlobAtomic(final String blobName, final BytesReference bytes,
-                                    final boolean failIfAlreadyExists) throws IOException {
-            final Random random = RandomizedContext.current().getRandom();
-            if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
-                throw new IOException("Random IOException");
+        public void writeBlob(String blobName,
+                              boolean failIfAlreadyExists,
+                              boolean atomic,
+                              CheckedConsumer<OutputStream, IOException> writer) throws IOException {
+            if (atomic) {
+                beforeAtomicWrite(blobName);
+            } else {
+                beforeWrite(blobName);
             }
-            if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) {
-                if (blockAndFailOnWriteIndexFile) {
-                    blockExecutionAndFail(blobName);
-                } else if (blockOnWriteIndexFile) {
-                    blockExecutionAndMaybeWait(blobName);
-                }
+            super.writeBlob(blobName, failIfAlreadyExists, atomic, writer);
+            if (RandomizedContext.current().getRandom().nextBoolean()) {
+                // for network based repositories, the blob may have been written but we may still
+                // get an error with the client connection, so an IOException here simulates this
+                maybeIOExceptionOrBlock(blobName);
+            }
+        }
+
+        private void beforeWrite(String blobName) throws IOException {
+            maybeIOExceptionOrBlock(blobName);
+            if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
+                && path().equals(basePath()) == false) {
+                blockExecutionAndMaybeWait(blobName);
             }
+        }
+
+        @Override
+        public void writeBlobAtomic(final String blobName, final BytesReference bytes,
+                                    final boolean failIfAlreadyExists) throws IOException {
+            final Random random = beforeAtomicWrite(blobName);
             if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
                 // Simulate a failure between the write and move operation in FsBlobContainer
                 final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
@@ -529,6 +543,21 @@ public void writeBlobAtomic(final String blobName, final BytesReference bytes,
                 super.writeBlobAtomic(blobName, bytes, failIfAlreadyExists);
             }
         }
+
+        private Random beforeAtomicWrite(String blobName) throws IOException {
+            final Random random = RandomizedContext.current().getRandom();
+            if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
+                throw new IOException("Random IOException");
+            }
+            if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) {
+                if (blockAndFailOnWriteIndexFile) {
+                    blockExecutionAndFail(blobName);
+                } else if (blockOnWriteIndexFile) {
+                    blockExecutionAndMaybeWait(blobName);
+                }
+            }
+            return random;
+        }
         }
     }
 }
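
MockRepository's "failure between the write and move" simulation targets the classic filesystem implementation of an atomic write: stream through the writer into a temporary name, then rename into place. Condensed sketch of that pattern (illustrative only, not the FsBlobContainer source; directory and writer are assumed in scope):

    // Write to a temp file first so readers never observe a partial blob.
    final Path tempPath = directory.resolve("pending-" + blobName);
    try (OutputStream out = Files.newOutputStream(tempPath)) {
        writer.accept(out);
    }
    Files.move(tempPath, directory.resolve(blobName), StandardCopyOption.ATOMIC_MOVE);
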
diff --git a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java
index c70e1cf783c91..63ab370d8a820 100644
--- a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java
+++ b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java
@@ -33,6 +33,7 @@
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -48,6 +49,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.NoSuchFileException;
 import java.security.GeneralSecurityException;
@@ -622,6 +624,24 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
             }
         }
 
+        @Override
+        public void writeBlob(
+            String blobName,
+            boolean failIfAlreadyExists,
+            boolean atomic,
+            CheckedConsumer<OutputStream, IOException> writer
+        ) throws IOException {
+            // TODO: this is just a stop-gap solution for until we have an encrypted output stream wrapper
+            try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) {
+                writer.accept(out);
+                if (atomic) {
+                    writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists);
+                } else {
+                    writeBlob(blobName, out.bytes(), failIfAlreadyExists);
+                }
+            }
+        }
+
         private ChainingInputStream encryptedInput(InputStream inputStream, SingleUseKey singleUseNonceAndDEK, BytesReference dekIdBytes)
             throws IOException {
             return ChainingInputStream.chain(
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java
index 537cc0903b558..398ef75189e07 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.PathUtilsForTesting;
 import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService;
@@ -32,6 +33,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.channels.FileChannel;
 import java.nio.file.FileSystem;
 import java.nio.file.OpenOption;
@@ -269,6 +271,16 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
             throw unsupportedException();
         }
 
+        @Override
+        public void writeBlob(
+            String blobName,
+            boolean failIfAlreadyExists,
+            boolean atomic,
+            CheckedConsumer<OutputStream, IOException> writer
+        ) {
+            throw unsupportedException();
+        }
+
         @Override
         public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) {
             throw unsupportedException();
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java
index 2837c2f31540f..44e730c89ad5f 100644
--- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java
@@ -17,12 +17,14 @@
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -42,6 +44,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -453,6 +456,22 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
             writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
         }
 
+        @Override
+        public void writeBlob(
+            String blobName,
+            boolean failIfAlreadyExists,
+            boolean atomic,
+            CheckedConsumer<OutputStream, IOException> writer
+        ) throws IOException {
+            final BytesStreamOutput out = new BytesStreamOutput();
+            writer.accept(out);
+            if (atomic) {
+                writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists);
+            } else {
+                writeBlob(blobName, out.bytes(), failIfAlreadyExists);
+            }
+        }
+
         @Override
         public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
             final StreamInput inputStream;
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java
index 1134e815f11a2..2f883759b7fe0 100644
--- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java
@@ -16,11 +16,13 @@
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.env.Environment;
@@ -40,6 +42,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -326,6 +329,22 @@ public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlrea
             writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
         }
 
+        @Override
+        public void writeBlob(
+            String blobName,
+            boolean failIfAlreadyExists,
+            boolean atomic,
+            CheckedConsumer<OutputStream, IOException> writer
+        ) throws IOException {
+            final BytesStreamOutput out = new BytesStreamOutput();
+            writer.accept(out);
+            if (atomic) {
+                writeBlobAtomic(blobName, out.bytes(), failIfAlreadyExists);
+            } else {
+                writeBlob(blobName, out.bytes(), failIfAlreadyExists);
+            }
+        }
+
         @Override
         public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
             writeBlobAtomic(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);