Skip to content

Commit 6736cf5

Browse files
Simplify Snapshot Initialization (#51256)
We were loading `RepositoryData` twice during snapshot initialization, redundantly checking if a snapshot existed already. The first snapshot existence check is somewhat redundant because a snapshot could be created between loading `RepositoryData` and updating the cluster state with the `INIT` state snapshot entry. Also, it is much safer to do the subsequent checks for index existence in the repo and and the presence of old version snapshots once the `INIT` state entry prevents further snapshots from being created concurrently. While the current state of things will never lead to corruption on a concurrent snapshot creation, it could result in a situation (though unlikely) where all the snapshot's work is done on the data nodes, only to find out that the repository generation was off during snapshot finalization, failing there and leaving a bunch of dead data in the repository that won't be used in a subsequent snapshot (because the shard generation was never referenced due to the failed snapshot finalization). Note: This is a step on the way to parallel repository operations by making snapshot related CS and repo related CS more tightly correlated.
1 parent 8a6d68b commit 6736cf5

File tree

2 files changed

+103
-89
lines changed

2 files changed

+103
-89
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,12 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
140140
useShardGenerations);
141141
}
142142

143+
public Entry(Entry entry, State state, List<IndexId> indices, long repositoryStateId,
144+
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, boolean useShardGenerations, String failure) {
145+
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards,
146+
failure, entry.userMetadata, useShardGenerations);
147+
}
148+
143149
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
144150
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
145151
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 97 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that
108108
* no snapshot is currently running and registers the new snapshot in cluster state</li>
109109
* <li>When cluster state is updated
110-
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
110+
* the {@link #beginSnapshot} method kicks in and initializes
111111
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
112112
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
113113
* start processing them through {@link SnapshotShardsService#startNewSnapshots} method</li>
@@ -268,90 +268,85 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
268268
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
269269
validate(repositoryName, snapshotName);
270270
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
271-
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
272-
repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
273-
repositoryDataListener.whenComplete(repositoryData -> {
274-
final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null);
275-
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
276-
277-
private SnapshotsInProgress.Entry newSnapshot = null;
278-
279-
@Override
280-
public ClusterState execute(ClusterState currentState) {
281-
validate(repositoryName, snapshotName, currentState);
282-
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
283-
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
284-
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
285-
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
286-
}
287-
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
288-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
289-
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
290-
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
291-
}
292-
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
293-
if (snapshots == null || snapshots.entries().isEmpty()) {
294-
// Store newSnapshot here to be processed in clusterStateProcessed
295-
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
296-
request.indicesOptions(), request.indices()));
297-
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
298-
List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
299-
newSnapshot = new SnapshotsInProgress.Entry(
300-
new Snapshot(repositoryName, snapshotId),
301-
request.includeGlobalState(), request.partial(),
302-
State.INIT,
303-
snapshotIndices,
304-
threadPool.absoluteTimeInMillis(),
305-
repositoryData.getGenId(),
306-
null,
307-
request.userMetadata(),
308-
hasOldFormatSnapshots == false &&
309-
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
310-
initializingSnapshots.add(newSnapshot.snapshot());
311-
snapshots = new SnapshotsInProgress(newSnapshot);
312-
} else {
313-
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
314-
}
315-
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
271+
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
272+
273+
private SnapshotsInProgress.Entry newSnapshot = null;
274+
275+
private List<String> indices;
276+
277+
@Override
278+
public ClusterState execute(ClusterState currentState) {
279+
validate(repositoryName, snapshotName, currentState);
280+
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
281+
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
282+
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
283+
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
316284
}
285+
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
286+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
287+
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
288+
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
289+
}
290+
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
291+
if (snapshots != null && snapshots.entries().isEmpty() == false) {
292+
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
293+
}
294+
// Store newSnapshot here to be processed in clusterStateProcessed
295+
indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
296+
request.indicesOptions(), request.indices()));
297+
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
298+
newSnapshot = new SnapshotsInProgress.Entry(
299+
new Snapshot(repositoryName, snapshotId),
300+
request.includeGlobalState(), request.partial(),
301+
State.INIT,
302+
Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot
303+
threadPool.absoluteTimeInMillis(),
304+
RepositoryData.UNKNOWN_REPO_GEN,
305+
null,
306+
request.userMetadata(), false
307+
);
308+
initializingSnapshots.add(newSnapshot.snapshot());
309+
snapshots = new SnapshotsInProgress(newSnapshot);
310+
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
311+
}
317312

