Skip to content

Commit 525f9c0

Browse files
Optimize Azure Directory Delete (#43341)
* Follow-up to #43281.
* Optimizes the Azure directory delete operation.
* As with GCS and S3, we can simply flat-list a prefix and delete blobs as we iterate, instead of listing the directories recursively. This should require fewer list RPC calls, and the logic becomes simpler.
1 parent 5ad9103 commit 525f9c0

File tree

3 files changed

+67
-38
lines changed

3 files changed

+67
-38
lines changed

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 11 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -39,8 +39,6 @@
3939
import java.net.HttpURLConnection;
4040
import java.net.URISyntaxException;
4141
import java.nio.file.NoSuchFileException;
42-
import java.util.ArrayList;
43-
import java.util.Collection;
4442
import java.util.List;
4543
import java.util.Map;
4644
import java.util.concurrent.ExecutorService;
@@ -130,56 +128,26 @@ public void deleteBlob(String blobName) throws IOException {
130128

131129
@Override
132130
public void delete() throws IOException {
133-
PlainActionFuture<Void> result = PlainActionFuture.newFuture();
134-
asyncDelete(result);
135131
try {
136-
result.actionGet();
137-
} catch (Exception e) {
138-
throw new IOException("Exception during container delete", e);
139-
}
140-
}
141-
142-
private void asyncDelete(ActionListener<Void> listener) throws IOException {
143-
final Collection<BlobContainer> childContainers = children().values();
144-
if (childContainers.isEmpty() == false) {
145-
final ActionListener<Void> childListener = new GroupedActionListener<>(
146-
ActionListener.wrap(v -> asyncDeleteBlobsIgnoringIfNotExists(
147-
new ArrayList<>(listBlobs().keySet()), listener), listener::onFailure), childContainers.size());
148-
for (BlobContainer container : childContainers) {
149-
threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME).submit(new ActionRunnable<>(childListener) {
150-
@Override
151-
protected void doRun() throws Exception {
152-
((AzureBlobContainer) container).asyncDelete(childListener);
153-
}
154-
});
155-
}
156-
} else {
157-
asyncDeleteBlobsIgnoringIfNotExists(new ArrayList<>(listBlobs().keySet()), listener);
132+
blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
133+
} catch (URISyntaxException | StorageException e) {
134+
throw new IOException(e);
158135
}
159136
}
160137

161138
@Override
162139
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
163140
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
164-
asyncDeleteBlobsIgnoringIfNotExists(blobNames, result);
165-
try {
166-
result.actionGet();
167-
} catch (Exception e) {
168-
throw new IOException("Exception during bulk delete", e);
169-
}
170-
}
171-
172-
private void asyncDeleteBlobsIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> callback) {
173141
if (blobNames.isEmpty()) {
174-
callback.onResponse(null);
142+
result.onResponse(null);
175143
} else {
176144
final GroupedActionListener<Void> listener =
177-
new GroupedActionListener<>(ActionListener.map(callback, v -> null), blobNames.size());
145+
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
178146
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
179147
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
180148
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
181149
for (String blobName : blobNames) {
182-
executor.submit(new ActionRunnable<>(listener) {
150+
executor.execute(new ActionRunnable<>(listener) {
183151
@Override
184152
protected void doRun() throws IOException {
185153
deleteBlobIgnoringIfNotExists(blobName);
@@ -188,6 +156,11 @@ protected void doRun() throws IOException {
188156
});
189157
}
190158
}
159+
try {
160+
result.actionGet();
161+
} catch (Exception e) {
162+
throw new IOException("Exception during bulk delete", e);
163+
}
191164
}
192165

193166
@Override

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
import java.nio.file.FileAlreadyExistsException;
3737
import java.util.Collections;
3838
import java.util.Map;
39+
import java.util.concurrent.Executor;
3940
import java.util.function.Function;
4041
import java.util.stream.Collectors;
4142

@@ -91,6 +92,10 @@ public void deleteBlob(String blob) throws URISyntaxException, StorageException
9192
service.deleteBlob(clientName, container, blob);
9293
}
9394

95+
public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException {
96+
service.deleteBlobDirectory(clientName, container, path, executor);
97+
}
98+
9499
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
95100
return service.getInputStream(clientName, container, blob);
96101
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
import org.apache.logging.log4j.LogManager;
4040
import org.apache.logging.log4j.Logger;
4141
import org.apache.logging.log4j.message.ParameterizedMessage;
42+
import org.elasticsearch.action.support.PlainActionFuture;
4243
import org.elasticsearch.common.blobstore.BlobMetaData;
4344
import org.elasticsearch.common.blobstore.BlobPath;
4445
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
@@ -47,6 +48,7 @@
4748
import org.elasticsearch.common.settings.SettingsException;
4849
import org.elasticsearch.common.unit.ByteSizeUnit;
4950
import org.elasticsearch.common.unit.ByteSizeValue;
51+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5052

5153
import java.io.IOException;
5254
import java.io.InputStream;
@@ -55,11 +57,16 @@
5557
import java.net.URISyntaxException;
5658
import java.nio.file.FileAlreadyExistsException;
5759
import java.security.InvalidKeyException;
60+
import java.util.ArrayList;
61+
import java.util.Collection;
62+
import java.util.Collections;
5863
import java.util.EnumSet;
5964
import java.util.HashMap;
6065
import java.util.HashSet;
6166
import java.util.Map;
6267
import java.util.Set;
68+
import java.util.concurrent.Executor;
69+
import java.util.concurrent.atomic.AtomicLong;
6370
import java.util.function.Supplier;
6471

6572
import static java.util.Collections.emptyMap;
@@ -185,6 +192,50 @@ public void deleteBlob(String account, String container, String blob) throws URI
185192
});
186193
}
187194

195+
void deleteBlobDirectory(String account, String container, String path, Executor executor)
196+
throws URISyntaxException, StorageException, IOException {
197+
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
198+
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
199+
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
200+
final AtomicLong outstanding = new AtomicLong(1L);
201+
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
202+
SocketAccess.doPrivilegedVoidException(() -> {
203+
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
204+
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
205+
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
206+
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
207+
outstanding.incrementAndGet();
208+
executor.execute(new AbstractRunnable() {
209+
@Override
210+
protected void doRun() throws Exception {
211+
deleteBlob(account, container, blobPath);
212+
}
213+
214+
@Override
215+
public void onFailure(Exception e) {
216+
exceptions.add(e);
217+
}
218+
219+
@Override
220+
public void onAfter() {
221+
if (outstanding.decrementAndGet() == 0) {
222+
result.onResponse(null);
223+
}
224+
}
225+
});
226+
}
227+
});
228+
if (outstanding.decrementAndGet() == 0) {
229+
result.onResponse(null);
230+
}
231+
result.actionGet();
232+
if (exceptions.isEmpty() == false) {
233+
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
234+
exceptions.forEach(ex::addSuppressed);
235+
throw ex;
236+
}
237+
}
238+
188239
public InputStream getInputStream(String account, String container, String blob)
189240
throws URISyntaxException, StorageException, IOException {
190241
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);

0 commit comments

Comments
 (0)