Skip to content

Simplify Blobstore Consistency Check in Tests #73992

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;

Expand Down Expand Up @@ -140,7 +139,7 @@ public void testSimpleWorkflow() {
assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false));
final BlobStoreRepository repo =
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
BlobStoreTestUtil.assertConsistency(repo);
}

public void testMissingUri() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor gen
}

@Override
protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
protected void assertConsistentRepository(BlobStoreRepository repo) throws Exception {
// S3 is only eventually consistent for the list operations used by this assertion so we retry for 10 minutes assuming that
// listing operations will become consistent within these 10 minutes.
assertBusy(() -> super.assertConsistentRepository(repo, executor), 10L, TimeUnit.MINUTES);
assertBusy(() -> super.assertConsistentRepository(repo), 10L, TimeUnit.MINUTES);
}

protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetadata> blobs) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,6 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
logger.info("--> cleanup repository");
client().admin().cluster().prepareCleanupRepository(repoName).get();

BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic());
BlobStoreTestUtil.assertConsistency(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,11 @@ public void verifyReposThenStopServices() {
if (blobStoreContext != null) {
blobStoreContext.forceConsistent();
}
BlobStoreTestUtil.assertConsistency(
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
Runnable::run
final PlainActionFuture<AssertionError> future = BlobStoreTestUtil.assertConsistencyAsync(
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo")
);
deterministicTaskQueue.runAllRunnableTasks();
assertNull(future.actionGet(0));
} finally {
testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testCleanup() throws Exception {
logger.info("--> deleting a snapshot to trigger repository cleanup");
client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();

assertConsistentRepository(repo, genericExec);
assertConsistentRepository(repo);

logger.info("--> Create dangling index");
createDanglingIndex(repo, genericExec);
Expand Down Expand Up @@ -247,8 +247,8 @@ protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor exe
return future.actionGet();
}

protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
BlobStoreTestUtil.assertConsistency(repo, executor);
protected void assertConsistentRepository(BlobStoreRepository repo) throws Exception {
BlobStoreTestUtil.assertConsistency(repo);
}

protected void assertDeleted(BlobPath path, String name) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.DataInputStream;
Expand All @@ -55,7 +53,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand All @@ -78,23 +75,25 @@

public final class BlobStoreTestUtil {

public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) {
final BlobStoreRepository repo =
(BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
}

/**
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
* @param repository BlobStoreRepository to check
* @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository}
* implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers
* of this assertion must pass an executor on those when using such an implementation.
*/
public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
final PlainActionFuture<AssertionError> listener = PlainActionFuture.newFuture();
executor.execute(ActionRunnable.supply(listener, () -> {
public static void assertConsistency(BlobStoreRepository repository) {
final PlainActionFuture<AssertionError> listener = assertConsistencyAsync(repository);
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
if (err != null) {
throw new AssertionError(err);
}
}

/**
* Same as {@link #assertConsistency(BlobStoreRepository)} but async so it can be used in tests that don't allow blocking.
*/
public static PlainActionFuture<AssertionError> assertConsistencyAsync(BlobStoreRepository repository) {
final PlainActionFuture<AssertionError> future = PlainActionFuture.newFuture();
repository.threadPool().generic().execute(ActionRunnable.wrap(future, listener -> {
try {
final BlobContainer blobContainer = repository.blobContainer();
final long latestGen;
Expand All @@ -113,15 +112,12 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
assertIndexUUIDs(repository, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
return null;
listener.onResponse(null);
} catch (AssertionError e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we're catching the AssertionError here in order to re-throw it on the calling thread so that we can use this within assertBusy() for the third-party tests that allow for S3 to be eventually-consistent. Do we need that any more, now that S3 is properly consistent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question ... I guess you could make the same argument for the other eventual-consistency code (mainly EventuallyConsistentMockRepository) and just drop that stuff as well. But I always figured that we may still have other S3-compatible implementations out there in the wild that are eventually consistent, so it's nice to have this stuff in tests for third-party testing? (Not sure ... I don't know of any and would also be happy to drop all of it to save some LoC :))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could reasonably say a third (fourth?) party implementation that has weaker consistency than the real S3 is not compatible. The repo analysis API fails on inconsistent listings:

final RepositoryVerificationException repositoryVerificationException = new RepositoryVerificationException(
request.repositoryName,
"expected blobs " + missingBlobs + " missing in [" + request.repositoryName + ":" + blobPath + "]"
);
logger.debug("failing due to missing blobs", repositoryVerificationException);
fail(repositoryVerificationException);

Copy link
Member Author

@original-brownbear original-brownbear Jun 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point, I'll open a follow-up that drops all these tests in a bit then :)

return e;
listener.onResponse(e);
}
}));
final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
if (err != null) {
throw new AssertionError(err);
}
return future;
}

private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void assertRepoConsistency() {
clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
clusterAdmin().prepareCleanupRepository(name).get();
}
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
BlobStoreTestUtil.assertConsistency(getRepositoryOnMaster(name));
});
} else {
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);
Expand Down