Skip to content

Commit bcfeb02

Browse files
original-brownbear authored and Gurkan Kaymak committed
Add Bulk Delete Api to BlobStore (elastic#40322)
* Adds Bulk delete API to blob container
* Implement bulk delete API for S3
* Adjust S3Fixture to accept both path styles for bulk deletes since the S3 SDK uses both during our ITs
* Closes elastic#40250
1 parent b1f8f84 commit bcfeb02

File tree

6 files changed

+136
-51
lines changed

6 files changed

+136
-51
lines changed

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

+52
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
2424
import com.amazonaws.services.s3.model.AmazonS3Exception;
2525
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
26+
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
2627
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
2728
import com.amazonaws.services.s3.model.ObjectListing;
2829
import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -56,6 +57,12 @@
5657

5758
class S3BlobContainer extends AbstractBlobContainer {
5859

60+
/**
61+
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
62+
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
63+
*/
64+
private static final int MAX_BULK_DELETES = 1000;
65+
5966
private final S3BlobStore blobStore;
6067
private final String keyPath;
6168

@@ -118,6 +125,51 @@ public void deleteBlob(String blobName) throws IOException {
118125
deleteBlobIgnoringIfNotExists(blobName);
119126
}
120127

128+
@Override
129+
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
130+
if (blobNames.isEmpty()) {
131+
return;
132+
}
133+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
134+
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
135+
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
136+
final List<String> partition = new ArrayList<>();
137+
for (String blob : blobNames) {
138+
partition.add(buildKey(blob));
139+
if (partition.size() == MAX_BULK_DELETES ) {
140+
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
141+
partition.clear();
142+
}
143+
}
144+
if (partition.isEmpty() == false) {
145+
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
146+
}
147+
SocketAccess.doPrivilegedVoid(() -> {
148+
AmazonClientException aex = null;
149+
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
150+
try {
151+
clientReference.client().deleteObjects(deleteRequest);
152+
} catch (AmazonClientException e) {
153+
if (aex == null) {
154+
aex = e;
155+
} else {
156+
aex.addSuppressed(e);
157+
}
158+
}
159+
}
160+
if (aex != null) {
161+
throw aex;
162+
}
163+
});
164+
} catch (final AmazonClientException e) {
165+
throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
166+
}
167+
}
168+
169+
private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
170+
return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
171+
}
172+
121173
@Override
122174
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
123175
try (AmazonS3Reference clientReference = blobStore.clientReference()) {

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
324324
// Delete Multiple Objects
325325
//
326326
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
327-
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
327+
final RequestHandler bulkDeleteHandler = request -> {
328328
final List<String> deletes = new ArrayList<>();
329329
final List<String> errors = new ArrayList<>();
330330

@@ -344,7 +344,6 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
344344
if (closingOffset != -1) {
345345
offset = offset + startMarker.length();
346346
final String objectName = requestBody.substring(offset, closingOffset);
347-
348347
boolean found = false;
349348
for (Bucket bucket : buckets.values()) {
350349
if (bucket.objects.containsKey(objectName)) {
@@ -369,7 +368,9 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
369368
}
370369
}
371370
return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
372-
});
371+
};
372+
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
373+
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);
373374

374375
// non-authorized requests
375376

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,7 @@ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest request) throws Sd
158158

