Skip to content

Commit db348a4

Browse files
Simplify Blobstore Consistency Check in Tests (elastic#73992)
With work to make repo APIs more async incoming in elastic#73570 we need a non-blocking way to run this check. This adds that async check and removes the need to manually pass executors around as well.
1 parent 094eb11 commit db348a4

File tree

8 files changed

+29
-72
lines changed

8 files changed

+29
-72
lines changed

plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
2323
import org.elasticsearch.snapshots.SnapshotState;
2424
import org.elasticsearch.test.ESSingleNodeTestCase;
25-
import org.elasticsearch.threadpool.ThreadPool;
2625

2726
import java.util.Collection;
2827

@@ -140,7 +139,7 @@ public void testSimpleWorkflow() {
140139
assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false));
141140
final BlobStoreRepository repo =
142141
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
143-
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
142+
BlobStoreTestUtil.assertConsistency(repo);
144143
}
145144

146145
public void testMissingUri() {

server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,6 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
167167
logger.info("--> cleanup repository");
168168
client().admin().cluster().prepareCleanupRepository(repoName).get();
169169

170-
BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic());
170+
BlobStoreTestUtil.assertConsistency(repository);
171171
}
172172
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,11 @@ public void verifyReposThenStopServices() {
260260

261261
runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L));
262262

263-
BlobStoreTestUtil.assertConsistency(
264-
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
265-
Runnable::run
263+
final PlainActionFuture<AssertionError> future = BlobStoreTestUtil.assertConsistencyAsync(
264+
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo")
266265
);
266+
deterministicTaskQueue.runAllRunnableTasks();
267+
assertNull(future.actionGet(0));
267268
} finally {
268269
testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop);
269270
}

