Skip to content

Commit 75312a7

Browse files
committed
User proper write-once semantics for GCS repository (#30438)
There's no need for an extra blobExists() call when writing a blob to the GCS service. GCS provides an option (with stronger consistency guarantees) on the insert method that guarantees that the blob that's uploaded does not already exist. Relates to #19749
1 parent 1d329d8 commit 75312a7

File tree

3 files changed

+58
-24
lines changed

3 files changed

+58
-24
lines changed

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

-3
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ public InputStream readBlob(String blobName) throws IOException {
6666

6767
@Override
6868
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
69-
if (blobExists(blobName)) {
70-
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
71-
}
7269
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
7370
}
7471

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

+40-18
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.cloud.storage.Storage;
2929
import com.google.cloud.storage.Storage.BlobListOption;
3030
import com.google.cloud.storage.Storage.CopyRequest;
31+
import com.google.cloud.storage.StorageException;
3132
import org.elasticsearch.common.SuppressForbidden;
3233
import org.elasticsearch.common.blobstore.BlobContainer;
3334
import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -47,12 +48,15 @@
4748
import java.nio.channels.Channels;
4849
import java.nio.channels.ReadableByteChannel;
4950
import java.nio.channels.WritableByteChannel;
51+
import java.nio.file.FileAlreadyExistsException;
5052
import java.nio.file.NoSuchFileException;
5153
import java.util.Collection;
5254
import java.util.List;
5355
import java.util.Map;
5456
import java.util.stream.Collectors;
5557

58+
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
59+
5660
class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore {
5761

5862
// The recommended maximum size of a blob that should be uploaded in a single
@@ -204,24 +208,32 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I
204208
* @param inputStream the stream containing the blob data
205209
*/
206210
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
207-
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> storage.writer(blobInfo));
208-
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
209-
@Override
210-
public boolean isOpen() {
211-
return writeChannel.isOpen();
212-
}
211+
try {
212+
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
213+
() -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
214+
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
215+
@Override
216+
public boolean isOpen() {
217+
return writeChannel.isOpen();
218+
}
213219

214-
@Override
215-
public void close() throws IOException {
216-
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
217-
}
220+
@Override
221+
public void close() throws IOException {
222+
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
223+
}
218224

219-
@SuppressForbidden(reason = "Channel is based of a socket not a file")
220-
@Override
221-
public int write(ByteBuffer src) throws IOException {
222-
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
225+
@SuppressForbidden(reason = "Channel is based of a socket not a file")
226+
@Override
227+
public int write(ByteBuffer src) throws IOException {
228+
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
229+
}
230+
}));
231+
} catch (StorageException se) {
232+
if (se.getCode() == HTTP_PRECON_FAILED) {
233+
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
223234
}
224-
}));
235+
throw se;
236+
}
225237
}
226238

227239
/**
@@ -238,7 +250,17 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long
238250
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
239251
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
240252
Streams.copy(inputStream, baos);
241-
SocketAccess.doPrivilegedVoidIOException(() -> storage.create(blobInfo, baos.toByteArray()));
253+
SocketAccess.doPrivilegedVoidIOException(
254+
() -> {
255+
try {
256+
storage.create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist());
257+
} catch (StorageException se) {
258+
if (se.getCode() == HTTP_PRECON_FAILED) {
259+
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
260+
}
261+
throw se;
262+
}
263+
});
242264
}
243265

244266
/**
@@ -295,8 +317,8 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
295317
/**
296318
* Moves a blob within the same bucket
297319
*
298-
* @param sourceBlob name of the blob to move
299-
* @param targetBlob new name of the blob in the same bucket
320+
* @param sourceBlobName name of the blob to move
321+
* @param targetBlobName new name of the blob in the same bucket
300322
*/
301323
void moveBlob(String sourceBlobName, String targetBlobName) throws IOException {
302324
final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName);

plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.concurrent.ConcurrentMap;
5757
import java.util.concurrent.TimeUnit;
5858
import java.util.stream.Collectors;
59+
import java.util.stream.Stream;
5960

6061
/**
6162
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
@@ -113,7 +114,14 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option
113114
if (bucketName.equals(blobInfo.getBucket()) == false) {
114115
throw new StorageException(404, "Bucket not found");
115116
}
116-
blobs.put(blobInfo.getName(), content);
117+
if (Stream.of(options).anyMatch(option -> option.equals(BlobTargetOption.doesNotExist()))) {
118+
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), content);
119+
if (existingBytes != null) {
120+
throw new StorageException(412, "Blob already exists");
121+
}
122+
} else {
123+
blobs.put(blobInfo.getName(), content);
124+
}
117125
return get(BlobId.of(blobInfo.getBucket(), blobInfo.getName()));
118126
}
119127

@@ -243,9 +251,16 @@ public boolean isOpen() {
243251
}
244252

245253
@Override
246-
public void close() throws IOException {
254+
public void close() {
247255
IOUtils.closeWhileHandlingException(writableByteChannel);
248-
blobs.put(blobInfo.getName(), output.toByteArray());
256+
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
257+
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
258+
if (existingBytes != null) {
259+
throw new StorageException(412, "Blob already exists");
260+
}
261+
} else {
262+
blobs.put(blobInfo.getName(), output.toByteArray());
263+
}
249264
}
250265
};
251266
}

0 commit comments

Comments
 (0)