Skip to content

Commit be20fb8

Browse files
Recursive Delete on BlobContainer (#43281) (#43920)
This is a prerequisite of #42189:
* Add a directory delete method to the blob container, specific to each implementation.
* Some notes on the implementations:
  * AWS + GCS: We can exploit the fact that both AWS and GCS return blobs in lexicographic order, which allows us to delete them in the same order in which we receive them from the listing request. For AWS this only required listing without the delimiter setting (so we get a deep listing); for GCS the same behavior is achieved by not using the directory mode on the listing invocation. The nice thing about this is that, even for very large numbers of blobs, the memory requirements are now capped, since we go page by page when deleting.
  * Azure: I extended the parallelization to the listing calls as well and made it work recursively. I verified that this works with a thread count of `1`, since we only block once in the initial thread and then fan out to a "graph" of child listeners that never block.
  * HDFS and FS are trivial, since directory delete methods are available for them.
* Enhance third-party tests to ensure the new functionality works (I manually ran them for all cloud providers).
1 parent 49d69bf commit be20fb8

File tree

14 files changed

+229
-38
lines changed

14 files changed

+229
-38
lines changed

modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public void deleteBlob(String blobName) throws IOException {
9696
throw new UnsupportedOperationException("URL repository is read only");
9797
}
9898

99+
@Override
100+
public void delete() {
101+
throw new UnsupportedOperationException("URL repository is read only");
102+
}
103+
99104
/**
100105
* This operation is not supported by URLBlobContainer
101106
*/

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.microsoft.azure.storage.StorageException;
2424
import org.apache.logging.log4j.LogManager;
2525
import org.apache.logging.log4j.Logger;
26+
import org.elasticsearch.action.ActionListener;
2627
import org.elasticsearch.action.ActionRunnable;
2728
import org.elasticsearch.action.support.GroupedActionListener;
2829
import org.elasticsearch.action.support.PlainActionFuture;
@@ -38,7 +39,6 @@
3839
import java.net.HttpURLConnection;
3940
import java.net.URISyntaxException;
4041
import java.nio.file.NoSuchFileException;
41-
import java.util.Collection;
4242
import java.util.List;
4343
import java.util.Map;
4444
import java.util.concurrent.ExecutorService;
@@ -126,24 +126,35 @@ public void deleteBlob(String blobName) throws IOException {
126126
}
127127
}
128128

129+
@Override
130+
public void delete() throws IOException {
131+
try {
132+
blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
133+
} catch (URISyntaxException | StorageException e) {
134+
throw new IOException(e);
135+
}
136+
}
137+
129138
@Override
130139
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
140+
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
131141
if (blobNames.isEmpty()) {
132-
return;
133-
}
134-
final PlainActionFuture<Collection<Void>> result = PlainActionFuture.newFuture();
135-
final GroupedActionListener<Void> listener = new GroupedActionListener<>(result, blobNames.size());
136-
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
137-
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint.
138-
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
139-
for (String blobName : blobNames) {
140-
executor.submit(new ActionRunnable<Void>(listener) {
141-
@Override
142-
protected void doRun() throws IOException {
143-
deleteBlobIgnoringIfNotExists(blobName);
144-
listener.onResponse(null);
145-
}
146-
});
142+
result.onResponse(null);
143+
} else {
144+
final GroupedActionListener<Void> listener =
145+
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
146+
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
147+
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
148+
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
149+
for (String blobName : blobNames) {
150+
executor.execute(new ActionRunnable<Void>(listener) {
151+
@Override
152+
protected void doRun() throws IOException {
153+
deleteBlobIgnoringIfNotExists(blobName);
154+
listener.onResponse(null);
155+
}
156+
});
157+
}
147158
}
148159
try {
149160
result.actionGet();

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.nio.file.FileAlreadyExistsException;
3737
import java.util.Collections;
3838
import java.util.Map;
39+
import java.util.concurrent.Executor;
3940
import java.util.function.Function;
4041
import java.util.stream.Collectors;
4142

@@ -91,6 +92,10 @@ public void deleteBlob(String blob) throws URISyntaxException, StorageException
9192
service.deleteBlob(clientName, container, blob);
9293
}
9394

