Skip to content

Commit 813b49a

Browse files
Make BlobStoreRepository Aware of ClusterState (#49639) (#49711)
* Make BlobStoreRepository Aware of ClusterState (#49639) This is a preliminary to #49060. It does not introduce any substantial behavior change to how the blob store repository operates. What it does is to add all the infrastructure changes around passing the cluster service to the blob store, associated test changes and a best effort approach to tracking the latest repository generation on all nodes from cluster state updates. This brings a slight improvement to the consistency by which non-master nodes (or master directly after a failover) will be able to determine the latest repository generation. It does not however do any tricky checks for the situation after a repository operation (create, delete or cleanup) that could theoretically be used to get even greater accuracy to keep this change simple. This change does not in any way alter the behavior of the blobstore repository other than adding a better "guess" for the value of the latest repo generation and is mainly intended to isolate the actual logical change to how the repository operates in #49060
1 parent 3d525e1 commit 813b49a

File tree

46 files changed

+327
-154
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+327
-154
lines changed

modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919

2020
package org.elasticsearch.plugin.repository.url;
2121

22+
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.settings.Setting;
2324
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2425
import org.elasticsearch.env.Environment;
2526
import org.elasticsearch.plugins.Plugin;
2627
import org.elasticsearch.plugins.RepositoryPlugin;
2728
import org.elasticsearch.repositories.Repository;
2829
import org.elasticsearch.repositories.url.URLRepository;
29-
import org.elasticsearch.threadpool.ThreadPool;
3030

3131
import java.util.Arrays;
3232
import java.util.Collections;
@@ -46,8 +46,8 @@ public List<Setting<?>> getSettings() {
4646

4747
@Override
4848
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
49-
ThreadPool threadPool) {
49+
ClusterService clusterService) {
5050
return Collections.singletonMap(URLRepository.TYPE,
51-
metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool));
51+
metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService));
5252
}
5353
}

modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
25+
import org.elasticsearch.cluster.service.ClusterService;
2526
import org.elasticsearch.common.blobstore.BlobContainer;
2627
import org.elasticsearch.common.blobstore.BlobPath;
2728
import org.elasticsearch.common.blobstore.BlobStore;
@@ -33,7 +34,6 @@
3334
import org.elasticsearch.env.Environment;
3435
import org.elasticsearch.repositories.RepositoryException;
3536
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
36-
import org.elasticsearch.threadpool.ThreadPool;
3737

3838
import java.net.MalformedURLException;
3939
import java.net.URISyntaxException;
@@ -83,8 +83,8 @@ public class URLRepository extends BlobStoreRepository {
8383
* Constructs a read-only URL-based repository
8484
*/
8585
public URLRepository(RepositoryMetaData metadata, Environment environment,
86-
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
87-
super(metadata, false, namedXContentRegistry, threadPool);
86+
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
87+
super(metadata, false, namedXContentRegistry, clusterService);
8888

8989
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
9090
throw new RepositoryException(metadata.name(), "missing url");

modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.elasticsearch.env.Environment;
2626
import org.elasticsearch.env.TestEnvironment;
2727
import org.elasticsearch.repositories.RepositoryException;
28+
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
2829
import org.elasticsearch.test.ESTestCase;
29-
import org.elasticsearch.threadpool.ThreadPool;
3030

3131
import java.io.IOException;
3232
import java.nio.file.Path;
@@ -35,13 +35,12 @@
3535
import static org.hamcrest.CoreMatchers.is;
3636
import static org.hamcrest.CoreMatchers.not;
3737
import static org.hamcrest.CoreMatchers.nullValue;
38-
import static org.mockito.Mockito.mock;
3938

4039
public class URLRepositoryTests extends ESTestCase {
4140

4241
private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) {
4342
return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
44-
new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) {
43+
new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService()) {
4544
@Override
4645
protected void assertSnapshotOrGenericThread() {
4746
// eliminate thread name check as we create repo manually on test/main threads

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.logging.log4j.Logger;
2525
import org.apache.logging.log4j.message.ParameterizedMessage;
2626
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
27+
import org.elasticsearch.cluster.service.ClusterService;
2728
import org.elasticsearch.common.Strings;
2829
import org.elasticsearch.common.blobstore.BlobPath;
2930
import org.elasticsearch.common.blobstore.BlobStore;
@@ -32,7 +33,6 @@
3233
import org.elasticsearch.common.unit.ByteSizeValue;
3334
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3435
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
35-
import org.elasticsearch.threadpool.ThreadPool;
3636

3737
import java.util.Locale;
3838
import java.util.function.Function;
@@ -81,8 +81,8 @@ public AzureRepository(
8181
final RepositoryMetaData metadata,
8282
final NamedXContentRegistry namedXContentRegistry,
8383
final AzureStorageService storageService,
84-
final ThreadPool threadPool) {
85-
super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
84+
final ClusterService clusterService) {
85+
super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService);
8686
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
8787
this.storageService = storageService;
8888

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.repositories.azure;
2121

22+
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.settings.Setting;
2324
import org.elasticsearch.common.settings.Settings;
2425
import org.elasticsearch.common.settings.SettingsException;
@@ -31,7 +32,6 @@
3132
import org.elasticsearch.repositories.Repository;
3233
import org.elasticsearch.threadpool.ExecutorBuilder;
3334
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
34-
import org.elasticsearch.threadpool.ThreadPool;
3535

3636
import java.util.Arrays;
3737
import java.util.Collections;
@@ -60,9 +60,9 @@ AzureStorageService createAzureStoreService(final Settings settings) {
6060

6161
@Override
6262
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
63-
ThreadPool threadPool) {
63+
ClusterService clusterService) {
6464
return Collections.singletonMap(AzureRepository.TYPE,
65-
(metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, threadPool));
65+
(metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService));
6666
}
6767

6868
@Override

plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import org.elasticsearch.common.unit.ByteSizeValue;
2727
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2828
import org.elasticsearch.env.Environment;
29+
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
2930
import org.elasticsearch.test.ESTestCase;
30-
import org.elasticsearch.threadpool.ThreadPool;
3131

3232
import static org.hamcrest.Matchers.is;
3333
import static org.hamcrest.Matchers.nullValue;
@@ -42,8 +42,7 @@ private AzureRepository azureRepository(Settings settings) {
4242
.put(settings)
4343
.build();
4444
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
45-
NamedXContentRegistry.EMPTY, mock(AzureStorageService.class),
46-
mock(ThreadPool.class));
45+
NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), BlobStoreTestUtil.mockClusterService());
4746
assertThat(azureRepository.getBlobStore(), is(nullValue()));
4847
return azureRepository;
4948
}

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.repositories.gcs;
2121