test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public void testCleanup() throws Exception {
198198
logger.info("--> deleting a snapshot to trigger repository cleanup");
199199
client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();
200200

201-
BlobStoreTestUtil.assertConsistency(repo, genericExec);
201+
BlobStoreTestUtil.assertConsistency(repo);
202202

203203
logger.info("--> Create dangling index");
204204
createDanglingIndex(repo, genericExec);

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@
3535
import org.elasticsearch.common.xcontent.XContentParser;
3636
import org.elasticsearch.common.xcontent.XContentType;
3737
import org.elasticsearch.repositories.IndexId;
38-
import org.elasticsearch.repositories.RepositoriesService;
3938
import org.elasticsearch.repositories.RepositoryData;
4039
import org.elasticsearch.repositories.ShardGenerations;
4140
import org.elasticsearch.snapshots.SnapshotId;
4241
import org.elasticsearch.snapshots.SnapshotInfo;
43-
import org.elasticsearch.test.InternalTestCluster;
4442
import org.elasticsearch.threadpool.ThreadPool;
4543

4644
import java.io.ByteArrayInputStream;
@@ -58,7 +56,6 @@
5856
import java.util.Set;
5957
import java.util.concurrent.CopyOnWriteArrayList;
6058
import java.util.concurrent.ExecutionException;
61-
import java.util.concurrent.Executor;
6259
import java.util.concurrent.atomic.AtomicLong;
6360
import java.util.concurrent.atomic.AtomicReference;
6461
import java.util.stream.Collectors;
@@ -82,23 +79,25 @@
8279

8380
public final class BlobStoreTestUtil {
8481

85-
public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) {
86-
final BlobStoreRepository repo =
87-
(BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
88-
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
89-
}
90-
9182
/**
9283
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
9384
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
9485
* @param repository BlobStoreRepository to check
95-
* @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository}
96-
* implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers
97-
* of this assertion must pass an executor on those when using such an implementation.
9886
*/
99-
public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
100-
final PlainActionFuture<AssertionError> listener = PlainActionFuture.newFuture();
101-
executor.execute(ActionRunnable.supply(listener, () -> {
87+
public static void assertConsistency(BlobStoreRepository repository) {
88+
final PlainActionFuture<AssertionError> listener = assertConsistencyAsync(repository);
89+
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
90+
if (err != null) {
91+
throw new AssertionError(err);
92+
}
93+
}
94+
95+
/**
96+
* Same as {@link #assertConsistency(BlobStoreRepository)} but async so it can be used in tests that don't allow blocking.
97+
*/
98+
public static PlainActionFuture<AssertionError> assertConsistencyAsync(BlobStoreRepository repository) {
99+
final PlainActionFuture<AssertionError> future = PlainActionFuture.newFuture();
100+
repository.threadPool().generic().execute(ActionRunnable.wrap(future, listener -> {
102101
try {
103102
final BlobContainer blobContainer = repository.blobContainer();
104103
final long latestGen;
@@ -117,15 +116,12 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
117116
assertIndexUUIDs(repository, repositoryData);
118117
assertSnapshotUUIDs(repository, repositoryData);
119118
assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
120-
return null;
119+
listener.onResponse(null);
121120
} catch (AssertionError e) {
122-
return e;
121+
listener.onResponse(e);
123122
}
124123
}));
125-
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
126-
if (err != null) {
127-
throw new AssertionError(err);
128-
}
124+
return future;
129125
}
130126

131127
private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {

test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void assertRepoConsistency() {
134134
clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
135135
clusterAdmin().prepareCleanupRepository(name).get();
136136
}
137-
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
137+
BlobStoreTestUtil.assertConsistency(getRepositoryOnMaster(name));
138138
});
139139
} else {
140140
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);

x-pack/snapshot-tool/qa/s3/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,43 +10,21 @@
1010
import joptsimple.OptionSet;
1111
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1212
import org.elasticsearch.cli.MockTerminal;
13-
import org.elasticsearch.common.blobstore.BlobMetadata;
14-
import org.elasticsearch.common.blobstore.BlobPath;
1513
import org.elasticsearch.common.settings.MockSecureSettings;
1614
import org.elasticsearch.common.settings.SecureSettings;
1715
import org.elasticsearch.common.settings.Settings;
1816
import org.elasticsearch.plugins.Plugin;
19-
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
2017
import org.elasticsearch.repositories.s3.S3RepositoryPlugin;
2118

2219
import java.util.HashMap;
2320
import java.util.Collection;
2421
import java.util.Collections;
2522
import java.util.Map;
26-
import java.util.Set;
27-
import java.util.concurrent.Executor;
28-
import java.util.concurrent.TimeUnit;
2923

3024
import static org.hamcrest.Matchers.equalTo;
3125

3226
public class S3CleanupTests extends AbstractCleanupTests {
3327

34-
@Override
35-
protected void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map<String, BlobMetadata> blobs)
36-
throws Exception {
37-
assertBusy(() -> super.assertBlobsByPrefix(repository, path, prefix, blobs), 10, TimeUnit.MINUTES);
38-
}
39-
40-
@Override
41-
protected void assertCorruptionVisible(BlobStoreRepository repo, Map<String, Set<String>> indexToFiles) throws Exception {
42-
assertBusy(() -> super.assertCorruptionVisible(repo, indexToFiles), 10, TimeUnit.MINUTES);
43-
}
44-
45-
@Override
46-
protected void assertConsistency(BlobStoreRepository repo, Executor executor) throws Exception {
47-
assertBusy(() -> super.assertConsistency(repo, executor), 10, TimeUnit.MINUTES);
48-
}
49-
5028
@Override
5129
protected Collection<Class<? extends Plugin>> getPlugins() {
5230
return pluginList(S3RepositoryPlugin.class);

x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/AbstractCleanupTests.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,18 @@
1212
import org.elasticsearch.action.support.PlainActionFuture;
1313
import org.elasticsearch.cli.MockTerminal;
1414
import org.elasticsearch.cli.Terminal;
15-
import org.elasticsearch.common.blobstore.BlobMetadata;
16-
import org.elasticsearch.common.blobstore.BlobPath;
1715
import org.elasticsearch.common.settings.SecureSettings;
1816
import org.elasticsearch.common.settings.Settings;
1917
import org.elasticsearch.repositories.RepositoriesService;
2018
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
2119
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
2220
import org.elasticsearch.test.ESSingleNodeTestCase;
23-
import org.elasticsearch.threadpool.ThreadPool;
2421

2522
import java.util.Collections;
2623
import java.util.Map;
2724
import java.util.Set;
2825
import java.util.TreeMap;
2926
import java.util.TreeSet;
30-
import java.util.concurrent.Executor;
3127

3228
import static org.hamcrest.Matchers.containsString;
3329
import static org.hamcrest.Matchers.equalTo;
@@ -38,19 +34,6 @@ public abstract class AbstractCleanupTests extends ESSingleNodeTestCase {
3834

3935
protected BlobStoreRepository repository;
4036

41-
protected void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map<String, BlobMetadata> blobs)
42-
throws Exception {
43-
BlobStoreTestUtil.assertBlobsByPrefix(repository, path, prefix, blobs);
44-
}
45-
46-
protected void assertConsistency(BlobStoreRepository repo, Executor executor) throws Exception {
47-
BlobStoreTestUtil.assertConsistency(repo, executor);
48-
}
49-
50-
protected void assertCorruptionVisible(BlobStoreRepository repo, Map<String, Set<String>> indexToFiles) throws Exception {
51-
BlobStoreTestUtil.assertCorruptionVisible(repo, indexToFiles);
52-
}
53-
5437
@Override
5538
public void setUp() throws Exception {
5639
super.setUp();
@@ -64,7 +47,7 @@ private void cleanupRepository(BlobStoreRepository repository) throws Exception
6447
repository.threadPool().generic().execute(ActionRunnable.run(future,
6548
() -> repository.blobStore().blobContainer(repository.basePath()).delete()));
6649
future.actionGet();
67-
assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap());
50+
BlobStoreTestUtil.assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap());
6851
}
6952

7053
@Override
@@ -191,7 +174,7 @@ public void testCleanup() throws Throwable {
191174
assertThat(terminal.getOutput(), containsString("Set of deletion candidates is empty. Exiting"));
192175

193176
logger.info("--> check that there is no inconsistencies after running the tool");
194-
assertConsistency(repository, repository.threadPool().executor(ThreadPool.Names.GENERIC));
177+
BlobStoreTestUtil.assertConsistency(repository);
195178

196179
logger.info("--> create several dangling indices");
197180
int numOfFiles = 0;
@@ -211,7 +194,7 @@ public void testCleanup() throws Throwable {
211194
Set<String> danglingIndices = indexToFiles.keySet();
212195

213196
logger.info("--> ensure dangling index folders are visible");
214-
assertCorruptionVisible(repository, indexToFiles);
197+
BlobStoreTestUtil.assertCorruptionVisible(repository, indexToFiles);
215198

216199
logger.info("--> execute cleanup tool, corruption is created latter than snapshot, there is nothing to cleanup");
217200
terminal = executeCommand(false);
@@ -258,7 +241,7 @@ public void testCleanup() throws Throwable {
258241
containsString("Total bytes freed: " + size));
259242

260243
logger.info("--> verify that there is no inconsistencies");
261-
assertConsistency(repository, repository.threadPool().executor(ThreadPool.Names.GENERIC));
244+
BlobStoreTestUtil.assertConsistency(repository);
262245
logger.info("--> perform cleanup by removing snapshots");
263246
assertTrue(client().admin()
264247
.cluster()

0 commit comments

Comments
 (0)