|
19 | 19 | package org.elasticsearch.repositories.blobstore;
|
20 | 20 |
|
21 | 21 | import org.elasticsearch.action.ActionRunnable;
|
| 22 | +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; |
22 | 23 | import org.elasticsearch.action.support.PlainActionFuture;
|
23 | 24 | import org.elasticsearch.cluster.RepositoryCleanupInProgress;
|
24 | 25 | import org.elasticsearch.common.settings.Settings;
|
25 | 26 | import org.elasticsearch.common.unit.ByteSizeUnit;
|
26 | 27 | import org.elasticsearch.common.unit.TimeValue;
|
27 | 28 | import org.elasticsearch.repositories.RepositoriesService;
|
28 | 29 | import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
|
| 30 | +import org.elasticsearch.snapshots.SnapshotState; |
29 | 31 | import org.elasticsearch.test.ESIntegTestCase;
|
30 | 32 |
|
31 | 33 | import java.io.ByteArrayInputStream;
|
| 34 | +import java.util.concurrent.ExecutionException; |
32 | 35 | import java.util.concurrent.TimeUnit;
|
33 | 36 |
|
34 | 37 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
35 | 38 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
| 39 | +import static org.hamcrest.Matchers.is; |
36 | 40 |
|
37 | 41 | @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
38 | 42 | public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase {
|
@@ -107,4 +111,40 @@ private String startBlockedCleanup(String repoName) throws Exception {
|
107 | 111 | waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
|
108 | 112 | return masterNode;
|
109 | 113 | }
|
| 114 | + |
| 115 | + public void testCleanupOldIndexN() throws ExecutionException, InterruptedException { |
| 116 | + internalCluster().startNodes(Settings.EMPTY); |
| 117 | + |
| 118 | + final String repoName = "test-repo"; |
| 119 | + logger.info("--> creating repository"); |
| 120 | + assertAcked(client().admin().cluster().preparePutRepository(repoName).setType("fs").setSettings(Settings.builder() |
| 121 | + .put("location", randomRepoPath()) |
| 122 | + .put("compress", randomBoolean()) |
| 123 | + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); |
| 124 | + |
| 125 | + logger.info("--> create three snapshots"); |
| 126 | + for (int i = 0; i < 3; ++i) { |
| 127 | + CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap-" + i) |
| 128 | + .setWaitForCompletion(true).get(); |
| 129 | + assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.SUCCESS)); |
| 130 | + } |
| 131 | + |
| 132 | + final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); |
| 133 | + final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName); |
| 134 | + |
| 135 | + logger.info("--> write two outdated index-N blobs"); |
| 136 | + for (int i = 0; i < 2; ++i) { |
| 137 | + final PlainActionFuture<Void> createOldIndexNFuture = PlainActionFuture.newFuture(); |
| 138 | + final int generation = i; |
| 139 | + repository.threadPool().generic().execute(ActionRunnable.run(createOldIndexNFuture, () -> repository.blobStore() |
| 140 | + .blobContainer(repository.basePath()).writeBlob(BlobStoreRepository.INDEX_FILE_PREFIX + generation, |
| 141 | + new ByteArrayInputStream(new byte[1]), 1, true))); |
| 142 | + createOldIndexNFuture.get(); |
| 143 | + } |
| 144 | + |
| 145 | + logger.info("--> cleanup repository"); |
| 146 | + client().admin().cluster().prepareCleanupRepository(repoName).get(); |
| 147 | + |
| 148 | + BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic()); |
| 149 | + } |
110 | 150 | }
|
0 commit comments