Skip to content

Commit 2bb4f38

Browse files
authored
Add write*Blob option to replace existing blob (#31729)
Adds a new parameter to the BlobContainer#write*Blob methods to specify whether an existing blob should be overwritten or not. For some metadata files in the repository, we actually want to replace the current file. This is currently implemented through an explicit blob delete followed by a fresh write. When using a cloud provider (S3, GCS, Azure), this results in 2 API requests instead of just 1. This change therefore allows us to achieve the same functionality using fewer API requests.
1 parent 631a53a commit 2bb4f38

File tree

21 files changed

+131
-88
lines changed

21 files changed

+131
-88
lines changed

modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public InputStream readBlob(String name) throws IOException {
108108
}
109109

110110
@Override
111-
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
111+
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
112112
throw new UnsupportedOperationException("URL repository doesn't support this operation");
113113
}
114114

plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,20 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Contai
122122
handlers.insert(path, (request) -> {
123123
final String destContainerName = request.getParam("container");
124124
final String destBlobName = objectName(request.getParameters());
125+
final String ifNoneMatch = request.getHeader("If-None-Match");
125126

126127
final Container destContainer = containers.get(destContainerName);
127128
if (destContainer == null) {
128129
return newContainerNotFoundError(request.getId());
129130
}
130131

131-
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody());
132-
if (existingBytes != null) {
133-
return newBlobAlreadyExistsError(request.getId());
132+
if ("*".equals(ifNoneMatch)) {
133+
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody());
134+
if (existingBytes != null) {
135+
return newBlobAlreadyExistsError(request.getId());
136+
}
137+
} else {
138+
destContainer.objects.put(destBlobName, request.getBody());
134139
}
135140

136141
return new Response(RestStatus.CREATED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); })

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,11 @@ public InputStream readBlob(String blobName) throws IOException {
8686
}
8787

8888
@Override
89-
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
89+
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
9090
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
9191

9292
try {
93-
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
93+
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
9494
} catch (URISyntaxException|StorageException e) {
9595
throw new IOException("Can not write blob " + blobName, e);
9696
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
117117
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
118118
}
119119

120-
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException,
121-
FileAlreadyExistsException {
122-
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize);
120+
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
121+
throws URISyntaxException, StorageException, FileAlreadyExistsException {
122+
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
123123
}
124124
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -236,17 +236,20 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
236236
return blobsBuilder.immutableMap();
237237
}
238238

239-
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
239+
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
240+
boolean failIfAlreadyExists)
240241
throws URISyntaxException, StorageException, FileAlreadyExistsException {
241242
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
242243
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
243244
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
244245
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
245246
try {
247+
final AccessCondition accessCondition =
248+
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
246249
SocketAccess.doPrivilegedVoidException(() ->
247-
blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, client.v2().get()));
250+
blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get()));
248251
} catch (final StorageException se) {
249-
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
252+
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
250253
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
251254
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
252255
}

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,10 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
108108
}
109109

