Skip to content

Add Bulk Delete Api to BlobStore (#40322) #41253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand Down Expand Up @@ -56,6 +57,12 @@

class S3BlobContainer extends AbstractBlobContainer {

/**
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
*/
private static final int MAX_BULK_DELETES = 1000;

private final S3BlobStore blobStore;
private final String keyPath;

Expand Down Expand Up @@ -118,6 +125,51 @@ public void deleteBlob(String blobName) throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
return;
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String blob : blobNames) {
partition.add(buildKey(blob));
if (partition.size() == MAX_BULK_DELETES ) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
}
SocketAccess.doPrivilegedVoid(() -> {
AmazonClientException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
try {
clientReference.client().deleteObjects(deleteRequest);
} catch (AmazonClientException e) {
if (aex == null) {
aex = e;
} else {
aex.addSuppressed(e);
}
}
}
if (aex != null) {
throw aex;
}
});
} catch (final AmazonClientException e) {
throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
}
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
}

@Override
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
// Delete Multiple Objects
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
final RequestHandler bulkDeleteHandler = request -> {
final List<String> deletes = new ArrayList<>();
final List<String> errors = new ArrayList<>();

Expand All @@ -344,7 +344,6 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
if (closingOffset != -1) {
offset = offset + startMarker.length();
final String objectName = requestBody.substring(offset, closingOffset);

boolean found = false;
for (Bucket bucket : buckets.values()) {
if (bucket.objects.containsKey(objectName)) {
Expand All @@ -369,7 +368,9 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
}
}
return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
});
};
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);

// non-authorized requests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,7 @@ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest request) throws Sd

final List<DeleteObjectsResult.DeletedObject> deletions = new ArrayList<>();
for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) {
if (blobs.remove(key.getKey()) == null) {
AmazonS3Exception exception = new AmazonS3Exception("[" + key + "] does not exist.");
exception.setStatusCode(404);
throw exception;
} else {
if (blobs.remove(key.getKey()) != null) {
DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject();
deletion.setKey(key.getKey());
deletions.add(deletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -96,8 +97,9 @@ public interface BlobContainer {
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* Deletes the blob with the given name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
*
* @param blobName
Expand All @@ -107,6 +109,33 @@ public interface BlobContainer {
*/
void deleteBlob(String blobName) throws IOException;

/**
* Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
* when one or multiple of the given blobs don't exist and simply ignore this case.
*
* @param blobNames The names of the blob to delete.
* @throws IOException if a subset of blob exists but could not be deleted.
*/
default void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
IOException ioe = null;
for (String blobName : blobNames) {
try {
deleteBlob(blobName);
} catch (NoSuchFileException e) {
// ignored
} catch (IOException e) {
if (ioe == null) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
if (ioe != null) {
throw ioe;
}
}

/**
* Deletes a blob with giving name, ignoring if the blob does not exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
Expand Down Expand Up @@ -464,22 +463,16 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
for (final IndexId indexId : indicesToCleanUp) {
try {
indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
} catch (DirectoryNotEmptyException dnee) {
// if the directory isn't empty for some reason, it will fail to clean up;
// we'll ignore that and accept that cleanup didn't fully succeed.
// since we are using UUIDs for path names, this won't be an issue for
// snapshotting indices of the same name
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException ioe) {
// a different IOException occurred while trying to delete - will just log the issue for now
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder.", metadata.name(), indexId), ioe);
logger.warn(() ->
new ParameterizedMessage(
"[{}] indices {} are no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
}
}
} catch (IOException | ResourceNotFoundException ex) {
throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
}
Expand Down Expand Up @@ -1016,16 +1009,14 @@ protected void finalize(final List<SnapshotFiles> snapshots,
try {
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
for (final String blobName : blobs.keySet()) {
if (FsBlobContainer.isTempBlobName(blobName)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> blobNames =
blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
snapshotId, shardId, blobNames), e);
throw e;
}

// If we deleted all snapshots, we don't need to create a new index file
Expand All @@ -1034,28 +1025,26 @@ protected void finalize(final List<SnapshotFiles> snapshots,
}

// Delete old index files
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> indexBlobs =
blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
snapshotId, shardId, indexBlobs), e);
throw e;
}

// Delete all blobs that don't exist in a snapshot
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
snapshotId, shardId, blobName), e);
}
}
final List<String> orphanedBlobs = blobs.keySet().stream()
.filter(blobName ->
blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
.collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
snapshotId, shardId, orphanedBlobs), e);
}
} catch (IOException e) {
String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
Expand Down Expand Up @@ -136,6 +137,23 @@ public void testDeleteBlob() throws IOException {
}
}

public void testDeleteBlobs() throws IOException {
try (BlobStore store = newBlobStore()) {
final List<String> blobNames = Arrays.asList("foobar", "barfoo");
final BlobContainer container = store.blobContainer(new BlobPath());
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
final BytesArray bytesArray = new BytesArray(data);
for (String blobName : blobNames) {
writeBlob(container, blobName, bytesArray, randomBoolean());
}
assertEquals(container.listBlobs().size(), 2);
container.deleteBlobsIgnoringIfNotExists(blobNames);
assertTrue(container.listBlobs().isEmpty());
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
}
}

public void testDeleteBlobIgnoringIfNotExists() throws IOException {
try (BlobStore store = newBlobStore()) {
BlobPath blobPath = new BlobPath();
Expand Down