Skip to content

Implement Eventually Consistent Mock Repository for SnapshotResiliencyTests #40893

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
6f24de1
Add Eventually Consistent Mock Repository
original-brownbear Apr 4, 2019
15fe43f
bck
original-brownbear Apr 4, 2019
233ef74
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 5, 2019
c346c3d
better
original-brownbear Apr 5, 2019
ab8da42
repeat 5000
original-brownbear Apr 5, 2019
8713682
repeat 5000
original-brownbear Apr 5, 2019
6d7c993
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 5, 2019
86e3d62
bck
original-brownbear Apr 5, 2019
e7587cb
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 17, 2019
e320ebc
CR: Fix incorrect consistency and deleting of empty trees
original-brownbear Apr 17, 2019
81a4ad6
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 17, 2019
7b5c2e6
CR: Fix incorrect consistency and deleting of empty trees
original-brownbear Apr 17, 2019
dfffe74
handle empty dirs like on S3/GCS
original-brownbear Apr 17, 2019
bc751fd
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 18, 2019
ea12df2
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 19, 2019
0f35d33
CR: fix failing to use cached misses in read and exists calls
original-brownbear Apr 19, 2019
4aaaecb
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 19, 2019
83a7d67
Merge remote-tracking branch 'elastic' into eventually-consistent-moc…
original-brownbear Apr 21, 2019
7f1f7eb
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 23, 2019
beef086
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 24, 2019
cf9a39d
Make things nicer loooking
original-brownbear Apr 24, 2019
dd0f92f
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 24, 2019
ba6abf6
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Apr 26, 2019
228ece6
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear May 7, 2019
f8df217
repro #41898
original-brownbear May 7, 2019
f5ce1d6
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear May 8, 2019
c662f18
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear May 10, 2019
0347ae0
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear May 10, 2019
bc7aac2
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Jun 24, 2019
0f310ad
CR: Adjust implementation to in memory list of all operations
original-brownbear Jun 24, 2019
573c9ae
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Jun 24, 2019
1e50b85
shorter diff
original-brownbear Jun 24, 2019
f0427aa
shorter diff
original-brownbear Jun 24, 2019
7d09033
Merge branch 'eventually-consistent-mock-repo' of github.com:original…
original-brownbear Jul 15, 2019
c73f33b
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Jul 15, 2019
3c94924
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Jul 16, 2019
d1f1e79
CR: Test, fix delete case, dry up ensure open
original-brownbear Jul 16, 2019
f00ba32
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Jul 16, 2019
dc8bac3
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Jul 16, 2019
58a9d42
CR: much stronger overwrite blob assertions, some cleanups, cleaner t…
original-brownbear Jul 16, 2019
7683dcc
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Jul 16, 2019
07bc1cf
Merge remote-tracking branch 'elastic/master' into eventually-consist…
original-brownbear Jul 17, 2019
0afcaa3
blobExists is gone :)
original-brownbear Jul 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,20 @@ public String toString() {
}
return sb.toString();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return paths.equals(((BlobPath) o).paths);
}

