Skip to content

Commit 7cc4b9a

Browse files
Implement Bulk Deletes for GCS Repository (#41368) (#41681)
* Implement Bulk Deletes for GCS Repository (#41368)

* Just like #40322 for AWS
* We already had a bulk delete API but weren't using it from the blob container implementation; now we are using it
* Made the bulk delete API also compliant with our interface that only suppresses errors about non-existent blobs, by stat-ing failed deletes (I didn't use any bulk stat action here since having to stat here should be the exception anyway, and it would make error handling a lot more complex)
* Fixed bulk delete API to limit its batch size to 100, in line with GCS recommendations

Back-port of #41368
1 parent 53702ef commit 7cc4b9a

File tree

3 files changed

+69
-20
lines changed

3 files changed

+69
-20
lines changed

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626

2727
import java.io.IOException;
2828
import java.io.InputStream;
29+
import java.util.List;
2930
import java.util.Map;
31+
import java.util.stream.Collectors;
3032

3133
class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
3234

@@ -78,7 +80,12 @@ public void deleteBlob(String blobName) throws IOException {
7880
blobStore.deleteBlob(buildKey(blobName));
7981
}
8082

81-
protected String buildKey(String blobName) {
83+
@Override
84+
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
85+
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
86+
}
87+
88+
private String buildKey(String blobName) {
8289
assert blobName != null;
8390
return path + blobName;
8491
}

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.repositories.gcs;
2121

22+
import com.google.cloud.BatchResult;
2223
import com.google.cloud.ReadChannel;
2324
import com.google.cloud.WriteChannel;
2425
import com.google.cloud.storage.Blob;
@@ -27,10 +28,9 @@
2728
import com.google.cloud.storage.Bucket;
2829
import com.google.cloud.storage.Storage;
2930
import com.google.cloud.storage.Storage.BlobListOption;
31+
import com.google.cloud.storage.StorageBatch;
3032
import com.google.cloud.storage.StorageException;
3133

32-
import org.apache.logging.log4j.LogManager;
33-
import org.apache.logging.log4j.Logger;
3434
import org.elasticsearch.common.SuppressForbidden;
3535
import org.elasticsearch.common.blobstore.BlobContainer;
3636
import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -50,17 +50,18 @@
5050
import java.nio.channels.WritableByteChannel;
5151
import java.nio.file.FileAlreadyExistsException;
5252
import java.nio.file.NoSuchFileException;
53+
import java.util.ArrayList;
5354
import java.util.Collection;
55+
import java.util.Collections;
5456
import java.util.List;
5557
import java.util.Map;
58+
import java.util.concurrent.atomic.AtomicReference;
5659
import java.util.stream.Collectors;
5760

5861
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
5962
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
6063

6164
class GoogleCloudStorageBlobStore implements BlobStore {
62-
63-
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);
6465

6566
// The recommended maximum size of a blob that should be uploaded in a single
6667
// request. Larger files should be uploaded over multiple requests (this is
@@ -105,7 +106,7 @@ public void close() {
105106
* @param bucketName name of the bucket
106107
* @return true iff the bucket exists
107108
*/
108-
boolean doesBucketExist(String bucketName) {
109+
private boolean doesBucketExist(String bucketName) {
109110
try {
110111
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName));
111112
return bucket != null;
@@ -295,16 +296,16 @@ void deleteBlob(String blobName) throws IOException {
295296
*
296297
* @param prefix prefix of the blobs to delete
297298
*/
298-
void deleteBlobsByPrefix(String prefix) throws IOException {
299-
deleteBlobs(listBlobsByPrefix("", prefix).keySet());
299+
private void deleteBlobsByPrefix(String prefix) throws IOException {
300+
deleteBlobsIgnoringIfNotExists(listBlobsByPrefix("", prefix).keySet());
300301
}
301302

302303
/**
303304
* Deletes multiple blobs from the specific bucket using a batch request
304305
*
305306
* @param blobNames names of the blobs to delete
306307
*/
307-
void deleteBlobs(Collection<String> blobNames) throws IOException {
308+
void deleteBlobsIgnoringIfNotExists(Collection<String> blobNames) throws IOException {
308309
if (blobNames.isEmpty()) {
309310
return;
310311
}
@@ -314,17 +315,33 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
314315
return;
315316
}
316317
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
317-
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete));
318-
assert blobIdsToDelete.size() == deletedStatuses.size();
319-
boolean failed = false;
320-
for (int i = 0; i < blobIdsToDelete.size(); i++) {
321-
if (deletedStatuses.get(i) == false) {
322-
logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName);
323-
failed = true;
318+
final List<BlobId> failedBlobs = Collections.synchronizedList(new ArrayList<>());
319+
final StorageException e = SocketAccess.doPrivilegedIOException(() -> {
320+
final AtomicReference<StorageException> ioe = new AtomicReference<>();
321+
final StorageBatch batch = client().batch();
322+
for (BlobId blob : blobIdsToDelete) {
323+
batch.delete(blob).notify(
324+
new BatchResult.Callback<Boolean, StorageException>() {
325+
@Override
326+
public void success(Boolean result) {
327+
}
328+
329+
@Override
330+
public void error(StorageException exception) {
331+
if (exception.getCode() != HTTP_NOT_FOUND) {
332+
failedBlobs.add(blob);
333+
if (ioe.compareAndSet(null, exception) == false) {
334+
ioe.get().addSuppressed(exception);
335+
}
336+
}
337+
}
338+
});
324339
}
325-
}
326-
if (failed) {
327-
throw new IOException("Failed to delete all [" + blobIdsToDelete.size() + "] blobs");
340+
batch.submit();
341+
return ioe.get();
342+
});
343+
if (e != null) {
344+
throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e);
328345
}
329346
}
330347

plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.repositories.gcs;
2121

2222
import com.google.api.gax.paging.Page;
23+
import com.google.cloud.BatchResult;
2324
import com.google.cloud.Policy;
2425
import com.google.cloud.ReadChannel;
2526
import com.google.cloud.RestorableState;
@@ -34,11 +35,13 @@
3435
import com.google.cloud.storage.ServiceAccount;
3536
import com.google.cloud.storage.Storage;
3637
import com.google.cloud.storage.StorageBatch;
38+
import com.google.cloud.storage.StorageBatchResult;
3739
import com.google.cloud.storage.StorageException;
3840
import com.google.cloud.storage.StorageOptions;
3941
import com.google.cloud.storage.StorageRpcOptionUtils;
4042
import com.google.cloud.storage.StorageTestUtils;
4143
import org.elasticsearch.core.internal.io.IOUtils;
44+
import org.mockito.stubbing.Answer;
4245

4346
import java.io.ByteArrayInputStream;
4447
import java.io.ByteArrayOutputStream;
@@ -57,6 +60,11 @@
5760
import java.util.stream.Collectors;
5861
import java.util.stream.Stream;
5962

63+
import static org.mockito.Matchers.any;
64+
import static org.mockito.Matchers.anyVararg;
65+
import static org.mockito.Mockito.doAnswer;
66+
import static org.mockito.Mockito.mock;
67+
6068
/**
6169
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
6270
* in a given concurrent map.
@@ -356,8 +364,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
356364
}
357365

358366
@Override
367+
@SuppressWarnings("unchecked")
359368
public StorageBatch batch() {
360-
return null;
369+
final Answer<?> throwOnMissingMock = invocationOnMock -> {
370+
throw new AssertionError("Did not expect call to method [" + invocationOnMock.getMethod().getName() + ']');
371+
};
372+
final StorageBatch batch = mock(StorageBatch.class, throwOnMissingMock);
373+
StorageBatchResult<Boolean> result = mock(StorageBatchResult.class, throwOnMissingMock);
374+
doAnswer(answer -> {
375+
BatchResult.Callback<Boolean, Exception> callback = (BatchResult.Callback<Boolean, Exception>) answer.getArguments()[0];
376+
callback.success(true);
377+
return null;
378+
}).when(result).notify(any(BatchResult.Callback.class));
379+
doAnswer(invocation -> {
380+
final BlobId blobId = (BlobId) invocation.getArguments()[0];
381+
delete(blobId);
382+
return result;
383+
}).when(batch).delete(any(BlobId.class), anyVararg());
384+
doAnswer(invocation -> null).when(batch).submit();
385+
return batch;
361386
}
362387

363388
@Override

0 commit comments

Comments (0)