159159
final List<DeleteObjectsResult.DeletedObject> deletions = new ArrayList<>();
160160
for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) {
161-
if (blobs.remove(key.getKey()) == null) {
162-
AmazonS3Exception exception = new AmazonS3Exception("[" + key + "] does not exist.");
163-
exception.setStatusCode(404);
164-
throw exception;
165-
} else {
161+
if (blobs.remove(key.getKey()) != null) {
166162
DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject();
167163
deletion.setKey(key.getKey());
168164
deletions.add(deletion);

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.InputStream;
2424
import java.nio.file.FileAlreadyExistsException;
2525
import java.nio.file.NoSuchFileException;
26+
import java.util.List;
2627
import java.util.Map;
2728

2829
/**
@@ -96,8 +97,9 @@ public interface BlobContainer {
9697
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
9798
*/
9899
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
100+
99101
/**
100-
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
102+
* Deletes the blob with the given name, if the blob exists. If the blob does not exist,
101103
* this method throws a NoSuchFileException.
102104
*
103105
* @param blobName
@@ -107,6 +109,33 @@ public interface BlobContainer {
107109
*/
108110
void deleteBlob(String blobName) throws IOException;
109111

112+
/**
113+
* Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
114+
* when one or more of the given blobs don't exist; it simply ignores that case.
115+
*
116+
* @param blobNames The names of the blobs to delete.
117+
* @throws IOException if a subset of the blobs exists but could not be deleted.
118+
*/
119+
default void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
120+
IOException ioe = null;
121+
for (String blobName : blobNames) {
122+
try {
123+
deleteBlob(blobName);
124+
} catch (NoSuchFileException e) {
125+
// ignored
126+
} catch (IOException e) {
127+
if (ioe == null) {
128+
ioe = e;
129+
} else {
130+
ioe.addSuppressed(e);
131+
}
132+
}
133+
}
134+
if (ioe != null) {
135+
throw ioe;
136+
}
137+
}
138+
110139
/**
111140
* Deletes the blob with the given name, ignoring if the blob does not exist.
112141
*

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+31-42
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import java.io.FilterInputStream;
102102
import java.io.IOException;
103103
import java.io.InputStream;
104-
import java.nio.file.DirectoryNotEmptyException;
105104
import java.nio.file.FileAlreadyExistsException;
106105
import java.nio.file.NoSuchFileException;
107106
import java.util.ArrayList;
@@ -466,22 +465,16 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
466465
final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
467466
indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
468467
final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
469-
for (final IndexId indexId : indicesToCleanUp) {
470468
try {
471-
indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
472-
} catch (DirectoryNotEmptyException dnee) {
473-
// if the directory isn't empty for some reason, it will fail to clean up;
474-
// we'll ignore that and accept that cleanup didn't fully succeed.
475-
// since we are using UUIDs for path names, this won't be an issue for
476-
// snapshotting indices of the same name
477-
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
478-
"but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
469+
indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
470+
indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
479471
} catch (IOException ioe) {
480472
// a different IOException occurred while trying to delete - will just log the issue for now
481-
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
482-
"but failed to clean up its index folder.", metadata.name(), indexId), ioe);
473+
logger.warn(() ->
474+
new ParameterizedMessage(
475+
"[{}] indices {} are no longer part of any snapshots in the repository, " +
476+
"but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
483477
}
484-
}
485478
} catch (IOException | ResourceNotFoundException ex) {
486479
throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
487480
}
@@ -1018,16 +1011,14 @@ protected void finalize(final List<SnapshotFiles> snapshots,
10181011
try {
10191012
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
10201013
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
1021-
for (final String blobName : blobs.keySet()) {
1022-
if (FsBlobContainer.isTempBlobName(blobName)) {
1023-
try {
1024-
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
1025-
} catch (IOException e) {
1026-
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
1027-
snapshotId, shardId, blobName), e);
1028-
throw e;
1029-
}
1030-
}
1014+
final List<String> blobNames =
1015+
blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
1016+
try {
1017+
blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
1018+
} catch (IOException e) {
1019+
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
1020+
snapshotId, shardId, blobNames), e);
1021+
throw e;
10311022
}
10321023

10331024
// If we deleted all snapshots, we don't need to create a new index file
@@ -1036,28 +1027,26 @@ protected void finalize(final List<SnapshotFiles> snapshots,
10361027
}
10371028

10381029
// Delete old index files
1039-
for (final String blobName : blobs.keySet()) {
1040-
if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
1041-
try {
1042-
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
1043-
} catch (IOException e) {
1044-
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
1045-
snapshotId, shardId, blobName), e);
1046-
throw e;
1047-
}
1048-
}
1030+
final List<String> indexBlobs =
1031+
blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
1032+
try {
1033+
blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
1034+
} catch (IOException e) {
1035+
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
1036+
snapshotId, shardId, indexBlobs), e);
1037+
throw e;
10491038
}
10501039

10511040
// Delete all blobs that don't exist in a snapshot
1052-
for (final String blobName : blobs.keySet()) {
1053-
if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
1054-
try {
1055-
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
1056-
} catch (IOException e) {
1057-
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
1058-
snapshotId, shardId, blobName), e);
1059-
}
1060-
}
1041+
final List<String> orphanedBlobs = blobs.keySet().stream()
1042+
.filter(blobName ->
1043+
blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
1044+
.collect(Collectors.toList());
1045+
try {
1046+
blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
1047+
} catch (IOException e) {
1048+
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
1049+
snapshotId, shardId, orphanedBlobs), e);
10611050
}
10621051
} catch (IOException e) {
10631052
String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";

test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java

+18
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.nio.file.NoSuchFileException;
3434
import java.util.Arrays;
3535
import java.util.HashMap;
36+
import java.util.List;
3637
import java.util.Map;
3738

3839
import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
@@ -136,6 +137,23 @@ public void testDeleteBlob() throws IOException {
136137
}
137138
}
138139

140+
public void testDeleteBlobs() throws IOException {
141+
try (BlobStore store = newBlobStore()) {
142+
final List<String> blobNames = Arrays.asList("foobar", "barfoo");
143+
final BlobContainer container = store.blobContainer(new BlobPath());
144+
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
145+
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
146+
final BytesArray bytesArray = new BytesArray(data);
147+
for (String blobName : blobNames) {
148+
writeBlob(container, blobName, bytesArray, randomBoolean());
149+
}
150+
assertEquals(container.listBlobs().size(), 2);
151+
container.deleteBlobsIgnoringIfNotExists(blobNames);
152+
assertTrue(container.listBlobs().isEmpty());
153+
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
154+
}
155+
}
156+
139157
public void testDeleteBlobIgnoringIfNotExists() throws IOException {
140158
try (BlobStore store = newBlobStore()) {
141159
BlobPath blobPath = new BlobPath();

0 commit comments

Comments
 (0)