diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 331e2dadca2da..833539905103a 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -66,9 +66,6 @@ public InputStream readBlob(String blobName) throws IOException { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { - if (blobExists(blobName)) { - throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); - } blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 5dc03ea45de03..83aafdde2b1ab 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -28,6 +28,7 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.Storage.CopyRequest; +import com.google.cloud.storage.StorageException; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -47,12 +48,15 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; + class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore { // The recommended maximum size of a blob that should be uploaded in a single @@ -204,24 +208,32 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I * @param inputStream the stream containing the blob data */ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException { - final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> storage.writer(blobInfo)); - Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { - @Override - public boolean isOpen() { - return writeChannel.isOpen(); - } + try { + final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException( + () -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist())); + Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { + @Override + public boolean isOpen() { + return writeChannel.isOpen(); + } - @Override - public void close() throws IOException { - SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - } + @Override + public void close() throws IOException { + SocketAccess.doPrivilegedVoidIOException(writeChannel::close); + } - @SuppressForbidden(reason = "Channel is based of a socket not a file") - @Override - public int write(ByteBuffer src) throws IOException { - return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src)); + @SuppressForbidden(reason = "Channel is based of a socket not a file") + @Override + public int write(ByteBuffer src) throws IOException { + return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src)); + } + })); + } catch (StorageException se) { + if (se.getCode() == HTTP_PRECON_FAILED) { + throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); } - })); + throw se; + } } /** @@ -238,7 +250,17 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method"; final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize)); Streams.copy(inputStream, baos); - SocketAccess.doPrivilegedVoidIOException(() -> storage.create(blobInfo, baos.toByteArray())); + SocketAccess.doPrivilegedVoidIOException( + () -> { + try { + storage.create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist()); + } catch (StorageException se) { + if (se.getCode() == HTTP_PRECON_FAILED) { + throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); + } + throw se; + } + }); } /** @@ -295,8 +317,8 @@ void deleteBlobs(Collection blobNames) throws IOException { /** * Moves a blob within the same bucket * - * @param sourceBlob name of the blob to move - * @param targetBlob new name of the blob in the same bucket + * @param sourceBlobName name of the blob to move + * @param targetBlobName new name of the blob in the same bucket */ void moveBlob(String sourceBlobName, String targetBlobName) throws IOException { final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName); diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java index 2b52b7a32a9cc..1b31b3018e48a 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java @@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * {@link MockStorage} mocks a {@link Storage} client by storing all the blobs @@ -113,7 +114,14 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option if (bucketName.equals(blobInfo.getBucket()) == false) { throw new StorageException(404, "Bucket not found"); } - blobs.put(blobInfo.getName(), content); + if (Stream.of(options).anyMatch(option -> option.equals(BlobTargetOption.doesNotExist()))) { + byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), content); + if (existingBytes != null) { + throw new StorageException(412, "Blob already exists"); + } + } else { + blobs.put(blobInfo.getName(), content); + } return get(BlobId.of(blobInfo.getBucket(), blobInfo.getName())); } @@ -243,9 +251,16 @@ public boolean isOpen() { } @Override - public void close() throws IOException { + public void close() { IOUtils.closeWhileHandlingException(writableByteChannel); - blobs.put(blobInfo.getName(), output.toByteArray()); + if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) { + byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray()); + if (existingBytes != null) { + throw new StorageException(412, "Blob already exists"); + } + } else { + blobs.put(blobInfo.getName(), output.toByteArray()); + } } }; }