110110
@Override
111-
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize)
111+
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
112+
boolean failIfAlreadyExists)
112113
throws URISyntaxException, StorageException, FileAlreadyExistsException {
113-
if (blobs.containsKey(blobName)) {
114+
if (failIfAlreadyExists && blobs.containsKey(blobName)) {
114115
throw new FileAlreadyExistsException(blobName);
115116
}
116117
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {

plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,6 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket
158158
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
159159
handlers.insert("POST /upload/storage/v1/b/{bucket}/o", (request) -> {
160160
final String ifGenerationMatch = request.getParam("ifGenerationMatch");
161-
if ("0".equals(ifGenerationMatch) == false) {
162-
return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
163-
}
164-
165161
final String uploadType = request.getParam("uploadType");
166162
if ("resumable".equals(uploadType)) {
167163
final String objectName = request.getParam("name");
@@ -172,12 +168,19 @@ private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket
172168
if (bucket == null) {
173169
return newError(RestStatus.NOT_FOUND, "bucket not found");
174170
}
175-
if (bucket.objects.putIfAbsent(objectName, EMPTY_BYTE) == null) {
176-
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
171+
if ("0".equals(ifGenerationMatch)) {
172+
if (bucket.objects.putIfAbsent(objectName, EMPTY_BYTE) == null) {
173+
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
177174
+ objectName;
178-
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
175+
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
176+
} else {
177+
return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
178+
}
179179
} else {
180-
return newError(RestStatus.CONFLICT, "object already exist");
180+
bucket.objects.put(objectName, EMPTY_BYTE);
181+
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
182+
+ objectName;
183+
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
181184
}
182185
} else if ("multipart".equals(uploadType)) {
183186
/*

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public InputStream readBlob(String blobName) throws IOException {
6464
}
6565

6666
@Override
67-
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
68-
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
67+
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
68+
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
6969
}
7070

7171
@Override

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

+21-14
Original file line numberDiff line numberDiff line change
@@ -193,31 +193,34 @@ public void close() throws IOException {
193193

194194
/**
195195
* Writes a blob in the specific bucket
196-
*
197-
* @param inputStream content of the blob to be written
196+
* @param inputStream content of the blob to be written
198197
* @param blobSize expected size of the blob to be written
198+
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
199199
*/
200-
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
200+
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
201201
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
202202
if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
203-
writeBlobResumable(blobInfo, inputStream);
203+
writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists);
204204
} else {
205-
writeBlobMultipart(blobInfo, inputStream, blobSize);
205+
writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
206206
}
207207
}
208208

209209
/**
210210
* Uploads a blob using the "resumable upload" method (multiple requests, which
211211
* can be independently retried in case of failure, see
212212
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
213-
*
214213
* @param blobInfo the info for the blob to be uploaded
215214
* @param inputStream the stream containing the blob data
215+
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
216216
*/
217-
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
217+
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
218218
try {
219+
final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
220+
new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
221+
new Storage.BlobWriteOption[0];
219222
final WriteChannel writeChannel = SocketAccess
220-
.doPrivilegedIOException(() -> client().writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
223+
.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
221224
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
222225
@Override
223226
public boolean isOpen() {
@@ -236,7 +239,7 @@ public int write(ByteBuffer src) throws IOException {
236239
}
237240
}));
238241
} catch (final StorageException se) {
239-
if (se.getCode() == HTTP_PRECON_FAILED) {
242+
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
240243
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
241244
}
242245
throw se;
@@ -248,20 +251,24 @@ public int write(ByteBuffer src) throws IOException {
248251
* 'multipart/related' request containing both data and metadata. The request is
249252
* gzipped), see:
250253
* https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload
251-
*
252-
* @param blobInfo the info for the blob to be uploaded
254+
* @param blobInfo the info for the blob to be uploaded
253255
* @param inputStream the stream containing the blob data
254256
* @param blobSize the size
257+
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
255258
*/
256-
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize) throws IOException {
259+
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
260+
throws IOException {
257261
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
258262
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
259263
Streams.copy(inputStream, baos);
260264
try {
265+
final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
266+
new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
267+
new Storage.BlobTargetOption[0];
261268
SocketAccess.doPrivilegedVoidIOException(
262-
() -> client().create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist()));
269+
() -> client().create(blobInfo, baos.toByteArray(), targetOptions));
263270
} catch (final StorageException se) {
264-
if (se.getCode() == HTTP_PRECON_FAILED) {
271+
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
265272
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
266273
}
267274
throw se;

plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,12 @@ public InputStream readBlob(String blobName) throws IOException {
9191
}
9292

9393
@Override
94-
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
94+
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
9595
store.execute((Operation<Void>) fileContext -> {
9696
Path blob = new Path(path, blobName);
9797
// CREATE on its own fails if the blob already exists; OVERWRITE is added when failIfAlreadyExists is false so the blob is replaced instead.
98-
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
98+
EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) :
99+
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
99100
CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)};
100101
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
101102
int bytesRead;

plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void testReadOnly() throws Exception {
135135
assertTrue(util.exists(hdfsPath));
136136

137137
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
138-
writeBlob(container, "foo", new BytesArray(data));
138+
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
139139
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
140140
assertTrue(container.blobExists("foo"));
141141
}

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,11 @@ public InputStream readBlob(String blobName) throws IOException {
9090
}
9191
}
9292

93+
/**
94+
* This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
95+
*/
9396
@Override
94-
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
97+
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
9598
SocketAccess.doPrivilegedIOException(() -> {
9699
if (blobSize <= blobStore.bufferSizeInBytes()) {
97100
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,18 @@ public interface BlobContainer {
6969
* @param blobSize
7070
* The size of the blob to be written, in bytes. It is implementation dependent whether
7171
* this value is used in writing the blob to the repository.
72-
* @throws FileAlreadyExistsException if a blob by the same name already exists
72+
* @param failIfAlreadyExists
73+
* whether to throw a FileAlreadyExistsException if the given blob already exists
74+
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
7375
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
7476
*/
75-
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;
77+
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
7678

7779
/**
7880
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
7981
* using an atomic write operation if the implementation supports it. When the BlobContainer implementation
8082
* does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then
81-
* the {@link #writeBlob(String, InputStream, long)} method is used.
83+
* the {@link #writeBlob(String, InputStream, long, boolean)} method is used.
8284
*
8385
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
8486
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
@@ -90,11 +92,14 @@ public interface BlobContainer {
9092
* @param blobSize
9193
* The size of the blob to be written, in bytes. It is implementation dependent whether
9294
* this value is used in writing the blob to the repository.
93-
* @throws FileAlreadyExistsException if a blob by the same name already exists
95+
* @param failIfAlreadyExists
96+
* whether to throw a FileAlreadyExistsException if the given blob already exists
97+
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
9498
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
9599
*/
96-
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
97-
writeBlob(blobName, inputStream, blobSize);
100+
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists)
101+
throws IOException {
102+
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
98103
}
99104

100105
/**

0 commit comments

Comments
 (0)