95+
public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException {
96+
service.deleteBlobDirectory(clientName, container, path, executor);
97+
}
98+
9499
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
95100
return service.getInputStream(clientName, container, blob);
96101
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.logging.log4j.LogManager;
4141
import org.apache.logging.log4j.Logger;
4242
import org.apache.logging.log4j.message.ParameterizedMessage;
43+
import org.elasticsearch.action.support.PlainActionFuture;
4344
import org.elasticsearch.common.blobstore.BlobMetaData;
4445
import org.elasticsearch.common.blobstore.BlobPath;
4546
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
@@ -49,6 +50,7 @@
4950
import org.elasticsearch.common.settings.SettingsException;
5051
import org.elasticsearch.common.unit.ByteSizeUnit;
5152
import org.elasticsearch.common.unit.ByteSizeValue;
53+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5254

5355
import java.io.IOException;
5456
import java.io.InputStream;
@@ -57,11 +59,15 @@
5759
import java.net.URISyntaxException;
5860
import java.nio.file.FileAlreadyExistsException;
5961
import java.security.InvalidKeyException;
62+
import java.util.ArrayList;
63+
import java.util.Collection;
6064
import java.util.Collections;
6165
import java.util.EnumSet;
6266
import java.util.HashSet;
6367
import java.util.Map;
6468
import java.util.Set;
69+
import java.util.concurrent.Executor;
70+
import java.util.concurrent.atomic.AtomicLong;
6571
import java.util.function.Supplier;
6672

6773
import static java.util.Collections.emptyMap;
@@ -187,6 +193,50 @@ public void deleteBlob(String account, String container, String blob) throws URI
187193
});
188194
}
189195

196+
void deleteBlobDirectory(String account, String container, String path, Executor executor)
197+
throws URISyntaxException, StorageException, IOException {
198+
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
199+
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
200+
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
201+
final AtomicLong outstanding = new AtomicLong(1L);
202+
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
203+
SocketAccess.doPrivilegedVoidException(() -> {
204+
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
205+
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
206+
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
207+
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
208+
outstanding.incrementAndGet();
209+
executor.execute(new AbstractRunnable() {
210+
@Override
211+
protected void doRun() throws Exception {
212+
deleteBlob(account, container, blobPath);
213+
}
214+
215+
@Override
216+
public void onFailure(Exception e) {
217+
exceptions.add(e);
218+
}
219+
220+
@Override
221+
public void onAfter() {
222+
if (outstanding.decrementAndGet() == 0) {
223+
result.onResponse(null);
224+
}
225+
}
226+
});
227+
}
228+
});
229+
if (outstanding.decrementAndGet() == 0) {
230+
result.onResponse(null);
231+
}
232+
result.actionGet();
233+
if (exceptions.isEmpty() == false) {
234+
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
235+
exceptions.forEach(ex::addSuppressed);
236+
throw ex;
237+
}
238+
}
239+
190240
public InputStream getInputStream(String account, String container, String blob)
191241
throws URISyntaxException, StorageException, IOException {
192242
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ public void deleteBlob(String blobName) throws IOException {
8686
blobStore.deleteBlob(buildKey(blobName));
8787
}
8888

89+
@Override
90+
public void delete() throws IOException {
91+
blobStore.deleteDirectory(path().buildAsString());
92+
}
93+
8994
@Override
9095
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
9196
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.repositories.gcs;
2121

22+
import com.google.api.gax.paging.Page;
2223
import com.google.cloud.BatchResult;
2324
import com.google.cloud.ReadChannel;
2425
import com.google.cloud.WriteChannel;
@@ -306,6 +307,23 @@ void deleteBlob(String blobName) throws IOException {
306307
}
307308
}
308309

