diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
index 6351d5e2f2bf0..8b7f799d0e7c2 100644
--- a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
+++ b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
@@ -54,6 +54,13 @@ private ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
         this.shardGenerations = shardGenerations;
     }
 
+    /**
+     * Returns the total number of shards tracked by this instance.
+     */
+    public int totalShards() {
+        return shardGenerations.values().stream().mapToInt(List::size).sum();
+    }
+
     /**
      * Returns all indices for which shard generations are tracked.
      *
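The totalShards() helper added above counts the (index, shard) pairs this instance tracks by summing the sizes of the per-index generation lists. A minimal usage sketch, assuming the two-argument IndexId(name, uuid) constructor and the fluent Builder used elsewhere in this patch (all values illustrative):

```java
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.ShardGenerations;

// Sketch, not part of the patch: what totalShards() reports for a small instance.
public class TotalShardsExample {
    public static void main(String[] args) {
        final IndexId indexA = new IndexId("index-a", "uuid-a");
        final IndexId indexB = new IndexId("index-b", "uuid-b");
        final ShardGenerations generations = ShardGenerations.builder()
            .put(indexA, 0, "gen-a0") // shard 0 of index-a
            .put(indexA, 1, "gen-a1") // shard 1 of index-a
            .put(indexB, 0, "gen-b0") // shard 0 of index-b
            .build();
        // Three (index, shard) pairs are tracked in total.
        assert generations.totalShards() == 3;
    }
}
```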
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 7c638bc332edc..cfb2112b4044d 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -568,16 +568,17 @@ public void onNoLongerMaster() {
         private void cleanupAfterError(Exception exception) {
             threadPool.generic().execute(() -> {
                 if (snapshotCreated) {
+                    final MetaData metaData = clusterService.state().metaData();
                     repositoriesService.repository(snapshot.snapshot().getRepository())
                         .finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
-                            buildGenerations(snapshot),
+                            buildGenerations(snapshot, metaData),
                             snapshot.startTime(),
                             ExceptionsHelper.stackTrace(exception),
                             0,
                             Collections.emptyList(),
                             snapshot.repositoryStateId(),
                             snapshot.includeGlobalState(),
-                            metaDataForSnapshot(snapshot, clusterService.state().metaData()),
+                            metaDataForSnapshot(snapshot, metaData),
                             snapshot.userMetadata(),
                             snapshot.useShardGenerations(),
                             ActionListener.runAfter(ActionListener.wrap(ignored -> {
@@ -593,11 +594,21 @@ private void cleanupAfterError(Exception exception) {
         }
     }
 
-    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
+    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
         ShardGenerations.Builder builder = ShardGenerations.builder();
         final Map<String, IndexId> indexLookup = new HashMap<>();
         snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
-        snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
+        snapshot.shards().forEach(c -> {
+            if (metaData.index(c.key.getIndex()) == null) {
+                assert snapshot.partial() :
+                    "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
+                return;
+            }
+            final IndexId indexId = indexLookup.get(c.key.getIndexName());
+            if (indexId != null) {
+                builder.put(indexId, c.key.id(), c.value.generation());
+            }
+        });
         return builder.build();
     }
 
@@ -1032,12 +1043,13 @@ protected void doRun() {
                         shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
                     }
                 }
+                final ShardGenerations shardGenerations = buildGenerations(entry, metaData);
                 repository.finalizeSnapshot(
                     snapshot.getSnapshotId(),
-                    buildGenerations(entry),
+                    shardGenerations,
                     entry.startTime(),
                     failure,
-                    entry.shards().size(),
+                    entry.partial() ? shardGenerations.totalShards() : entry.shards().size(),
                     unmodifiableList(shardFailures),
                     entry.repositoryStateId(),
                     entry.includeGlobalState(),
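The change to the finalization call above matters because, for a partial snapshot, a concurrently deleted index is skipped in buildGenerations, so entry.shards().size() would overstate the total shard count recorded in SnapshotInfo. A hedged sketch of the arithmetic, with hypothetical shard counts not taken from this patch:

```java
// Sketch, not part of the patch: a partial snapshot that lost an index mid-flight.
public class PartialSnapshotTotalExample {
    public static void main(String[] args) {
        int entryShards = 2 + 3;   // entry.shards().size(): index-a (2 shards) + index-b (3 shards)
        int trackedShards = 2;     // shardGenerations.totalShards(): index-b was deleted, its shards skipped
        boolean partial = true;
        // The total handed to finalizeSnapshot must reflect only the tracked shards,
        // otherwise SnapshotInfo would report more total shards than were finalized.
        int totalShards = partial ? trackedShards : entryShards;
        assert totalShards == 2;
    }
}
```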
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index a0e7e51098681..5fd6624d1d128 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -21,6 +21,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -143,6 +144,7 @@
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
@@ -211,6 +213,7 @@
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 
@@ -503,7 +506,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
         }
     }
 
-    public void testConcurrentSnapshotDeleteAndDeleteIndex() {
+    public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
 
         String repoName = "repo";
@@ -514,11 +517,13 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
 
         final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
+        final int indices = randomIntBetween(5, 20);
 
+        final SetOnce<Index> firstIndex = new SetOnce<>();
         continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> {
+            firstIndex.set(masterNode.clusterService.state().metaData().index(index).getIndex());
             // create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot
             // finalization
-            final int indices = randomIntBetween(5, 20);
             final GroupedActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(createIndicesListener, indices);
             for (int i = 0; i < indices; ++i) {
                 client().admin().indices().create(new CreateIndexRequest("index-" + i), listener);
@@ -527,23 +532,54 @@ public void testConcurrentSnapshotDeleteAndDeleteIndex() {
 
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
 
+        final boolean partialSnapshot = randomBoolean();
+
         continueOrDie(createIndicesListener, createIndexResponses ->
             client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false)
-                .execute(createSnapshotResponseStepListener));
+                .setPartial(partialSnapshot).execute(createSnapshotResponseStepListener));
 
         continueOrDie(createSnapshotResponseStepListener,
-            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener()));
+            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), new ActionListener<>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    if (partialSnapshot) {
+                        // Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
+                        client().admin().indices().create(new CreateIndexRequest(index), noopListener());
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    if (partialSnapshot) {
+                        throw new AssertionError("Delete index should always work during partial snapshots", e);
+                    }
+                }
+            }));
 
         deterministicTaskQueue.runAllRunnableTasks();
 
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
+        final RepositoryData repositoryData = getRepositoryData(repository);
+        Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
         assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
+        if (partialSnapshot) {
+            // Single shard for each index so we either get all indices or all except for the deleted index
+            assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
+            if (snapshotInfo.successfulShards() == indices + 1) {
+                final IndexMetaData indexMetaData =
+                    repository.getSnapshotIndexMetaData(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
+                // Make sure we snapshotted the metadata of this index and not the recreated version
+                assertEquals(indexMetaData.getIndex(), firstIndex.get());
+            }
+        } else {
+            // Index delete must be blocked for non-partial snapshots and we get a snapshot for every index
+            assertEquals(snapshotInfo.successfulShards(), indices + 1);
+        }
         assertEquals(0, snapshotInfo.failedShards());
     }
 
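The test's final metadata assertion depends on org.elasticsearch.index.Index equality covering both the index name and its UUID; that is what lets it distinguish the originally snapshotted index from the same-named index recreated during a partial snapshot. A small illustration with made-up UUIDs:

```java
import org.elasticsearch.index.Index;

// Sketch, not part of the patch: Index equality compares name and UUID, so
// asserting on the snapshotted Index catches metadata accidentally captured
// from the recreated index.
public class IndexIdentityExample {
    public static void main(String[] args) {
        Index original = new Index("test", "uuid-original");
        Index recreated = new Index("test", "uuid-recreated");
        assert original.equals(recreated) == false; // same name, different UUID
    }
}
```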