Skip to content

Commit b51ea25

Browse files
Use Azure Bulk Deletes in Azure Repository (#53919) (#53967)
Now that we upgraded the Azure SDK to 8.6.2 in #53865 we can make use of bulk deletes.
1 parent aef7b89 commit b51ea25

File tree

10 files changed

+112
-132
lines changed

10 files changed

+112
-132
lines changed

plugins/repository-azure/qa/microsoft-azure-storage/build.gradle

-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,5 @@ testClusters.integTest {
8888
// in a hacky way to change the protocol and endpoint. We must fix that.
8989
setting 'azure.client.integration_test.endpoint_suffix',
9090
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE
91-
String firstPartOfSeed = BuildParams.testSeed.tokenize(':').get(0)
92-
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE
9391
}
9492
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

+5-36
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,12 @@
2323
import com.microsoft.azure.storage.StorageException;
2424
import org.apache.logging.log4j.LogManager;
2525
import org.apache.logging.log4j.Logger;
26-
import org.elasticsearch.action.ActionListener;
27-
import org.elasticsearch.action.ActionRunnable;
28-
import org.elasticsearch.action.support.GroupedActionListener;
29-
import org.elasticsearch.action.support.PlainActionFuture;
3026
import org.elasticsearch.common.Nullable;
3127
import org.elasticsearch.common.blobstore.BlobContainer;
3228
import org.elasticsearch.common.blobstore.BlobMetaData;
3329
import org.elasticsearch.common.blobstore.BlobPath;
3430
import org.elasticsearch.common.blobstore.DeleteResult;
3531
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
36-
import org.elasticsearch.threadpool.ThreadPool;
3732

3833
import java.io.IOException;
3934
import java.io.InputStream;
@@ -42,20 +37,18 @@
4237
import java.nio.file.NoSuchFileException;
4338
import java.util.List;
4439
import java.util.Map;
45-
import java.util.concurrent.ExecutorService;
40+
import java.util.stream.Collectors;
4641

4742
public class AzureBlobContainer extends AbstractBlobContainer {
4843

4944
private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
5045
private final AzureBlobStore blobStore;
51-
private final ThreadPool threadPool;
5246
private final String keyPath;
5347

54-
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
48+
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
5549
super(path);
5650
this.blobStore = blobStore;
5751
this.keyPath = path.buildAsString();
58-
this.threadPool = threadPool;
5952
}
6053

6154
private boolean blobExists(String blobName) {
@@ -112,41 +105,17 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
112105
@Override
113106
public DeleteResult delete() throws IOException {
114107
try {
115-
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
108+
return blobStore.deleteBlobDirectory(keyPath);
116109
} catch (URISyntaxException | StorageException e) {
117110
throw new IOException(e);
118111
}
119112
}
120113

121114
@Override
122115
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
123-
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
124-
if (blobNames.isEmpty()) {
125-
result.onResponse(null);
126-
} else {
127-
final GroupedActionListener<Void> listener =
128-
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
129-
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
130-
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
131-
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
132-
for (String blobName : blobNames) {
133-
executor.execute(ActionRunnable.run(listener, () -> {
134-
logger.trace("deleteBlob({})", blobName);
135-
try {
136-
blobStore.deleteBlob(buildKey(blobName));
137-
} catch (StorageException e) {
138-
if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
139-
throw new IOException(e);
140-
}
141-
} catch (URISyntaxException e) {
142-
throw new IOException(e);
143-
}
144-
}));
145-
}
146-
}
147116
try {
148-
result.actionGet();
149-
} catch (Exception e) {
117+
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
118+
} catch (URISyntaxException | StorageException e) {
150119
throw new IOException("Exception during bulk delete", e);
151120
}
152121
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

+8-12
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,13 @@
2828
import org.elasticsearch.common.blobstore.BlobStore;
2929
import org.elasticsearch.common.blobstore.DeleteResult;
3030
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
31-
import org.elasticsearch.threadpool.ThreadPool;
3231

3332
import java.io.IOException;
3433
import java.io.InputStream;
3534
import java.net.URISyntaxException;
35+
import java.util.Collection;
3636
import java.util.Collections;
3737
import java.util.Map;
38-
import java.util.concurrent.Executor;
3938
import java.util.function.Function;
4039
import java.util.stream.Collectors;
4140

@@ -44,17 +43,15 @@
4443
public class AzureBlobStore implements BlobStore {
4544

4645
private final AzureStorageService service;
47-
private final ThreadPool threadPool;
4846

4947
private final String clientName;
5048
private final String container;
5149
private final LocationMode locationMode;
5250

53-
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
51+
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
5452
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
5553
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
5654
this.service = service;
57-
this.threadPool = threadPool;
5855
// locationMode is set per repository, not per client
5956
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
6057
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
@@ -80,7 +77,7 @@ public LocationMode getLocationMode() {
8077

8178
@Override
8279
public BlobContainer blobContainer(BlobPath path) {
83-
return new AzureBlobContainer(path, this, threadPool);
80+
return new AzureBlobContainer(path, this);
8481
}
8582

8683
@Override
@@ -91,13 +88,12 @@ public boolean blobExists(String blob) throws URISyntaxException, StorageExcepti
9188
return service.blobExists(clientName, container, blob);
9289
}
9390

94-
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
95-
service.deleteBlob(clientName, container, blob);
91+
public void deleteBlobsIgnoringIfNotExists(Collection<String> blobs) throws URISyntaxException, StorageException {
92+
service.deleteBlobsIgnoringIfNotExists(clientName, container, blobs);
9693
}
9794

98-
public DeleteResult deleteBlobDirectory(String path, Executor executor)
99-
throws URISyntaxException, StorageException, IOException {
100-
return service.deleteBlobDirectory(clientName, container, path, executor);
95+
public DeleteResult deleteBlobDirectory(String path) throws URISyntaxException, StorageException, IOException {
96+
return service.deleteBlobDirectory(clientName, container, path);
10197
}
10298

10399
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
@@ -111,7 +107,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
111107

112108
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
113109
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
114-
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
110+
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this))));
115111
}
116112