310+
/**
311+
* Deletes the given path and all its children.
312+
*
313+
* @param pathStr Name of path to delete
314+
*/
315+
void deleteDirectory(String pathStr) throws IOException {
316+
SocketAccess.doPrivilegedVoidIOException(() -> {
317+
Page<Blob> page = client().get(bucketName).list(BlobListOption.prefix(pathStr));
318+
do {
319+
final Collection<String> blobsToDelete = new ArrayList<>();
320+
page.getValues().forEach(b -> blobsToDelete.add(b.getName()));
321+
deleteBlobsIgnoringIfNotExists(blobsToDelete);
322+
page = page.getNextPage();
323+
} while (page != null);
324+
});
325+
}
326+
309327
/**
310328
* Deletes multiple blobs from the specific bucket using a batch request
311329
*

plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public void deleteBlob(String blobName) throws IOException {
7878
}
7979
}
8080

81+
@Override
82+
public void delete() throws IOException {
83+
store.execute(fileContext -> fileContext.delete(path, true));
84+
}
85+
8186
@Override
8287
public InputStream readBlob(String blobName) throws IOException {
8388
// FSDataInputStream does buffering internally

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.ArrayList;
5555
import java.util.Collections;
5656
import java.util.HashMap;
57+
import java.util.HashSet;
5758
import java.util.List;
5859
import java.util.Map;
5960
import java.util.Set;
@@ -130,12 +131,53 @@ public void deleteBlob(String blobName) throws IOException {
130131
deleteBlobIgnoringIfNotExists(blobName);
131132
}
132133

134+
@Override
135+
public void delete() throws IOException {
136+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
137+
ObjectListing prevListing = null;
138+
while (true) {
139+
ObjectListing list;
140+
if (prevListing != null) {
141+
final ObjectListing finalPrevListing = prevListing;
142+
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
143+
} else {
144+
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
145+
listObjectsRequest.setBucketName(blobStore.bucket());
146+
listObjectsRequest.setPrefix(keyPath);
147+
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
148+
}
149+
final List<String> blobsToDelete =
150+
list.getObjectSummaries().stream().map(S3ObjectSummary::getKey).collect(Collectors.toList());
151+
if (list.isTruncated()) {
152+
doDeleteBlobs(blobsToDelete, false);
153+
prevListing = list;
154+
} else {
155+
final List<String> lastBlobsToDelete = new ArrayList<>(blobsToDelete);
156+
lastBlobsToDelete.add(keyPath);
157+
doDeleteBlobs(lastBlobsToDelete, false);
158+
break;
159+
}
160+
}
161+
} catch (final AmazonClientException e) {
162+
throw new IOException("Exception when deleting blob container [" + keyPath + "]", e);
163+
}
164+
}
165+
133166
@Override
134167
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
168+
doDeleteBlobs(blobNames, true);
169+
}
170+
171+
private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOException {
135172
if (blobNames.isEmpty()) {
136173
return;
137174
}
138-
final Set<String> outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
175+
final Set<String> outstanding;
176+
if (relative) {
177+
outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
178+
} else {
179+
outstanding = new HashSet<>(blobNames);
180+
}
139181
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
140182
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
141183
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,11 @@ protected void assertChildren(BlobPath path, Collection<String> children) throws
8888
// to become consistent.
8989
assertBusy(() -> super.assertChildren(path, children), 10L, TimeUnit.MINUTES);
9090
}
91+
92+
@Override
93+
protected void assertDeleted(BlobPath path, String name) throws Exception {
94+
// AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
95+
// to become consistent.
96+
assertBusy(() -> super.assertDeleted(path, name), 10L, TimeUnit.MINUTES);
97+
}
9198
}

server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ public interface BlobContainer {
109109
*/
110110
void deleteBlob(String blobName) throws IOException;
111111

112+
/**
113+
* Deletes this container and all its contents from the repository.
114+
* @throws IOException on failure
115+
*/
116+
void delete() throws IOException;
117+
112118
/**
113119
* Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
114120
* when one or multiple of the given blobs don't exist and simply ignore this case.

server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.common.blobstore;
2121

22+
import org.elasticsearch.common.Nullable;
23+
2224
import java.util.ArrayList;
2325
import java.util.Collections;
2426
import java.util.Iterator;
@@ -68,6 +70,20 @@ public String buildAsString() {
6870
return p + SEPARATOR;
6971
}
7072

73+
/**
74+
* Returns this path's parent path.
75+
*
76+
* @return Parent path or {@code null} if there is none
77+
*/
78+
@Nullable
79+
public BlobPath parent() {
80+
if (paths.isEmpty()) {
81+
return null;
82+
} else {
83+
return new BlobPath(new ArrayList<>(paths.subList(0, paths.size() - 1)));
84+
}
85+
}
86+
7187
@Override
7288
public String toString() {
7389
StringBuilder sb = new StringBuilder();

server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx
122122
}
123123
}
124124

125+
@Override
126+
public void delete() throws IOException {
127+
IOUtils.rm(path);
128+
}
129+
125130
@Override
126131
public boolean blobExists(String blobName) {
127132
return Files.exists(path.resolve(blobName));

server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ public void deleteBlob(String blobName) throws IOException {
6464
delegate.deleteBlob(blobName);
6565
}
6666

67+
@Override
68+
public void delete() throws IOException {
69+
delegate.delete();
70+
}
71+
6772
@Override
6873
public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException {
6974
delegate.deleteBlobIgnoringIfNotExists(blobName);

0 commit comments

Comments
 (0)