Skip to content

[7.x] Remove and inline methods in SnapshotsService.deleteSnapshots() #76151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 62 additions & 85 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,7 @@ public ClusterState execute(ClusterState currentState) {
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"
);
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(
repositoryName,
snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
);
}
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
// Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from
// a
Expand Down Expand Up @@ -497,7 +490,7 @@ public ClusterState execute(ClusterState currentState) {
if (concurrentOperationsAllowed == false && runningSnapshots.stream().anyMatch(entry -> entry.state() != State.INIT)) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
Expand Down Expand Up @@ -657,7 +650,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<Void> lis
public ClusterState execute(ClusterState currentState) {
ensureRepositoryExists(repositoryName, currentState);
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot");
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries();
ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
Expand Down Expand Up @@ -730,7 +723,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
}, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
}

private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
private static void ensureNoCleanupInProgress(
final ClusterState currentState,
final String repositoryName,
final String snapshotName,
final String reason
) {
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
RepositoryCleanupInProgress.TYPE,
RepositoryCleanupInProgress.EMPTY
Expand All @@ -739,7 +737,13 @@ private static void ensureNoCleanupInProgress(ClusterState currentState, String
throw new ConcurrentSnapshotExecutionException(
repositoryName,
snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
"cannot "
+ reason
+ " while a repository cleanup is in-progress in "
+ repositoryCleanupInProgress.entries()
.stream()
.map(RepositoryCleanupInProgress.Entry::repository)
.collect(Collectors.toSet())
);
}
}
Expand Down Expand Up @@ -2508,18 +2512,17 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
* @param listener listener
*/
public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener<Void> listener) {

final String repositoryName = request.repository();
final String[] snapshotNames = request.snapshots();
final String repoName = request.repository();
logger.info(
() -> new ParameterizedMessage(
"deleting snapshots [{}] from repository [{}]",
Strings.arrayToCommaDelimitedString(snapshotNames),
repoName
repositoryName
)
);

final Repository repository = repositoriesService.repository(repoName);
final Repository repository = repositoriesService.repository(repositoryName);
final String taskDescription = "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {

Expand All @@ -2543,17 +2546,46 @@ public ClusterState execute(ClusterState currentState) throws Exception {
+ "]"
);
}
ensureRepositoryExists(repoName, currentState);
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List<SnapshotsInProgress.Entry> snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName);
final List<SnapshotId> snapshotIds = matchingSnapshotIds(
snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()),
repositoryData,
snapshotNames,
repoName
);
ensureRepositoryExists(repositoryName, currentState);
final List<SnapshotId> snapshotIds = new ArrayList<>();
final List<SnapshotsInProgress.Entry> snapshotEntries = new ArrayList<>();

// find in-progress snapshots to delete in cluster state
final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final SnapshotId snapshotId = entry.snapshot().getSnapshotId();
if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, snapshotId.getName())) {
snapshotIds.add(snapshotId);
snapshotEntries.add(entry);
}
}

// find snapshots to delete in repository data
final Map<String, SnapshotId> snapshotsIdsInRepository = repositoryData.getSnapshotIds()
.stream()
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
for (String snapshotOrPattern : snapshotNames) {
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
for (Map.Entry<String, SnapshotId> entry : snapshotsIdsInRepository.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
snapshotIds.add(entry.getValue());
}
}
} else {
final SnapshotId foundId = snapshotsIdsInRepository.get(snapshotOrPattern);
if (foundId == null) {
if (snapshotEntries.stream()
.noneMatch(entry -> entry.snapshot().getSnapshotId().getName().equals(snapshotOrPattern))) {
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
}
} else {
snapshotIds.add(foundId);
}
}
}

if (snapshotEntries.isEmpty() || minNodeVersion.onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) {
deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener);
deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData, Priority.NORMAL, listener);
return deleteFromRepoTask.execute(currentState);
}
assert snapshotEntries.size() == 1 : "Expected just a single running snapshot but saw " + snapshotEntries;
Expand Down Expand Up @@ -2608,7 +2640,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
.putCustom(
SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(
snapshots.entries()
snapshotsInProgress.entries()
.stream()
// remove init state snapshot we found from a previous master if there was one
.filter(existing -> abortedDuringInit == false || existing.equals(snapshotEntry) == false)
Expand Down Expand Up @@ -2648,7 +2680,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
} else {
clusterService.submitStateUpdateTask(
taskDescription,
createDeleteStateUpdate(outstandingDeletes, repoName, repositoryData, Priority.IMMEDIATE, listener)
createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData, Priority.IMMEDIATE, listener)
);
}
return;
Expand All @@ -2658,7 +2690,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.debug("deleted snapshot completed - deleting files");
clusterService.submitStateUpdateTask(
taskDescription,
createDeleteStateUpdate(outstandingDeletes, repoName, result.v1(), Priority.IMMEDIATE, listener)
createDeleteStateUpdate(outstandingDeletes, repositoryName, result.v1(), Priority.IMMEDIATE, listener)
);
}, e -> {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
Expand All @@ -2676,52 +2708,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}, taskDescription, listener::onFailure);
}

private static List<SnapshotId> matchingSnapshotIds(
List<SnapshotId> inProgress,
RepositoryData repositoryData,
String[] snapshotsOrPatterns,
String repositoryName
) {
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds()
.stream()
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
final Set<SnapshotId> foundSnapshots = new HashSet<>(inProgress);
for (String snapshotOrPattern : snapshotsOrPatterns) {
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
foundSnapshots.add(entry.getValue());
}
}
} else {
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
if (foundId == null) {
if (inProgress.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) {
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
}
} else {
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
}
}
}
return Collections.unmodifiableList(new ArrayList<>(foundSnapshots));
}

// Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found
private static List<SnapshotsInProgress.Entry> findInProgressSnapshots(
SnapshotsInProgress snapshots,
String[] snapshotNames,
String repositoryName
) {
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) {
entries.add(entry);
}
}
return entries;
}

private ClusterStateUpdateTask createDeleteStateUpdate(
List<SnapshotId> snapshotIds,
String repoName,
Expand Down Expand Up @@ -2777,16 +2763,7 @@ public ClusterState execute(ClusterState currentState) {
);
}
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
RepositoryCleanupInProgress.TYPE,
RepositoryCleanupInProgress.EMPTY
);
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(
new Snapshot(repoName, snapshotIds.get(0)),
"cannot delete snapshots while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
);
}
ensureNoCleanupInProgress(currentState, repoName, snapshotIds.get(0).getName(), "delete snapshot");
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
// don't allow snapshot deletions while a restore is taking place,
// otherwise we could end up deleting a snapshot that is being restored
Expand Down