|
105 | 105 | import org.elasticsearch.cluster.service.ClusterApplierService;
|
106 | 106 | import org.elasticsearch.cluster.service.ClusterService;
|
107 | 107 | import org.elasticsearch.cluster.service.MasterService;
|
| 108 | +import org.elasticsearch.common.Strings; |
| 109 | +import org.elasticsearch.common.bytes.BytesArray; |
108 | 110 | import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
109 | 111 | import org.elasticsearch.common.network.NetworkModule;
|
110 | 112 | import org.elasticsearch.common.settings.ClusterSettings;
|
|
115 | 117 | import org.elasticsearch.common.util.PageCacheRecycler;
|
116 | 118 | import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
117 | 119 | import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
| 120 | +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; |
118 | 121 | import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
| 122 | +import org.elasticsearch.common.xcontent.XContentHelper; |
| 123 | +import org.elasticsearch.common.xcontent.XContentParser; |
| 124 | +import org.elasticsearch.common.xcontent.XContentType; |
119 | 125 | import org.elasticsearch.env.Environment;
|
120 | 126 | import org.elasticsearch.env.NodeEnvironment;
|
121 | 127 | import org.elasticsearch.env.TestEnvironment;
|
|
140 | 146 | import org.elasticsearch.ingest.IngestService;
|
141 | 147 | import org.elasticsearch.node.ResponseCollectorService;
|
142 | 148 | import org.elasticsearch.plugins.PluginsService;
|
| 149 | +import org.elasticsearch.repositories.IndexId; |
143 | 150 | import org.elasticsearch.repositories.RepositoriesService;
|
144 | 151 | import org.elasticsearch.repositories.Repository;
|
| 152 | +import org.elasticsearch.repositories.RepositoryData; |
145 | 153 | import org.elasticsearch.repositories.fs.FsRepository;
|
146 | 154 | import org.elasticsearch.script.ScriptService;
|
147 | 155 | import org.elasticsearch.search.SearchService;
|
|
159 | 167 | import org.junit.After;
|
160 | 168 | import org.junit.Before;
|
161 | 169 |
|
| 170 | +import java.io.FileNotFoundException; |
162 | 171 | import java.io.IOException;
|
| 172 | +import java.nio.ByteBuffer; |
| 173 | +import java.nio.file.DirectoryNotEmptyException; |
| 174 | +import java.nio.file.FileVisitResult; |
| 175 | +import java.nio.file.Files; |
| 176 | +import java.nio.file.NoSuchFileException; |
163 | 177 | import java.nio.file.Path;
|
| 178 | +import java.nio.file.SimpleFileVisitor; |
| 179 | +import java.nio.file.attribute.BasicFileAttributes; |
164 | 180 | import java.util.Collection;
|
165 | 181 | import java.util.Collections;
|
166 | 182 | import java.util.Comparator;
|
@@ -206,8 +222,12 @@ public void createServices() {
|
206 | 222 | }
|
207 | 223 |
|
208 | 224 | @After
|
209 |
| - public void stopServices() { |
210 |
| - testClusterNodes.nodes.values().forEach(TestClusterNode::stop); |
| 225 | + public void verifyReposThenStopServices() throws IOException { |
| 226 | + try { |
| 227 | + assertNoStaleRepositoryData(); |
| 228 | + } finally { |
| 229 | + testClusterNodes.nodes.values().forEach(TestClusterNode::stop); |
| 230 | + } |
211 | 231 | }
|
212 | 232 |
|
213 | 233 | public void testSuccessfulSnapshotAndRestore() {
|
@@ -364,7 +384,6 @@ public void testSnapshotWithNodeDisconnects() {
|
364 | 384 | assertThat(snapshotIds, hasSize(1));
|
365 | 385 | }
|
366 | 386 |
|
367 |
| - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41326") |
368 | 387 | public void testConcurrentSnapshotCreateAndDelete() {
|
369 | 388 | setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
|
370 | 389 |
|
@@ -414,7 +433,6 @@ public void testConcurrentSnapshotCreateAndDelete() {
|
414 | 433 | * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
|
415 | 434 | * deleting a snapshot.
|
416 | 435 | */
|
417 |
| - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41326") |
418 | 436 | public void testSnapshotPrimaryRelocations() {
|
419 | 437 | final int masterNodeCount = randomFrom(1, 3, 5);
|
420 | 438 | setupTestCluster(masterNodeCount, randomIntBetween(2, 10));
|
@@ -504,6 +522,109 @@ public void run() {
|
504 | 522 | assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
|
505 | 523 | }
|
506 | 524 |
|
| 525 | + /** |
| 526 | + * Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository. |
| 527 | + * TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata |
| 528 | + */ |
| 529 | + private void assertNoStaleRepositoryData() throws IOException { |
| 530 | + final Path repoPath = tempDir.resolve("repo").toAbsolutePath(); |
| 531 | + final List<Path> repos; |
| 532 | + try (Stream<Path> reposDir = repoFilesByPrefix(repoPath)) { |
| 533 | + repos = reposDir.filter(s -> s.getFileName().toString().startsWith("extra") == false).collect(Collectors.toList()); |
| 534 | + } |
| 535 | + for (Path repoRoot : repos) { |
| 536 | + cleanupEmptyTrees(repoRoot); |
| 537 | + final Path latestIndexGenBlob = repoRoot.resolve("index.latest"); |
| 538 | + assertTrue("Could not find index.latest blob for repo at [" + repoRoot + ']', Files.exists(latestIndexGenBlob)); |
| 539 | + final long latestGen = ByteBuffer.wrap(Files.readAllBytes(latestIndexGenBlob)).getLong(0); |
| 540 | + assertIndexGenerations(repoRoot, latestGen); |
| 541 | + final RepositoryData repositoryData; |
| 542 | + try (XContentParser parser = |
| 543 | + XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, |
| 544 | + new BytesArray(Files.readAllBytes(repoRoot.resolve("index-" + latestGen))), XContentType.JSON)) { |
| 545 | + repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); |
| 546 | + } |
| 547 | + assertIndexUUIDs(repoRoot, repositoryData); |
| 548 | + assertSnapshotUUIDs(repoRoot, repositoryData); |
| 549 | + } |
| 550 | + } |
| 551 | + |
| 552 | + // Lucene's mock file system randomly generates empty `extra0` files that break the deletion of blob-store directories. |
| 553 | + // We clean those up here before checking a blob-store for stale files. |
| 554 | + private void cleanupEmptyTrees(Path repoPath) { |
| 555 | + try { |
| 556 | + Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>() { |
| 557 | + |
| 558 | + @Override |
| 559 | + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { |
| 560 | + if (file.getFileName().toString().startsWith("extra")) { |
| 561 | + Files.delete(file); |
| 562 | + } |
| 563 | + return FileVisitResult.CONTINUE; |
| 564 | + } |
| 565 | + |
| 566 | + @Override |
| 567 | + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { |
| 568 | + try { |
| 569 | + Files.delete(dir); |
| 570 | + } catch (DirectoryNotEmptyException e) { |
| 571 | + // We're only interested in deleting empty trees here, just ignore directories with content |
| 572 | + } |
| 573 | + return FileVisitResult.CONTINUE; |
| 574 | + } |
| 575 | + }); |
| 576 | + } catch (IOException e) { |
| 577 | + throw new AssertionError(e); |
| 578 | + } |
| 579 | + } |
| 580 | + |
| 581 | + private static void assertIndexGenerations(Path repoRoot, long latestGen) throws IOException { |
| 582 | + try (Stream<Path> repoRootBlobs = repoFilesByPrefix(repoRoot)) { |
| 583 | + final long[] indexGenerations = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith("index-")) |
| 584 | + .map(p -> p.getFileName().toString().replace("index-", "")) |
| 585 | + .mapToLong(Long::parseLong).sorted().toArray(); |
| 586 | + assertEquals(latestGen, indexGenerations[indexGenerations.length - 1]); |
| 587 | + assertTrue(indexGenerations.length <= 2); |
| 588 | + } |
| 589 | + } |
| 590 | + |
| 591 | + private static void assertIndexUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException { |
| 592 | + final List<String> expectedIndexUUIDs = |
| 593 | + repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList()); |
| 594 | + try (Stream<Path> indexRoots = repoFilesByPrefix(repoRoot.resolve("indices"))) { |
| 595 | + final List<String> foundIndexUUIDs = indexRoots.filter(s -> s.getFileName().toString().startsWith("extra") == false) |
| 596 | + .map(p -> p.getFileName().toString()).collect(Collectors.toList()); |
| 597 | + assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY))); |
| 598 | + } |
| 599 | + } |
| 600 | + |
| 601 | + private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException { |
| 602 | + final List<String> expectedSnapshotUUIDs = |
| 603 | + repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList()); |
| 604 | + for (String prefix : new String[]{"snap-", "meta-"}) { |
| 605 | + try (Stream<Path> repoRootBlobs = repoFilesByPrefix(repoRoot)) { |
| 606 | + final Collection<String> foundSnapshotUUIDs = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith(prefix)) |
| 607 | + .map(p -> p.getFileName().toString().replace(prefix, "").replace(".dat", "")) |
| 608 | + .collect(Collectors.toSet()); |
| 609 | + assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY))); |
| 610 | + } |
| 611 | + } |
| 612 | + } |
| 613 | + |
| 614 | + /** |
| 615 | + * List contents of a blob path and return an empty stream if the path doesn't exist. |
| 616 | + * @param prefix Path to find children for |
| 617 | + * @return stream of child paths |
| 618 | + * @throws IOException on failure |
| 619 | + */ |
| 620 | + private static Stream<Path> repoFilesByPrefix(Path prefix) throws IOException { |
| 621 | + try { |
| 622 | + return Files.list(prefix); |
| 623 | + } catch (FileNotFoundException | NoSuchFileException e) { |
| 624 | + return Stream.empty(); |
| 625 | + } |
| 626 | + } |
| 627 | + |
507 | 628 | private void clearDisruptionsAndAwaitSync() {
|
508 | 629 | testClusterNodes.clearNetworkDisruptions();
|
509 | 630 | runUntil(() -> {
|
|
0 commit comments