Skip to content

Commit 8b41126

Browse files
Enable Parallel Deletes in Azure Repository (#42783)
* Parallel deletes via private thread pool
1 parent 3cfa4a6 commit 8b41126

File tree

9 files changed

+101
-12
lines changed

9 files changed

+101
-12
lines changed

plugins/repository-azure/qa/microsoft-azure-storage/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ testClusters.integTest {
7171
// in a hacky way to change the protocol and endpoint. We must fix that.
7272
setting 'azure.client.integration_test.endpoint_suffix',
7373
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${azureStorageFixture.addressAndPort }" }
74+
String firstPartOfSeed = project.rootProject.testSeed.tokenize(':').get(0)
75+
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString()
7476
} else {
7577
println "Using an external service to test the repository-azure plugin"
7678
}

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,37 @@
2323
import com.microsoft.azure.storage.StorageException;
2424
import org.apache.logging.log4j.LogManager;
2525
import org.apache.logging.log4j.Logger;
26+
import org.elasticsearch.action.ActionRunnable;
27+
import org.elasticsearch.action.support.GroupedActionListener;
28+
import org.elasticsearch.action.support.PlainActionFuture;
2629
import org.elasticsearch.common.Nullable;
2730
import org.elasticsearch.common.blobstore.BlobMetaData;
2831
import org.elasticsearch.common.blobstore.BlobPath;
2932
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
33+
import org.elasticsearch.threadpool.ThreadPool;
3034

3135
import java.io.IOException;
3236
import java.io.InputStream;
3337
import java.net.HttpURLConnection;
3438
import java.net.URISyntaxException;
3539
import java.nio.file.NoSuchFileException;
40+
import java.util.Collection;
41+
import java.util.List;
3642
import java.util.Map;
43+
import java.util.concurrent.ExecutorService;
3744

3845
public class AzureBlobContainer extends AbstractBlobContainer {
3946

4047
private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
4148
private final AzureBlobStore blobStore;
42-
49+
private final ThreadPool threadPool;
4350
private final String keyPath;
4451

45-
public AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
52+
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
4653
super(path);
4754
this.blobStore = blobStore;
4855
this.keyPath = path.buildAsString();
56+
this.threadPool = threadPool;
4957
}
5058

5159
@Override
@@ -117,6 +125,32 @@ public void deleteBlob(String blobName) throws IOException {
117125
}
118126
}
119127