22+
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.settings.Setting;
2324
import org.elasticsearch.common.settings.Settings;
2425
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -27,7 +28,6 @@
2728
import org.elasticsearch.plugins.ReloadablePlugin;
2829
import org.elasticsearch.plugins.RepositoryPlugin;
2930
import org.elasticsearch.repositories.Repository;
30-
import org.elasticsearch.threadpool.ThreadPool;
3131

3232
import java.util.Arrays;
3333
import java.util.Collections;
@@ -52,9 +52,9 @@ protected GoogleCloudStorageService createStorageService() {
5252

5353
@Override
5454
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
55-
ThreadPool threadPool) {
55+
ClusterService clusterService) {
5656
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
57-
metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, threadPool));
57+
metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService));
5858
}
5959

6060
@Override

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
25+
import org.elasticsearch.cluster.service.ClusterService;
2526
import org.elasticsearch.common.Strings;
2627
import org.elasticsearch.common.blobstore.BlobPath;
2728
import org.elasticsearch.common.settings.Setting;
@@ -30,7 +31,6 @@
3031
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3132
import org.elasticsearch.repositories.RepositoryException;
3233
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
33-
import org.elasticsearch.threadpool.ThreadPool;
3434

3535
import java.util.function.Function;
3636

@@ -68,8 +68,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
6868
final RepositoryMetaData metadata,
6969
final NamedXContentRegistry namedXContentRegistry,
7070
final GoogleCloudStorageService storageService,
71-
final ThreadPool threadPool) {
72-
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, threadPool);
71+
final ClusterService clusterService) {
72+
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService);
7373
this.storageService = storageService;
7474

7575
String basePath = BASE_PATH.get(metadata.settings());

plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import fixture.gcs.FakeOAuth2HttpHandler;
2828
import fixture.gcs.GoogleCloudStorageHttpHandler;
2929
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
30+
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.SuppressForbidden;
3132
import org.elasticsearch.common.io.Streams;
3233
import org.elasticsearch.common.settings.MockSecureSettings;
@@ -38,7 +39,6 @@
3839
import org.elasticsearch.plugins.Plugin;
3940
import org.elasticsearch.repositories.Repository;
4041
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
41-
import org.elasticsearch.threadpool.ThreadPool;
4242
import org.threeten.bp.Duration;
4343

4444
import java.io.IOException;
@@ -171,9 +171,10 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien
171171
}
172172

173173
@Override
174-
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry registry, ThreadPool threadPool) {
174+
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry registry,
175+
ClusterService clusterService) {
175176
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
176-
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, threadPool) {
177+
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService) {
177178
@Override
178179
protected GoogleCloudStorageBlobStore createBlobStore() {
179180
return new GoogleCloudStorageBlobStore("bucket", "test", storageService) {

plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@
3030
import org.apache.hadoop.security.KerberosInfo;
3131
import org.apache.hadoop.security.SecurityUtil;
3232
import org.elasticsearch.SpecialPermission;
33+
import org.elasticsearch.cluster.service.ClusterService;
3334
import org.elasticsearch.common.SuppressForbidden;
3435
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3536
import org.elasticsearch.env.Environment;
3637
import org.elasticsearch.plugins.Plugin;
3738
import org.elasticsearch.plugins.RepositoryPlugin;
3839
import org.elasticsearch.repositories.Repository;
39-
import org.elasticsearch.threadpool.ThreadPool;
4040

4141
public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
4242

@@ -112,7 +112,7 @@ private static Void eagerInit() {
112112

113113
@Override
114114
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
115-
ThreadPool threadPool) {
116-
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool));
115+
ClusterService clusterService) {
116+
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService));
117117
}
118118
}

plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.logging.log4j.Logger;
3232
import org.elasticsearch.SpecialPermission;
3333
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
34+
import org.elasticsearch.cluster.service.ClusterService;
3435
import org.elasticsearch.common.Strings;
3536
import org.elasticsearch.common.SuppressForbidden;
3637
import org.elasticsearch.common.blobstore.BlobPath;
@@ -40,7 +41,6 @@
4041
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
4142
import org.elasticsearch.env.Environment;
4243
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
43-
import org.elasticsearch.threadpool.ThreadPool;
4444

4545
import java.io.IOException;
4646
import java.io.UncheckedIOException;
@@ -71,8 +71,8 @@ public HdfsRepository(
7171
final RepositoryMetaData metadata,
7272
final Environment environment,
7373
final NamedXContentRegistry namedXContentRegistry,
74-
final ThreadPool threadPool) {
75-
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, threadPool);
74+
final ClusterService clusterService) {
75+
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService);
7676

7777
this.environment = environment;
7878
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
25+
import org.elasticsearch.cluster.service.ClusterService;
2526
import org.elasticsearch.common.Strings;
2627
import org.elasticsearch.common.blobstore.BlobPath;
2728
import org.elasticsearch.common.blobstore.BlobStore;
@@ -35,7 +36,6 @@
3536
import org.elasticsearch.monitor.jvm.JvmInfo;
3637
import org.elasticsearch.repositories.RepositoryException;
3738
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
38-
import org.elasticsearch.threadpool.ThreadPool;
3939

4040
import java.util.function.Function;
4141

@@ -172,8 +172,8 @@ class S3Repository extends BlobStoreRepository {
172172
final RepositoryMetaData metadata,
173173
final NamedXContentRegistry namedXContentRegistry,
174174
final S3Service service,
175-
final ThreadPool threadPool) {
176-
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
175+
final ClusterService clusterService) {
176+
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService);
177177
this.service = service;
178178

179179
this.repositoryMetaData = metadata;

plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.amazonaws.util.json.Jackson;
2323
import org.elasticsearch.SpecialPermission;
2424
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
25+
import org.elasticsearch.cluster.service.ClusterService;
2526
import org.elasticsearch.common.settings.Setting;
2627
import org.elasticsearch.common.settings.Settings;
2728
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -30,7 +31,6 @@
3031
import org.elasticsearch.plugins.ReloadablePlugin;
3132
import org.elasticsearch.plugins.RepositoryPlugin;
3233
import org.elasticsearch.repositories.Repository;
33-
import org.elasticsearch.threadpool.ThreadPool;
3434

3535
import java.io.IOException;
3636
import java.security.AccessController;
@@ -79,14 +79,14 @@ public S3RepositoryPlugin(final Settings settings) {
7979
protected S3Repository createRepository(
8080
final RepositoryMetaData metadata,
8181
final NamedXContentRegistry registry,
82-
final ThreadPool threadPool) {
83-
return new S3Repository(metadata, registry, service, threadPool);
82+
final ClusterService clusterService) {
83+
return new S3Repository(metadata, registry, service, clusterService);
8484
}
8585

8686
@Override
8787
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
88-
final ThreadPool threadPool) {
89-
return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, threadPool));
88+
final ClusterService clusterService) {
89+
return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService));
9090
}
9191

9292
@Override

plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.client.node.NodeClient;
2828
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
2929
import org.elasticsearch.common.SuppressForbidden;
30+
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.settings.MockSecureSettings;
3132
import org.elasticsearch.common.settings.Settings;
3233
import org.elasticsearch.common.settings.SettingsFilter;
@@ -41,7 +42,6 @@
4142
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
4243
import org.elasticsearch.test.ESSingleNodeTestCase;
4344
import org.elasticsearch.test.rest.FakeRestRequest;
44-
import org.elasticsearch.threadpool.ThreadPool;
4545

4646
import java.security.AccessController;
4747
import java.security.PrivilegedAction;
@@ -271,8 +271,8 @@ public ProxyS3RepositoryPlugin(Settings settings) {
271271

272272
@Override
273273
protected S3Repository createRepository(RepositoryMetaData metadata,
274-
NamedXContentRegistry registry, ThreadPool threadPool) {
275-
return new S3Repository(metadata, registry, service, threadPool) {
274+
NamedXContentRegistry registry, ClusterService clusterService) {
275+
return new S3Repository(metadata, registry, service, clusterService) {
276276
@Override
277277
protected void assertSnapshotOrGenericThread() {
278278
// eliminate thread name check as we create repo manually on test/main threads

0 commit comments

Comments
 (0)