-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Remove and inline methods in SnapshotsService.deleteSnapshots() #76079
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
tlrx
merged 2 commits into
elastic:master
from
tlrx:minor-changes-in-delete-snapshots-method
Aug 5, 2021
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -302,7 +302,7 @@ public ClusterState execute(ClusterState currentState) { | |
SnapshotDeletionsInProgress.TYPE, | ||
SnapshotDeletionsInProgress.EMPTY | ||
); | ||
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); | ||
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot"); | ||
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); | ||
// Store newSnapshot here to be processed in clusterStateProcessed | ||
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); | ||
|
@@ -461,7 +461,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<Void> lis | |
public ClusterState execute(ClusterState currentState) { | ||
ensureRepositoryExists(repositoryName, currentState); | ||
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); | ||
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); | ||
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot"); | ||
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); | ||
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots.entries(); | ||
ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); | ||
|
@@ -534,7 +534,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl | |
}, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure); | ||
} | ||
|
||
private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) { | ||
private static void ensureNoCleanupInProgress( | ||
final ClusterState currentState, | ||
final String repositoryName, | ||
final String snapshotName, | ||
final String reason | ||
) { | ||
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( | ||
RepositoryCleanupInProgress.TYPE, | ||
RepositoryCleanupInProgress.EMPTY | ||
|
@@ -543,7 +548,13 @@ private static void ensureNoCleanupInProgress(ClusterState currentState, String | |
throw new ConcurrentSnapshotExecutionException( | ||
repositoryName, | ||
snapshotName, | ||
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" | ||
"cannot " | ||
+ reason | ||
+ " while a repository cleanup is in-progress in " | ||
+ repositoryCleanupInProgress.entries() | ||
.stream() | ||
.map(RepositoryCleanupInProgress.Entry::repository) | ||
.collect(Collectors.toSet()) | ||
); | ||
} | ||
} | ||
|
@@ -2021,18 +2032,17 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { | |
* @param listener listener | ||
*/ | ||
public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener<Void> listener) { | ||
|
||
final String repositoryName = request.repository(); | ||
final String[] snapshotNames = request.snapshots(); | ||
final String repoName = request.repository(); | ||
logger.info( | ||
() -> new ParameterizedMessage( | ||
"deleting snapshots [{}] from repository [{}]", | ||
Strings.arrayToCommaDelimitedString(snapshotNames), | ||
repoName | ||
repositoryName | ||
) | ||
); | ||
|
||
final Repository repository = repositoriesService.repository(repoName); | ||
final Repository repository = repositoriesService.repository(repositoryName); | ||
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) { | ||
|
||
private SnapshotDeletionsInProgress.Entry newDelete = null; | ||
|
@@ -2049,61 +2059,87 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis | |
|
||
@Override | ||
public ClusterState execute(ClusterState currentState) { | ||
ensureRepositoryExists(repoName, currentState); | ||
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); | ||
final List<SnapshotsInProgress.Entry> snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName); | ||
final List<SnapshotId> snapshotIds = matchingSnapshotIds( | ||
snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()), | ||
repositoryData, | ||
snapshotNames, | ||
repoName | ||
); | ||
ensureRepositoryExists(repositoryName, currentState); | ||
final Set<SnapshotId> snapshotIds = new HashSet<>(); | ||
|
||
// find in-progress snapshots to delete in cluster state | ||
final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); | ||
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { | ||
final SnapshotId snapshotId = entry.snapshot().getSnapshotId(); | ||
if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, snapshotId.getName())) { | ||
snapshotIds.add(snapshotId); | ||
} | ||
} | ||
|
||
// find snapshots to delete in repository data | ||
final Map<String, SnapshotId> snapshotsIdsInRepository = repositoryData.getSnapshotIds() | ||
.stream() | ||
.collect(Collectors.toMap(SnapshotId::getName, Function.identity())); | ||
for (String snapshotOrPattern : snapshotNames) { | ||
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { | ||
for (Map.Entry<String, SnapshotId> entry : snapshotsIdsInRepository.entrySet()) { | ||
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { | ||
snapshotIds.add(entry.getValue()); | ||
} | ||
} | ||
} else { | ||
final SnapshotId foundId = snapshotsIdsInRepository.get(snapshotOrPattern); | ||
if (foundId == null) { | ||
if (snapshotIds.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) { | ||
throw new SnapshotMissingException(repositoryName, snapshotOrPattern); | ||
} | ||
} else { | ||
snapshotIds.add(foundId); | ||
} | ||
} | ||
} | ||
|
||
if (snapshotIds.isEmpty()) { | ||
return currentState; | ||
} | ||
final Set<SnapshotId> activeCloneSources = snapshots.entries() | ||
|
||
final Set<SnapshotId> activeCloneSources = snapshotsInProgress.entries() | ||
.stream() | ||
.filter(SnapshotsInProgress.Entry::isClone) | ||
.map(SnapshotsInProgress.Entry::source) | ||
.collect(Collectors.toSet()); | ||
for (SnapshotId snapshotId : snapshotIds) { | ||
if (activeCloneSources.contains(snapshotId)) { | ||
throw new ConcurrentSnapshotExecutionException( | ||
new Snapshot(repoName, snapshotId), | ||
new Snapshot(repositoryName, snapshotId), | ||
"cannot delete snapshot while it is being cloned" | ||
); | ||
} | ||
} | ||
|
||
ensureNoCleanupInProgress( | ||
currentState, | ||
repositoryName, | ||
snapshotIds.stream().findFirst().get().getName(), | ||
"delete snapshot" | ||
); | ||
|
||
final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( | ||
SnapshotDeletionsInProgress.TYPE, | ||
SnapshotDeletionsInProgress.EMPTY | ||
); | ||
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( | ||
RepositoryCleanupInProgress.TYPE, | ||
RepositoryCleanupInProgress.EMPTY | ||
); | ||
if (repositoryCleanupInProgress.hasCleanupInProgress()) { | ||
throw new ConcurrentSnapshotExecutionException( | ||
new Snapshot(repoName, snapshotIds.get(0)), | ||
"cannot delete snapshots while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" | ||
); | ||
} | ||
|
||
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); | ||
// don't allow snapshot deletions while a restore is taking place, | ||
// otherwise we could end up deleting a snapshot that is being restored | ||
// and the files the restore depends on would all be gone | ||
|
||
for (RestoreInProgress.Entry entry : restoreInProgress) { | ||
if (repoName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) { | ||
if (repositoryName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) { | ||
throw new ConcurrentSnapshotExecutionException( | ||
new Snapshot(repoName, snapshotIds.get(0)), | ||
new Snapshot(repositoryName, snapshotIds.stream().findFirst().get()), | ||
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]" | ||
); | ||
} | ||
} | ||
// Snapshot ids that will have to be physically deleted from the repository | ||
final Set<SnapshotId> snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds); | ||
final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> { | ||
final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshotsInProgress.entries().stream().map(existing -> { | ||
if (existing.state() == State.STARTED && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) { | ||
// snapshot is started - mark every non completed shard as aborted | ||
final SnapshotsInProgress.Entry abortedEntry = existing.abort(); | ||
|
@@ -2130,14 +2166,15 @@ public ClusterState execute(ClusterState currentState) { | |
// add the snapshot deletion to the cluster state | ||
final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries() | ||
.stream() | ||
.filter(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.WAITING) | ||
.filter(entry -> entry.repository().equals(repositoryName)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why split into multiple filters here? (doesn't matter much here but it's slightly harder to read IMO) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find it easier that way 😅 |
||
.filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.WAITING) | ||
.findFirst() | ||
.orElse(null); | ||
if (replacedEntry == null) { | ||
final Optional<SnapshotDeletionsInProgress.Entry> foundDuplicate = deletionsInProgress.getEntries() | ||
.stream() | ||
.filter( | ||
entry -> entry.repository().equals(repoName) | ||
entry -> entry.repository().equals(repositoryName) | ||
&& entry.state() == SnapshotDeletionsInProgress.State.STARTED | ||
&& entry.getSnapshots().containsAll(snapshotIds) | ||
) | ||
|
@@ -2149,14 +2186,14 @@ public ClusterState execute(ClusterState currentState) { | |
} | ||
newDelete = new SnapshotDeletionsInProgress.Entry( | ||
List.copyOf(snapshotIdsRequiringCleanup), | ||
repoName, | ||
repositoryName, | ||
threadPool.absoluteTimeInMillis(), | ||
repositoryData.getGenId(), | ||
updatedSnapshots.entries() | ||
.stream() | ||
.filter(entry -> repoName.equals(entry.repository())) | ||
.filter(entry -> repositoryName.equals(entry.repository())) | ||
.noneMatch(SnapshotsService::isWritingToRepository) | ||
&& deletionsInProgress.hasExecutingDeletion(repoName) == false | ||
&& deletionsInProgress.hasExecutingDeletion(repositoryName) == false | ||
? SnapshotDeletionsInProgress.State.STARTED | ||
: SnapshotDeletionsInProgress.State.WAITING | ||
); | ||
|
@@ -2193,7 +2230,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS | |
return; | ||
} | ||
if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) { | ||
if (tryEnterRepoLoop(repoName)) { | ||
if (tryEnterRepoLoop(repositoryName)) { | ||
deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion()); | ||
} else { | ||
logger.trace("Delete [{}] could not execute directly and was queued", newDelete); | ||
|
@@ -2208,52 +2245,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS | |
}, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames), listener::onFailure); | ||
} | ||
|
||
private static List<SnapshotId> matchingSnapshotIds( | ||
List<SnapshotId> inProgress, | ||
RepositoryData repositoryData, | ||
String[] snapshotsOrPatterns, | ||
String repositoryName | ||
) { | ||
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds() | ||
.stream() | ||
.collect(Collectors.toMap(SnapshotId::getName, Function.identity())); | ||
final Set<SnapshotId> foundSnapshots = new HashSet<>(inProgress); | ||
for (String snapshotOrPattern : snapshotsOrPatterns) { | ||
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { | ||
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) { | ||
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { | ||
foundSnapshots.add(entry.getValue()); | ||
} | ||
} | ||
} else { | ||
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern); | ||
if (foundId == null) { | ||
if (inProgress.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) { | ||
throw new SnapshotMissingException(repositoryName, snapshotOrPattern); | ||
} | ||
} else { | ||
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern)); | ||
} | ||
} | ||
} | ||
return List.copyOf(foundSnapshots); | ||
} | ||
|
||
// Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found | ||
private static List<SnapshotsInProgress.Entry> findInProgressSnapshots( | ||
SnapshotsInProgress snapshots, | ||
String[] snapshotNames, | ||
String repositoryName | ||
) { | ||
List<SnapshotsInProgress.Entry> entries = new ArrayList<>(); | ||
for (SnapshotsInProgress.Entry entry : snapshots.entries()) { | ||
if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) { | ||
entries.add(entry); | ||
} | ||
} | ||
return entries; | ||
} | ||
|
||
/** | ||
* Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository. | ||
* | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tbh. I'm not the biggest fan of inlining this code. These CS updates are already too complex/long even without inlining these loops? Maybe we could improve the state of things through better naming that makes it clear that these methods are filtering by snapshot name to make the different paths with resolving by uuid clearer instead?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I read that code recently I had to go jump to the methods and go back multiple times to be sure to understand which snapshots where filtered and why. I don't think the findInProgressSnapshots() had any value and it's easier to read inline.
Maybe I can keep that one inline and revert+rename the matchinSnapshotsIds one? In the future I'd like this method to be able to match on snapshot names or snapshot uuids by passing a SnapshotId -> String mapping function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That variant makes good sense to me :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with these being inline, the comments tell us what's going on well enough. I do like that we now just have the one set of snapshot IDs rather than using a list for the first bit and then copying it over to a set for the second, then back to a list again. If we extract methods again it'd be great if we could keep this improvement.