128+
@Override
129+
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
130+
if (blobNames.isEmpty()) {
131+
return;
132+
}
133+
final PlainActionFuture<Collection<Void>> result = PlainActionFuture.newFuture();
134+
final GroupedActionListener<Void> listener = new GroupedActionListener<>(result, blobNames.size());
135+
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
136+
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint.
137+
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
138+
for (String blobName : blobNames) {
139+
executor.submit(new ActionRunnable<>(listener) {
140+
@Override
141+
protected void doRun() throws IOException {
142+
deleteBlobIgnoringIfNotExists(blobName);
143+
listener.onResponse(null);
144+
}
145+
});
146+
}
147+
try {
148+
result.actionGet();
149+
} catch (Exception e) {
150+
throw new IOException("Exception during bulk delete", e);
151+
}
152+
}
153+
120154
@Override
121155
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {
122156
logger.trace("listBlobsByPrefix({})", prefix);

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.blobstore.BlobPath;
2929
import org.elasticsearch.common.blobstore.BlobStore;
3030
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
31+
import org.elasticsearch.threadpool.ThreadPool;
3132

3233
import java.io.IOException;
3334
import java.io.InputStream;
@@ -40,15 +41,17 @@
4041
public class AzureBlobStore implements BlobStore {
4142

4243
private final AzureStorageService service;
44+
private final ThreadPool threadPool;
4345

4446
private final String clientName;
4547
private final String container;
4648
private final LocationMode locationMode;
4749

48-
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
50+
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
4951
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
5052
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
5153
this.service = service;
54+
this.threadPool = threadPool;
5255
// locationMode is set per repository, not per client
5356
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
5457
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
@@ -70,7 +73,7 @@ public LocationMode getLocationMode() {
7073

7174
@Override
7275
public BlobContainer blobContainer(BlobPath path) {
73-
return new AzureBlobContainer(path, this);
76+
return new AzureBlobContainer(path, this, threadPool);
7477
}
7578

7679
@Override

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ protected BlobStore getBlobStore() {
120120

121121
@Override
122122
protected AzureBlobStore createBlobStore() {
123-
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
123+
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);
124124

125125
logger.debug(() -> new ParameterizedMessage(
126126
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import org.elasticsearch.common.settings.Setting;
2323
import org.elasticsearch.common.settings.Settings;
2424
import org.elasticsearch.common.settings.SettingsException;
25+
import org.elasticsearch.common.unit.TimeValue;
2526
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2627
import org.elasticsearch.env.Environment;
2728
import org.elasticsearch.plugins.Plugin;
2829
import org.elasticsearch.plugins.ReloadablePlugin;
2930
import org.elasticsearch.plugins.RepositoryPlugin;
3031
import org.elasticsearch.repositories.Repository;
32+
import org.elasticsearch.threadpool.ExecutorBuilder;
33+
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
3134
import org.elasticsearch.threadpool.ThreadPool;
3235

3336
import java.util.Arrays;
@@ -40,6 +43,8 @@
4043
*/
4144
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {
4245

46+
public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";
47+
4348
// protected for testing
4449
final AzureStorageService azureStoreService;
4550

@@ -69,6 +74,15 @@ public List<Setting<?>> getSettings() {
6974
);
7075
}
7176

77+
@Override
78+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
79+
return Collections.singletonList(executorBuilder());
80+
}
81+
82+
public static ExecutorBuilder<?> executorBuilder() {
83+
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
84+
}
85+
7286
@Override
7387
public void reload(Settings settings) {
7488
// secure settings should be readable

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreContainerTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,31 @@
2323
import org.elasticsearch.common.blobstore.BlobStore;
2424
import org.elasticsearch.common.settings.Settings;
2525
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
26+
import org.elasticsearch.threadpool.TestThreadPool;
27+
import org.elasticsearch.threadpool.ThreadPool;
2628

29+
import java.util.concurrent.TimeUnit;
2730

2831
public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
32+
33+
private ThreadPool threadPool;
34+
35+
@Override
36+
public void setUp() throws Exception {
37+
super.setUp();
38+
threadPool = new TestThreadPool("AzureBlobStoreTests", AzureRepositoryPlugin.executorBuilder());
39+
}
40+
41+
@Override
42+
public void tearDown() throws Exception {
43+
super.tearDown();
44+
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
45+
}
46+
2947
@Override
3048
protected BlobStore newBlobStore() {
3149
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
3250
AzureStorageServiceMock client = new AzureStorageServiceMock();
33-
return new AzureBlobStore(repositoryMetaData, client);
51+
return new AzureBlobStore(repositoryMetaData, client, threadPool);
3452
}
3553
}

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,31 @@
2222
import org.elasticsearch.common.blobstore.BlobStore;
2323
import org.elasticsearch.common.settings.Settings;
2424
import org.elasticsearch.repositories.ESBlobStoreTestCase;
25+
import org.elasticsearch.threadpool.TestThreadPool;
26+
import org.elasticsearch.threadpool.ThreadPool;
27+
28+
import java.util.concurrent.TimeUnit;
2529

2630
public class AzureBlobStoreTests extends ESBlobStoreTestCase {
2731

32+
private ThreadPool threadPool;
33+
34+
@Override
35+
public void setUp() throws Exception {
36+
super.setUp();
37+
threadPool = new TestThreadPool("AzureBlobStoreTests", AzureRepositoryPlugin.executorBuilder());
38+
}
39+
40+
@Override
41+
public void tearDown() throws Exception {
42+
super.tearDown();
43+
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
44+
}
45+
2846
@Override
2947
protected BlobStore newBlobStore() {
3048
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
3149
AzureStorageServiceMock client = new AzureStorageServiceMock();
32-
return new AzureBlobStore(repositoryMetaData, client);
50+
return new AzureBlobStore(repositoryMetaData, client, threadPool);
3351
}
3452
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
128128

129129
protected final RepositoryMetaData metadata;
130130

131-
private final ThreadPool threadPool;
131+
protected final ThreadPool threadPool;
132132

133133
private static final int BUFFER_SIZE = 4096;
134134

test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@
2424

2525
public class TestThreadPool extends ThreadPool {
2626

27-
public TestThreadPool(String name) {
28-
this(name, Settings.EMPTY);
27+
public TestThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
28+
this(name, Settings.EMPTY, customBuilders);
2929
}
3030

31-
public TestThreadPool(String name, Settings settings) {
32-
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build());
31+
public TestThreadPool(String name, Settings settings, ExecutorBuilder<?>... customBuilders) {
32+
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build(), customBuilders);
3333
}
3434

3535
}

0 commit comments

Comments
 (0)