diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 8c82eddbd6ca6..1a81a1b032262 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -302,7 +302,7 @@ public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY ); - ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); + ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot"); ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); @@ -461,7 +461,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener lis public ClusterState execute(ClusterState currentState) { ensureRepositoryExists(repositoryName, currentState); ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); - ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); + ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot"); final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List runningSnapshots = snapshots.entries(); ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); @@ -534,7 +534,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure); } - private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) { + private static void ensureNoCleanupInProgress( + final ClusterState currentState, + final String repositoryName, + final String snapshotName, + final String reason + ) { final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY @@ -543,7 +548,13 @@ private static void ensureNoCleanupInProgress(ClusterState currentState, String throw new ConcurrentSnapshotExecutionException( repositoryName, snapshotName, - "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" + "cannot " + + reason + + " while a repository cleanup is in-progress in " + + repositoryCleanupInProgress.entries() + .stream() + .map(RepositoryCleanupInProgress.Entry::repository) + .collect(Collectors.toSet()) ); } } @@ -2021,18 +2032,17 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { * @param listener listener */ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener listener) { - + final String repositoryName = request.repository(); final String[] snapshotNames = request.snapshots(); - final String repoName = request.repository(); logger.info( () -> new ParameterizedMessage( "deleting snapshots [{}] from repository [{}]", Strings.arrayToCommaDelimitedString(snapshotNames), - repoName + repositoryName ) ); - final Repository repository = repositoriesService.repository(repoName); + final Repository repository = repositoriesService.repository(repositoryName); repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) { private SnapshotDeletionsInProgress.Entry newDelete = null; @@ -2049,19 +2059,46 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis @Override public ClusterState execute(ClusterState currentState) { - ensureRepositoryExists(repoName, currentState); - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); - final List snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName); - final List snapshotIds = matchingSnapshotIds( - snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()), - repositoryData, - snapshotNames, - repoName - ); + ensureRepositoryExists(repositoryName, currentState); + final Set snapshotIds = new HashSet<>(); + + // find in-progress snapshots to delete in cluster state + final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + final SnapshotId snapshotId = entry.snapshot().getSnapshotId(); + if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, snapshotId.getName())) { + snapshotIds.add(snapshotId); + } + } + + // find snapshots to delete in repository data + final Map snapshotsIdsInRepository = repositoryData.getSnapshotIds() + .stream() + .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); + for (String snapshotOrPattern : snapshotNames) { + if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { + for (Map.Entry entry : snapshotsIdsInRepository.entrySet()) { + if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { + snapshotIds.add(entry.getValue()); + } + } + } else { + final SnapshotId foundId = snapshotsIdsInRepository.get(snapshotOrPattern); + if (foundId == null) { + if (snapshotIds.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) { + throw new SnapshotMissingException(repositoryName, snapshotOrPattern); + } + } else { + snapshotIds.add(foundId); + } + } + } + if (snapshotIds.isEmpty()) { return currentState; } - final Set activeCloneSources = snapshots.entries() + + final Set activeCloneSources = snapshotsInProgress.entries() .stream() .filter(SnapshotsInProgress.Entry::isClone) .map(SnapshotsInProgress.Entry::source) @@ -2069,41 +2106,40 @@ public ClusterState execute(ClusterState currentState) { for (SnapshotId snapshotId : snapshotIds) { if (activeCloneSources.contains(snapshotId)) { throw new ConcurrentSnapshotExecutionException( - new Snapshot(repoName, snapshotId), + new Snapshot(repositoryName, snapshotId), "cannot delete snapshot while it is being cloned" ); } } + + ensureNoCleanupInProgress( + currentState, + repositoryName, + snapshotIds.stream().findFirst().get().getName(), + "delete snapshot" + ); + final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY ); - final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( - RepositoryCleanupInProgress.TYPE, - RepositoryCleanupInProgress.EMPTY - ); - if (repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException( - new Snapshot(repoName, snapshotIds.get(0)), - "cannot delete snapshots while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" - ); - } + final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); // don't allow snapshot deletions while a restore is taking place, // otherwise we could end up deleting a snapshot that is being restored // and the files the restore depends on would all be gone for (RestoreInProgress.Entry entry : restoreInProgress) { - if (repoName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) { + if (repositoryName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) { throw new ConcurrentSnapshotExecutionException( - new Snapshot(repoName, snapshotIds.get(0)), + new Snapshot(repositoryName, snapshotIds.stream().findFirst().get()), "cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]" ); } } // Snapshot ids that will have to be physically deleted from the repository final Set snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds); - final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> { + final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshotsInProgress.entries().stream().map(existing -> { if (existing.state() == State.STARTED && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) { // snapshot is started - mark every non completed shard as aborted final SnapshotsInProgress.Entry abortedEntry = existing.abort(); @@ -2130,14 +2166,15 @@ public ClusterState execute(ClusterState currentState) { // add the snapshot deletion to the cluster state final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries() .stream() - .filter(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.WAITING) + .filter(entry -> entry.repository().equals(repositoryName)) + .filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.WAITING) .findFirst() .orElse(null); if (replacedEntry == null) { final Optional foundDuplicate = deletionsInProgress.getEntries() .stream() .filter( - entry -> entry.repository().equals(repoName) + entry -> entry.repository().equals(repositoryName) && entry.state() == SnapshotDeletionsInProgress.State.STARTED && entry.getSnapshots().containsAll(snapshotIds) ) @@ -2149,14 +2186,14 @@ public ClusterState execute(ClusterState currentState) { } newDelete = new SnapshotDeletionsInProgress.Entry( List.copyOf(snapshotIdsRequiringCleanup), - repoName, + repositoryName, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), updatedSnapshots.entries() .stream() - .filter(entry -> repoName.equals(entry.repository())) + .filter(entry -> repositoryName.equals(entry.repository())) .noneMatch(SnapshotsService::isWritingToRepository) - && deletionsInProgress.hasExecutingDeletion(repoName) == false + && deletionsInProgress.hasExecutingDeletion(repositoryName) == false ? SnapshotDeletionsInProgress.State.STARTED : SnapshotDeletionsInProgress.State.WAITING ); @@ -2193,7 +2230,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS return; } if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) { - if (tryEnterRepoLoop(repoName)) { + if (tryEnterRepoLoop(repositoryName)) { deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion()); } else { logger.trace("Delete [{}] could not execute directly and was queued", newDelete); @@ -2208,52 +2245,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames), listener::onFailure); } - private static List matchingSnapshotIds( - List inProgress, - RepositoryData repositoryData, - String[] snapshotsOrPatterns, - String repositoryName - ) { - final Map allSnapshotIds = repositoryData.getSnapshotIds() - .stream() - .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); - final Set foundSnapshots = new HashSet<>(inProgress); - for (String snapshotOrPattern : snapshotsOrPatterns) { - if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { - for (Map.Entry entry : allSnapshotIds.entrySet()) { - if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { - foundSnapshots.add(entry.getValue()); - } - } - } else { - final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern); - if (foundId == null) { - if (inProgress.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) { - throw new SnapshotMissingException(repositoryName, snapshotOrPattern); - } - } else { - foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern)); - } - } - } - return List.copyOf(foundSnapshots); - } - - // Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found - private static List findInProgressSnapshots( - SnapshotsInProgress snapshots, - String[] snapshotNames, - String repositoryName - ) { - List entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) { - entries.add(entry); - } - } - return entries; - } - /** * Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository. *