318-
@Override
319-
public void onFailure(String source, Exception e) {
320-
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
321-
if (newSnapshot != null) {
322-
initializingSnapshots.remove(newSnapshot.snapshot());
323-
}
324-
newSnapshot = null;
325-
listener.onFailure(e);
313+
@Override
314+
public void onFailure(String source, Exception e) {
315+
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
316+
if (newSnapshot != null) {
317+
initializingSnapshots.remove(newSnapshot.snapshot());
326318
}
319+
newSnapshot = null;
320+
listener.onFailure(e);
321+
}
327322

328-
@Override
329-
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
330-
if (newSnapshot != null) {
331-
final Snapshot current = newSnapshot.snapshot();
332-
assert initializingSnapshots.contains(current);
333-
beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() {
334-
@Override
335-
public void onResponse(final Snapshot snapshot) {
336-
initializingSnapshots.remove(snapshot);
337-
listener.onResponse(snapshot);
338-
}
323+
@Override
324+
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
325+
if (newSnapshot != null) {
326+
final Snapshot current = newSnapshot.snapshot();
327+
assert initializingSnapshots.contains(current);
328+
assert indices != null;
329+
beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener<>() {
330+
@Override
331+
public void onResponse(final Snapshot snapshot) {
332+
initializingSnapshots.remove(snapshot);
333+
listener.onResponse(snapshot);
334+
}
339335

340-
@Override
341-
public void onFailure(final Exception e) {
342-
initializingSnapshots.remove(current);
343-
listener.onFailure(e);
344-
}
345-
});
346-
}
336+
@Override
337+
public void onFailure(final Exception e) {
338+
initializingSnapshots.remove(current);
339+
listener.onFailure(e);
340+
}
341+
});
347342
}
343+
}
348344

349-
@Override
350-
public TimeValue timeout() {
351-
return request.masterNodeTimeout();
352-
}
353-
});
354-
}, listener::onFailure);
345+
@Override
346+
public TimeValue timeout() {
347+
return request.masterNodeTimeout();
348+
}
349+
});
355350
}
356351

357352
public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) {
@@ -436,6 +431,7 @@ private static void validate(final String repositoryName, final String snapshotN
436431
private void beginSnapshot(final ClusterState clusterState,
437432
final SnapshotsInProgress.Entry snapshot,
438433
final boolean partial,
434+
final List<String> indices,
439435
final ActionListener<Snapshot> userCreateSnapshotListener) {
440436
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
441437

@@ -460,13 +456,20 @@ protected void doRun() {
460456
throw new InvalidSnapshotNameException(
461457
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
462458
}
459+
463460
snapshotCreated = true;
464461

465462
logger.info("snapshot [{}] started", snapshot.snapshot());
466-
if (snapshot.indices().isEmpty()) {
463+
final boolean hasOldFormatSnapshots =
464+
hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null);
465+
final boolean writeShardGenerations = hasOldFormatSnapshots == false &&
466+
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION);
467+
if (indices.isEmpty()) {
467468
// No indices in this snapshot - we are done
468469
userCreateSnapshotListener.onResponse(snapshot.snapshot());
469-
endSnapshot(snapshot, clusterState.metaData());
470+
endSnapshot(new SnapshotsInProgress.Entry(
471+
snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations,
472+
null), clusterState.metaData());
470473
return;
471474
}
472475
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
@@ -486,8 +489,10 @@ public ClusterState execute(ClusterState currentState) {
486489
assert entry.shards().isEmpty();
487490
hadAbortedInitializations = true;
488491
} else {
492+
final List<IndexId> indexIds = repositoryData.resolveNewIndices(indices);
489493
// Replace the snapshot that was just initialized
490-
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
494+
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
495+
shards(currentState, indexIds, writeShardGenerations, repositoryData);
491496
if (!partial) {
492497
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
493498
currentState.metaData());
@@ -506,12 +511,13 @@ public ClusterState execute(ClusterState currentState) {
506511
failureMessage.append("Indices are closed ");
507512
failureMessage.append(closed);
508513
}
509-
entries.add(
510-
new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
514+
entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds,
515+
repositoryData.getGenId(), shards, writeShardGenerations, failureMessage.toString()));
511516
continue;
512517
}
513518
}
514-
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
519+
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(),
520+
shards, writeShardGenerations, null));
515521
}
516522
}
517523
return ClusterState.builder(currentState)
@@ -1493,17 +1499,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
14931499
/**
14941500
* Calculates the list of shards that should be included into the current snapshot
14951501
*
1496-
* @param clusterState cluster state
1497-
* @param snapshot SnapshotsInProgress Entry
1502+
* @param clusterState cluster state
1503+
* @param indices Indices to snapshot
1504+
* @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot
14981505
* @return list of shard to be included into current snapshot
14991506
*/
15001507
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState,
1501-
SnapshotsInProgress.Entry snapshot,
1508+
List<IndexId> indices,
1509+
boolean useShardGenerations,
15021510
RepositoryData repositoryData) {
15031511
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
15041512
MetaData metaData = clusterState.metaData();
15051513
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
1506-
for (IndexId index : snapshot.indices()) {
1514+
for (IndexId index : indices) {
15071515
final String indexName = index.getName();
15081516
final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false;
15091517
IndexMetaData indexMetaData = metaData.index(indexName);
@@ -1516,7 +1524,7 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
15161524
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
15171525
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
15181526
final String shardRepoGeneration;
1519-
if (snapshot.useShardGenerations()) {
1527+
if (useShardGenerations) {
15201528
if (isNewIndex) {
15211529
assert shardGenerations.getShardGen(index, shardId.getId()) == null
15221530
: "Found shard generation for new index [" + index + "]";

0 commit comments

Comments
 (0)