117113
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ protected BlobStore getBlobStore() {
115115

116116
@Override
117117
protected AzureBlobStore createBlobStore() {
118-
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);
118+
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
119119

120120
logger.debug(() -> new ParameterizedMessage(
121121
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java

-15
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,12 @@
2323
import org.elasticsearch.common.settings.Setting;
2424
import org.elasticsearch.common.settings.Settings;
2525
import org.elasticsearch.common.settings.SettingsException;
26-
import org.elasticsearch.common.unit.TimeValue;
2726
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2827
import org.elasticsearch.env.Environment;
2928
import org.elasticsearch.plugins.Plugin;
3029
import org.elasticsearch.plugins.ReloadablePlugin;
3130
import org.elasticsearch.plugins.RepositoryPlugin;
3231
import org.elasticsearch.repositories.Repository;
33-
import org.elasticsearch.threadpool.ExecutorBuilder;
34-
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
35-
3632
import java.util.Arrays;
3733
import java.util.Collections;
3834
import java.util.List;
@@ -43,8 +39,6 @@
4339
*/
4440
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
4541

46-
public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";
47-
4842
// protected for testing
4943
final AzureStorageService azureStoreService;
5044

@@ -80,15 +74,6 @@ public List<Setting<?>> getSettings() {
8074
);
8175
}
8276

83-
@Override
84-
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
85-
return Collections.singletonList(executorBuilder());
86-
}
87-
88-
public static ExecutorBuilder<?> executorBuilder() {
89-
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
90-
}
91-
9277
@Override
9378
public void reload(Settings settings) {
9479
// secure settings should be readable

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java

+44-53
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
package org.elasticsearch.repositories.azure;
2121

2222
import com.microsoft.azure.storage.AccessCondition;
23+
import com.microsoft.azure.storage.BatchException;
2324
import com.microsoft.azure.storage.CloudStorageAccount;
25+
import com.microsoft.azure.storage.Constants;
2426
import com.microsoft.azure.storage.OperationContext;
2527
import com.microsoft.azure.storage.RetryExponentialRetry;
2628
import com.microsoft.azure.storage.RetryPolicy;
2729
import com.microsoft.azure.storage.RetryPolicyFactory;
2830
import com.microsoft.azure.storage.StorageErrorCodeStrings;
2931
import com.microsoft.azure.storage.StorageException;
32+
import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
3033
import com.microsoft.azure.storage.blob.BlobInputStream;
3134
import com.microsoft.azure.storage.blob.BlobListingDetails;
3235
import com.microsoft.azure.storage.blob.BlobProperties;
@@ -42,7 +45,6 @@
4245
import org.apache.logging.log4j.LogManager;
4346
import org.apache.logging.log4j.Logger;
4447
import org.apache.logging.log4j.message.ParameterizedMessage;
45-
import org.elasticsearch.action.support.PlainActionFuture;
4648
import org.elasticsearch.common.blobstore.BlobMetaData;
4749
import org.elasticsearch.common.blobstore.BlobPath;
4850
import org.elasticsearch.common.blobstore.DeleteResult;
@@ -53,7 +55,6 @@
5355
import org.elasticsearch.common.settings.SettingsException;
5456
import org.elasticsearch.common.unit.ByteSizeUnit;
5557
import org.elasticsearch.common.unit.ByteSizeValue;
56-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5758

5859
import java.io.IOException;
5960
import java.io.InputStream;
@@ -67,9 +68,10 @@
6768
import java.util.Collections;
6869
import java.util.EnumSet;
6970
import java.util.HashSet;
71+
import java.util.Iterator;
72+
import java.util.List;
7073
import java.util.Map;
7174
import java.util.Set;
72-
import java.util.concurrent.Executor;
7375
import java.util.concurrent.atomic.AtomicLong;
7476
import java.util.function.Supplier;
7577

@@ -188,72 +190,61 @@ public boolean blobExists(String account, String container, String blob) throws
188190
});
189191
}
190192

