Skip to content

Commit 5249540

Browse files
Simplify Blobstore Consistency Check in Tests (#73992)
With work to make repo APIs more async incoming in #73570 we need a non-blocking way to run this check. This adds that async check and removes the need to manually pass executors around as well.
1 parent 6235ff2 commit 5249540

File tree

7 files changed

+29
-33
lines changed

7 files changed

+29
-33
lines changed

plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
2323
import org.elasticsearch.snapshots.SnapshotState;
2424
import org.elasticsearch.test.ESSingleNodeTestCase;
25-
import org.elasticsearch.threadpool.ThreadPool;
2625

2726
import java.util.Collection;
2827

@@ -140,7 +139,7 @@ public void testSimpleWorkflow() {
140139
assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false));
141140
final BlobStoreRepository repo =
142141
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
143-
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
142+
BlobStoreTestUtil.assertConsistency(repo);
144143
}
145144

146145
public void testMissingUri() {

plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor gen
7777
}
7878

7979
@Override
80-
protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
80+
protected void assertConsistentRepository(BlobStoreRepository repo) throws Exception {
8181
// S3 is only eventually consistent for the list operations used by this assertions so we retry for 10 minutes assuming that
8282
// listing operations will become consistent within these 10 minutes.
83-
assertBusy(() -> super.assertConsistentRepository(repo, executor), 10L, TimeUnit.MINUTES);
83+
assertBusy(() -> super.assertConsistentRepository(repo), 10L, TimeUnit.MINUTES);
8484
}
8585

8686
protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetadata> blobs) throws Exception {

server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,6 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
167167
logger.info("--> cleanup repository");
168168
client().admin().cluster().prepareCleanupRepository(repoName).get();
169169

170-
BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic());
170+
BlobStoreTestUtil.assertConsistency(repository);
171171
}
172172
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,11 @@ public void verifyReposThenStopServices() {
272272
if (blobStoreContext != null) {
273273
blobStoreContext.forceConsistent();
274274
}
275-
BlobStoreTestUtil.assertConsistency(
276-
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
277-
Runnable::run
275+
final PlainActionFuture<AssertionError> future = BlobStoreTestUtil.assertConsistencyAsync(
276+
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo")
278277
);
278+
deterministicTaskQueue.runAllRunnableTasks();
279+
assertNull(future.actionGet(0));
279280
} finally {
280281
testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop);
281282
}

test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public void testCleanup() throws Exception {
206206
logger.info("--> deleting a snapshot to trigger repository cleanup");
207207
client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();
208208

209-
assertConsistentRepository(repo, genericExec);
209+
assertConsistentRepository(repo);
210210

211211
logger.info("--> Create dangling index");
212212
createDanglingIndex(repo, genericExec);
@@ -247,8 +247,8 @@ protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor exe
247247
return future.actionGet();
248248
}
249249

250-
protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
251-
BlobStoreTestUtil.assertConsistency(repo, executor);
250+
protected void assertConsistentRepository(BlobStoreRepository repo) throws Exception {
251+
BlobStoreTestUtil.assertConsistency(repo);
252252
}
253253

254254
protected void assertDeleted(BlobPath path, String name) throws Exception {

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java

+17-21
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,10 @@
3434
import org.elasticsearch.common.xcontent.XContentParser;
3535
import org.elasticsearch.common.xcontent.XContentType;
3636
import org.elasticsearch.repositories.IndexId;
37-
import org.elasticsearch.repositories.RepositoriesService;
3837
import org.elasticsearch.repositories.RepositoryData;
3938
import org.elasticsearch.repositories.ShardGenerations;
4039
import org.elasticsearch.snapshots.SnapshotId;
4140
import org.elasticsearch.snapshots.SnapshotInfo;
42-
import org.elasticsearch.test.InternalTestCluster;
4341
import org.elasticsearch.threadpool.ThreadPool;
4442

4543
import java.io.DataInputStream;
@@ -55,7 +53,6 @@
5553
import java.util.Map;
5654
import java.util.Set;
5755
import java.util.concurrent.CopyOnWriteArrayList;
58-
import java.util.concurrent.Executor;
5956
import java.util.concurrent.atomic.AtomicReference;
6057
import java.util.stream.Collectors;
6158

@@ -78,23 +75,25 @@
7875

7976
public final class BlobStoreTestUtil {
8077

81-
public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) {
82-
final BlobStoreRepository repo =
83-
(BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
84-
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
85-
}
86-
8778
/**
8879
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
8980
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
9081
* @param repository BlobStoreRepository to check
91-
* @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository}
92-
* implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers
93-
* of this assertion must pass an executor on those when using such an implementation.
9482
*/
95-
public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
96-
final PlainActionFuture<AssertionError> listener = PlainActionFuture.newFuture();
97-
executor.execute(ActionRunnable.supply(listener, () -> {
83+
public static void assertConsistency(BlobStoreRepository repository) {
84+
final PlainActionFuture<AssertionError> listener = assertConsistencyAsync(repository);
85+
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
86+
if (err != null) {
87+
throw new AssertionError(err);
88+
}
89+
}
90+
91+
/**
92+
* Same as {@link #assertConsistency(BlobStoreRepository)} but async so it can be used in tests that don't allow blocking.
93+
*/
94+
public static PlainActionFuture<AssertionError> assertConsistencyAsync(BlobStoreRepository repository) {
95+
final PlainActionFuture<AssertionError> future = PlainActionFuture.newFuture();
96+
repository.threadPool().generic().execute(ActionRunnable.wrap(future, listener -> {
9897
try {
9998
final BlobContainer blobContainer = repository.blobContainer();
10099
final long latestGen;
@@ -113,15 +112,12 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
113112
assertIndexUUIDs(repository, repositoryData);
114113
assertSnapshotUUIDs(repository, repositoryData);
115114
assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
116-
return null;
115+
listener.onResponse(null);
117116
} catch (AssertionError e) {
118-
return e;
117+
listener.onResponse(e);
119118
}
120119
}));
121-
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
122-
if (err != null) {
123-
throw new AssertionError(err);
124-
}
120+
return future;
125121
}
126122

127123
private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {

test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void assertRepoConsistency() {
135135
clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
136136
clusterAdmin().prepareCleanupRepository(name).get();
137137
}
138-
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
138+
BlobStoreTestUtil.assertConsistency(getRepositoryOnMaster(name));
139139
});
140140
} else {
141141
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);

0 commit comments

Comments
 (0)