@Override
public int hashCode() {
return paths.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -155,6 +156,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.NetworkDisruption;
Expand All @@ -167,10 +169,16 @@
import org.junit.After;
import org.junit.Before;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -208,9 +216,18 @@ public class SnapshotResiliencyTests extends ESTestCase {

private Path tempDir;

/**
* Context shared by all the node's {@link Repository} instances if the eventually consistent blobstore is to be used.
* {@code null} if not using the eventually consistent blobstore.
*/
@Nullable private MockEventuallyConsistentRepository.Context blobStoreContext;

@Before
public void createServices() {
tempDir = createTempDir();
if (randomBoolean()) {
blobStoreContext = new MockEventuallyConsistentRepository.Context();
}
deterministicTaskQueue =
new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random());
}
Expand Down Expand Up @@ -523,10 +540,11 @@ public void run() {
private void assertNoStaleRepositoryData() throws IOException {
final Path repoPath = tempDir.resolve("repo").toAbsolutePath();
final List<Path> repos;
try (Stream<Path> reposDir = Files.list(repoPath)) {
try (Stream<Path> reposDir = repoFilesByPrefix(repoPath)) {
repos = reposDir.filter(s -> s.getFileName().toString().startsWith("extra") == false).collect(Collectors.toList());
}
for (Path repoRoot : repos) {
cleanupEmptyTrees(repoRoot);
final Path latestIndexGenBlob = repoRoot.resolve("index.latest");
assertTrue("Could not find index.latest blob for repo at [" + repoRoot + ']', Files.exists(latestIndexGenBlob));
final long latestGen = ByteBuffer.wrap(Files.readAllBytes(latestIndexGenBlob)).getLong(0);
Expand All @@ -542,8 +560,37 @@ private void assertNoStaleRepositoryData() throws IOException {
}
}

// Lucene's mock file system randomly generates empty `extra0` files that break the deletion of blob-store directories.
// We clean those up here before checking a blob-store for stale files in this test.
private void cleanupEmptyTrees(Path repoPath) {
try {
Files.walkFileTree(repoPath, new SimpleFileVisitor<>() {

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (file.getFileName().toString().startsWith("extra")) {
Files.delete(file);
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
try {
Files.delete(dir);
} catch (DirectoryNotEmptyException e) {
// We're only interested in deleting empty trees here, just ignore directories with content
}
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {
throw new AssertionError(e);
}
}

private static void assertIndexGenerations(Path repoRoot, long latestGen) throws IOException {
try (Stream<Path> repoRootBlobs = Files.list(repoRoot)) {
try (Stream<Path> repoRootBlobs = repoFilesByPrefix(repoRoot)) {
final long[] indexGenerations = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith("index-"))
.map(p -> p.getFileName().toString().replace("index-", ""))
.mapToLong(Long::parseLong).sorted().toArray();
Expand All @@ -555,7 +602,7 @@ private static void assertIndexGenerations(Path repoRoot, long latestGen) throws
private static void assertIndexUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException {
final List<String> expectedIndexUUIDs =
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList());
try (Stream<Path> indexRoots = Files.list(repoRoot.resolve("indices"))) {
try (Stream<Path> indexRoots = repoFilesByPrefix(repoRoot.resolve("indices"))) {
final List<String> foundIndexUUIDs = indexRoots.filter(s -> s.getFileName().toString().startsWith("extra") == false)
.map(p -> p.getFileName().toString()).collect(Collectors.toList());
assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY)));
Expand All @@ -566,7 +613,7 @@ private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repository
final List<String> expectedSnapshotUUIDs =
repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList());
for (String prefix : new String[]{"snap-", "meta-"}) {
try (Stream<Path> repoRootBlobs = Files.list(repoRoot)) {
try (Stream<Path> repoRootBlobs = repoFilesByPrefix(repoRoot)) {
final Collection<String> foundSnapshotUUIDs = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith(prefix))
.map(p -> p.getFileName().toString().replace(prefix, "").replace(".dat", ""))
.collect(Collectors.toSet());
Expand All @@ -575,6 +622,20 @@ private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repository
}
}

/**
* List contents of a blob path and return an empty stream if the path doesn't exist.
* @param prefix Path to find children for
* @return stream of child paths
* @throws IOException on failure
*/
private static Stream<Path> repoFilesByPrefix(Path prefix) throws IOException {
try {
return Files.list(prefix);
} catch (FileNotFoundException | NoSuchFileException e) {
return Stream.empty();
}
}

private void clearDisruptionsAndAwaitSync() {
testClusterNodes.clearNetworkDisruptions();
runUntil(() -> {
Expand Down Expand Up @@ -959,19 +1020,7 @@ public void onFailure(final Exception e) {
final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
repositoriesService = new RepositoriesService(
settings, clusterService, transportService,
Collections.singletonMap(FsRepository.TYPE, metaData -> {
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo in the test thread
}
};
repository.start();
return repository;
}
),
emptyMap(),
threadPool
Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool
);
snapshotsService =
new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool);
Expand Down Expand Up @@ -1152,6 +1201,28 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon
client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
}

private Repository.Factory getRepoFactory(Environment environment) {
// Run half the tests with the eventually consistent repository
if (blobStoreContext == null) {
return metaData -> {
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo in the test thread
}
};
repository.start();
return repository;
};
} else {
return metaData -> {
final Repository repository = new MockEventuallyConsistentRepository(
metaData, environment, xContentRegistry(), deterministicTaskQueue, blobStoreContext);
repository.start();
return repository;
};
}
}
public void restart() {
testClusterNodes.disconnectNode(this);
final ClusterState oldState = this.clusterService.state();
Expand Down
Loading