191-
public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
193+
public void deleteBlobsIgnoringIfNotExists(String account, String container, Collection<String> blobs)
194+
throws URISyntaxException, StorageException {
195+
logger.trace(() -> new ParameterizedMessage("delete blobs for container [{}], blob [{}]", container, blobs));
192196
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
193197
// Container name must be lower case.
194198
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
195-
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
196-
SocketAccess.doPrivilegedVoidException(() -> {
197-
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
198-
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
199-
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
200-
});
199+
final Iterator<String> blobIterator = blobs.iterator();
200+
int currentBatchSize = 0;
201+
while (blobIterator.hasNext()) {
202+
final BlobDeleteBatchOperation batchDeleteOp = new BlobDeleteBatchOperation();
203+
do {
204+
batchDeleteOp.addSubOperation(blobContainer.getBlockBlobReference(blobIterator.next()),
205+
DeleteSnapshotsOption.NONE, null, null);
206+
++currentBatchSize;
207+
} while (blobIterator.hasNext() && currentBatchSize < Constants.BATCH_MAX_REQUESTS);
208+
currentBatchSize = 0;
209+
try {
210+
SocketAccess.doPrivilegedVoidException(() -> blobContainer.getServiceClient().executeBatch(batchDeleteOp));
211+
} catch (BatchException e) {
212+
for (StorageException ex : e.getExceptions().values()) {
213+
if (ex.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
214+
logger.error("Batch exceptions [{}]", e.getExceptions());
215+
throw e;
216+
}
217+
}
218+
}
219+
}
201220
}
202221

203-
DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
222+
DeleteResult deleteBlobDirectory(String account, String container, String path)
204223
throws URISyntaxException, StorageException, IOException {
205224
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
206225
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
207-
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
208-
final AtomicLong outstanding = new AtomicLong(1L);
209-
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
210226
final AtomicLong blobsDeleted = new AtomicLong();
211227
final AtomicLong bytesDeleted = new AtomicLong();
228+
final List<String> blobsToDelete = new ArrayList<>();
212229
SocketAccess.doPrivilegedVoidException(() -> {
213-
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
230+
for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
214231
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
215232
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
216233
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
217-
outstanding.incrementAndGet();
218-
executor.execute(new AbstractRunnable() {
219-
@Override
220-
protected void doRun() throws Exception {
221-
final long len;
222-
if (blobItem instanceof CloudBlob) {
223-
len = ((CloudBlob) blobItem).getProperties().getLength();
224-
} else {
225-
len = -1L;
226-
}
227-
deleteBlob(account, container, blobPath);
228-
blobsDeleted.incrementAndGet();
229-
if (len >= 0) {
230-
bytesDeleted.addAndGet(len);
231-
}
232-
}
233-
234-
@Override
235-
public void onFailure(Exception e) {
236-
exceptions.add(e);
237-
}
238-
239-
@Override
240-
public void onAfter() {
241-
if (outstanding.decrementAndGet() == 0) {
242-
result.onResponse(null);
243-
}
244-
}
245-
});
234+
final long len;
235+
if (blobItem instanceof CloudBlob) {
236+
len = ((CloudBlob) blobItem).getProperties().getLength();
237+
} else {
238+
len = -1L;
239+
}
240+
blobsToDelete.add(blobPath);
241+
blobsDeleted.incrementAndGet();
242+
if (len >= 0) {
243+
bytesDeleted.addAndGet(len);
244+
}
246245
}
247246
});
248-
if (outstanding.decrementAndGet() == 0) {
249-
result.onResponse(null);
250-
}
251-
result.actionGet();
252-
if (exceptions.isEmpty() == false) {
253-
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
254-
exceptions.forEach(ex::addSuppressed);
255-
throw ex;
256-
}
247+
deleteBlobsIgnoringIfNotExists(account, container, blobsToDelete);
257248
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
258249
}
259250

0 commit comments

Comments
 (0)