From 12146e5a89e8e9f452957f69e02482121087e7e4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 10 Jun 2021 11:12:23 +0200 Subject: [PATCH 1/2] Simplify Blobstore Consistency Check in Tests With work to make repo APIs more async incoming in #73570 we need a non-blocking way to run this check. This adds that async check and removes the need to manually pass executors around as well. --- .../repositories/hdfs/HdfsTests.java | 3 +- .../s3/S3RepositoryThirdPartyTests.java | 4 +- .../BlobStoreRepositoryCleanupIT.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 7 ++-- .../AbstractThirdPartyRepositoryTestCase.java | 6 +-- .../blobstore/BlobStoreTestUtil.java | 39 +++++++++---------- .../AbstractSnapshotIntegTestCase.java | 2 +- 7 files changed, 30 insertions(+), 33 deletions(-) diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java index cdeda2ed72c08..01f2f524294ed 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; @@ -140,7 +139,7 @@ public void testSimpleWorkflow() { assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false)); final BlobStoreRepository repo = (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); - BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC)); + BlobStoreTestUtil.assertConsistency(repo); } public void testMissingUri() { diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index f5f4893d811b7..7e852b4db8fa9 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -77,10 +77,10 @@ protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor gen } @Override - protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception { + protected void assertConsistentRepository(BlobStoreRepository repo) throws Exception { // S3 is only eventually consistent for the list operations used by this assertions so we retry for 10 minutes assuming that // listing operations will become consistent within these 10 minutes. - assertBusy(() -> super.assertConsistentRepository(repo, executor), 10L, TimeUnit.MINUTES); + assertBusy(() -> super.assertConsistentRepository(repo), 10L, TimeUnit.MINUTES); } protected void assertBlobsByPrefix(BlobPath path, String prefix, Map blobs) throws Exception { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java index aa0d0ce843137..eb9f60cebce47 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java @@ -167,6 +167,6 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti logger.info("--> cleanup repository"); client().admin().cluster().prepareCleanupRepository(repoName).get(); - BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic()); + BlobStoreTestUtil.assertConsistency(repository); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 4139ac3a39a58..cd8a1a01736cf 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -272,10 +272,11 @@ public void verifyReposThenStopServices() { if (blobStoreContext != null) { blobStoreContext.forceConsistent(); } - BlobStoreTestUtil.assertConsistency( - (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"), - Runnable::run + final PlainActionFuture future = BlobStoreTestUtil.assertConsistencyAsync( + (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo") ); + deterministicTaskQueue.runAllRunnableTasks(); + assertNull(future.actionGet(0)); } finally { testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop); } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index f8a26696a24c1..8724db401b2c2 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -206,7 +206,7 @@ public void testCleanup() throws Exception { logger.info("--> deleting a snapshot to trigger repository cleanup"); client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet(); - assertConsistentRepository(repo, genericExec); + assertConsistentRepository(repo); logger.info("--> Create dangling index"); createDanglingIndex(repo, genericExec); @@ -247,8 +247,8 @@ protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor exe return future.actionGet(); } - protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception { - BlobStoreTestUtil.assertConsistency(repo, executor); + protected void assertConsistentRepository(BlobStoreRepository repo) throws Exception { + BlobStoreTestUtil.assertConsistency(repo); } protected void assertDeleted(BlobPath path, String name) throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index a152d72c38c1d..287994e831cf0 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -34,12 +34,10 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.threadpool.ThreadPool; import java.io.DataInputStream; @@ -55,7 +53,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -78,23 +75,25 @@ public final class BlobStoreTestUtil { - public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) { - final BlobStoreRepository repo = - (BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); - BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC)); - } - /** * Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository. * TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata * @param repository BlobStoreRepository to check - * @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository} - * implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers - * of this assertion must pass an executor on those when using such an implementation. */ - public static void assertConsistency(BlobStoreRepository repository, Executor executor) { - final PlainActionFuture listener = PlainActionFuture.newFuture(); - executor.execute(ActionRunnable.supply(listener, () -> { + public static void assertConsistency(BlobStoreRepository repository) { + final PlainActionFuture listener = assertConsistencyAsync(repository); + final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L)); + if (err != null) { + throw new AssertionError(err); + } + } + + /** + * Same as {@link #assertConsistency(BlobStoreRepository)} but async so it can be used in tests that don't allow blocking. + */ + public static PlainActionFuture assertConsistencyAsync(BlobStoreRepository repository) { + final PlainActionFuture future = PlainActionFuture.newFuture(); + repository.threadPool().generic().execute(ActionRunnable.wrap(future, listener -> { try { final BlobContainer blobContainer = repository.blobContainer(); final long latestGen; @@ -113,15 +112,13 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); - return null; } catch (AssertionError e) { - return e; + listener.onResponse(e); + return; } + listener.onResponse(null); })); - final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L)); - if (err != null) { - throw new AssertionError(err); - } + return future; } private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 2c48b897dd06f..e73e5625cef52 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -135,7 +135,7 @@ public void assertRepoConsistency() { clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get(); clusterAdmin().prepareCleanupRepository(name).get(); } - BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name); + BlobStoreTestUtil.assertConsistency(getRepositoryOnMaster(name)); }); } else { logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason); From 2c02cec7a07da5d19a3653a1bc2487fb423dc51f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 10 Jun 2021 15:11:36 +0200 Subject: [PATCH 2/2] CR: nit --- .../repositories/blobstore/BlobStoreTestUtil.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index 287994e831cf0..a301bcb626879 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -112,11 +112,10 @@ public static PlainActionFuture assertConsistencyAsync(BlobStore assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); + listener.onResponse(null); } catch (AssertionError e) { listener.onResponse(e); - return; } - listener.onResponse(null); })); return future; }