@@ -302,7 +302,7 @@ public ClusterState execute(ClusterState currentState) {
302
302
SnapshotDeletionsInProgress .TYPE ,
303
303
SnapshotDeletionsInProgress .EMPTY
304
304
);
305
- ensureNoCleanupInProgress (currentState , repositoryName , snapshotName );
305
+ ensureNoCleanupInProgress (currentState , repositoryName , snapshotName , "create snapshot" );
306
306
ensureBelowConcurrencyLimit (repositoryName , snapshotName , snapshots , deletionsInProgress );
307
307
// Store newSnapshot here to be processed in clusterStateProcessed
308
308
List <String > indices = Arrays .asList (indexNameExpressionResolver .concreteIndexNames (currentState , request ));
@@ -461,7 +461,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener<Void> lis
461
461
public ClusterState execute (ClusterState currentState ) {
462
462
ensureRepositoryExists (repositoryName , currentState );
463
463
ensureSnapshotNameAvailableInRepo (repositoryData , snapshotName , repository );
464
- ensureNoCleanupInProgress (currentState , repositoryName , snapshotName );
464
+ ensureNoCleanupInProgress (currentState , repositoryName , snapshotName , "clone snapshot" );
465
465
final SnapshotsInProgress snapshots = currentState .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY );
466
466
final List <SnapshotsInProgress .Entry > runningSnapshots = snapshots .entries ();
467
467
ensureSnapshotNameNotRunning (runningSnapshots , repositoryName , snapshotName );
@@ -534,7 +534,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
534
534
}, "clone_snapshot [" + request .source () + "][" + snapshotName + ']' , listener ::onFailure );
535
535
}
536
536
537
- private static void ensureNoCleanupInProgress (ClusterState currentState , String repositoryName , String snapshotName ) {
537
+ private static void ensureNoCleanupInProgress (
538
+ final ClusterState currentState ,
539
+ final String repositoryName ,
540
+ final String snapshotName ,
541
+ final String reason
542
+ ) {
538
543
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState .custom (
539
544
RepositoryCleanupInProgress .TYPE ,
540
545
RepositoryCleanupInProgress .EMPTY
@@ -543,7 +548,13 @@ private static void ensureNoCleanupInProgress(ClusterState currentState, String
543
548
throw new ConcurrentSnapshotExecutionException (
544
549
repositoryName ,
545
550
snapshotName ,
546
- "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
551
+ "cannot "
552
+ + reason
553
+ + " while a repository cleanup is in-progress in "
554
+ + repositoryCleanupInProgress .entries ()
555
+ .stream ()
556
+ .map (RepositoryCleanupInProgress .Entry ::repository )
557
+ .collect (Collectors .toSet ())
547
558
);
548
559
}
549
560
}
@@ -2021,18 +2032,17 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
2021
2032
* @param listener listener
2022
2033
*/
2023
2034
public void deleteSnapshots (final DeleteSnapshotRequest request , final ActionListener <Void > listener ) {
2024
-
2035
+ final String repositoryName = request . repository ();
2025
2036
final String [] snapshotNames = request .snapshots ();
2026
- final String repoName = request .repository ();
2027
2037
logger .info (
2028
2038
() -> new ParameterizedMessage (
2029
2039
"deleting snapshots [{}] from repository [{}]" ,
2030
2040
Strings .arrayToCommaDelimitedString (snapshotNames ),
2031
- repoName
2041
+ repositoryName
2032
2042
)
2033
2043
);
2034
2044
2035
- final Repository repository = repositoriesService .repository (repoName );
2045
+ final Repository repository = repositoriesService .repository (repositoryName );
2036
2046
repository .executeConsistentStateUpdate (repositoryData -> new ClusterStateUpdateTask (request .masterNodeTimeout ()) {
2037
2047
2038
2048
private SnapshotDeletionsInProgress .Entry newDelete = null ;
@@ -2049,61 +2059,87 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis
2049
2059
2050
2060
@ Override
2051
2061
public ClusterState execute (ClusterState currentState ) {
2052
- ensureRepositoryExists (repoName , currentState );
2053
- final SnapshotsInProgress snapshots = currentState .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY );
2054
- final List <SnapshotsInProgress .Entry > snapshotEntries = findInProgressSnapshots (snapshots , snapshotNames , repoName );
2055
- final List <SnapshotId > snapshotIds = matchingSnapshotIds (
2056
- snapshotEntries .stream ().map (e -> e .snapshot ().getSnapshotId ()).collect (Collectors .toList ()),
2057
- repositoryData ,
2058
- snapshotNames ,
2059
- repoName
2060
- );
2062
+ ensureRepositoryExists (repositoryName , currentState );
2063
+ final Set <SnapshotId > snapshotIds = new HashSet <>();
2064
+
2065
+ // find in-progress snapshots to delete in cluster state
2066
+ final SnapshotsInProgress snapshotsInProgress = currentState .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY );
2067
+ for (SnapshotsInProgress .Entry entry : snapshotsInProgress .entries ()) {
2068
+ final SnapshotId snapshotId = entry .snapshot ().getSnapshotId ();
2069
+ if (entry .repository ().equals (repositoryName ) && Regex .simpleMatch (snapshotNames , snapshotId .getName ())) {
2070
+ snapshotIds .add (snapshotId );
2071
+ }
2072
+ }
2073
+
2074
+ // find snapshots to delete in repository data
2075
+ final Map <String , SnapshotId > snapshotsIdsInRepository = repositoryData .getSnapshotIds ()
2076
+ .stream ()
2077
+ .collect (Collectors .toMap (SnapshotId ::getName , Function .identity ()));
2078
+ for (String snapshotOrPattern : snapshotNames ) {
2079
+ if (Regex .isSimpleMatchPattern (snapshotOrPattern )) {
2080
+ for (Map .Entry <String , SnapshotId > entry : snapshotsIdsInRepository .entrySet ()) {
2081
+ if (Regex .simpleMatch (snapshotOrPattern , entry .getKey ())) {
2082
+ snapshotIds .add (entry .getValue ());
2083
+ }
2084
+ }
2085
+ } else {
2086
+ final SnapshotId foundId = snapshotsIdsInRepository .get (snapshotOrPattern );
2087
+ if (foundId == null ) {
2088
+ if (snapshotIds .stream ().noneMatch (snapshotId -> snapshotId .getName ().equals (snapshotOrPattern ))) {
2089
+ throw new SnapshotMissingException (repositoryName , snapshotOrPattern );
2090
+ }
2091
+ } else {
2092
+ snapshotIds .add (foundId );
2093
+ }
2094
+ }
2095
+ }
2096
+
2061
2097
if (snapshotIds .isEmpty ()) {
2062
2098
return currentState ;
2063
2099
}
2064
- final Set <SnapshotId > activeCloneSources = snapshots .entries ()
2100
+
2101
+ final Set <SnapshotId > activeCloneSources = snapshotsInProgress .entries ()
2065
2102
.stream ()
2066
2103
.filter (SnapshotsInProgress .Entry ::isClone )
2067
2104
.map (SnapshotsInProgress .Entry ::source )
2068
2105
.collect (Collectors .toSet ());
2069
2106
for (SnapshotId snapshotId : snapshotIds ) {
2070
2107
if (activeCloneSources .contains (snapshotId )) {
2071
2108
throw new ConcurrentSnapshotExecutionException (
2072
- new Snapshot (repoName , snapshotId ),
2109
+ new Snapshot (repositoryName , snapshotId ),
2073
2110
"cannot delete snapshot while it is being cloned"
2074
2111
);
2075
2112
}
2076
2113
}
2114
+
2115
+ ensureNoCleanupInProgress (
2116
+ currentState ,
2117
+ repositoryName ,
2118
+ snapshotIds .stream ().findFirst ().get ().getName (),
2119
+ "delete snapshot"
2120
+ );
2121
+
2077
2122
final SnapshotDeletionsInProgress deletionsInProgress = currentState .custom (
2078
2123
SnapshotDeletionsInProgress .TYPE ,
2079
2124
SnapshotDeletionsInProgress .EMPTY
2080
2125
);
2081
- final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState .custom (
2082
- RepositoryCleanupInProgress .TYPE ,
2083
- RepositoryCleanupInProgress .EMPTY
2084
- );
2085
- if (repositoryCleanupInProgress .hasCleanupInProgress ()) {
2086
- throw new ConcurrentSnapshotExecutionException (
2087
- new Snapshot (repoName , snapshotIds .get (0 )),
2088
- "cannot delete snapshots while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"
2089
- );
2090
- }
2126
+
2091
2127
final RestoreInProgress restoreInProgress = currentState .custom (RestoreInProgress .TYPE , RestoreInProgress .EMPTY );
2092
2128
// don't allow snapshot deletions while a restore is taking place,
2093
2129
// otherwise we could end up deleting a snapshot that is being restored
2094
2130
// and the files the restore depends on would all be gone
2095
2131
2096
2132
for (RestoreInProgress .Entry entry : restoreInProgress ) {
2097
- if (repoName .equals (entry .snapshot ().getRepository ()) && snapshotIds .contains (entry .snapshot ().getSnapshotId ())) {
2133
+ if (repositoryName .equals (entry .snapshot ().getRepository ()) && snapshotIds .contains (entry .snapshot ().getSnapshotId ())) {
2098
2134
throw new ConcurrentSnapshotExecutionException (
2099
- new Snapshot (repoName , snapshotIds .get (0 )),
2135
+ new Snapshot (repositoryName , snapshotIds .stream (). findFirst (). get ()),
2100
2136
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]"
2101
2137
);
2102
2138
}
2103
2139
}
2104
2140
// Snapshot ids that will have to be physically deleted from the repository
2105
2141
final Set <SnapshotId > snapshotIdsRequiringCleanup = new HashSet <>(snapshotIds );
2106
- final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress .of (snapshots .entries ().stream ().map (existing -> {
2142
+ final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress .of (snapshotsInProgress .entries ().stream ().map (existing -> {
2107
2143
if (existing .state () == State .STARTED && snapshotIdsRequiringCleanup .contains (existing .snapshot ().getSnapshotId ())) {
2108
2144
// snapshot is started - mark every non completed shard as aborted
2109
2145
final SnapshotsInProgress .Entry abortedEntry = existing .abort ();
@@ -2130,14 +2166,15 @@ public ClusterState execute(ClusterState currentState) {
2130
2166
// add the snapshot deletion to the cluster state
2131
2167
final SnapshotDeletionsInProgress .Entry replacedEntry = deletionsInProgress .getEntries ()
2132
2168
.stream ()
2133
- .filter (entry -> entry .repository ().equals (repoName ) && entry .state () == SnapshotDeletionsInProgress .State .WAITING )
2169
+ .filter (entry -> entry .repository ().equals (repositoryName ))
2170
+ .filter (entry -> entry .state () == SnapshotDeletionsInProgress .State .WAITING )
2134
2171
.findFirst ()
2135
2172
.orElse (null );
2136
2173
if (replacedEntry == null ) {
2137
2174
final Optional <SnapshotDeletionsInProgress .Entry > foundDuplicate = deletionsInProgress .getEntries ()
2138
2175
.stream ()
2139
2176
.filter (
2140
- entry -> entry .repository ().equals (repoName )
2177
+ entry -> entry .repository ().equals (repositoryName )
2141
2178
&& entry .state () == SnapshotDeletionsInProgress .State .STARTED
2142
2179
&& entry .getSnapshots ().containsAll (snapshotIds )
2143
2180
)
@@ -2149,14 +2186,14 @@ public ClusterState execute(ClusterState currentState) {
2149
2186
}
2150
2187
newDelete = new SnapshotDeletionsInProgress .Entry (
2151
2188
List .copyOf (snapshotIdsRequiringCleanup ),
2152
- repoName ,
2189
+ repositoryName ,
2153
2190
threadPool .absoluteTimeInMillis (),
2154
2191
repositoryData .getGenId (),
2155
2192
updatedSnapshots .entries ()
2156
2193
.stream ()
2157
- .filter (entry -> repoName .equals (entry .repository ()))
2194
+ .filter (entry -> repositoryName .equals (entry .repository ()))
2158
2195
.noneMatch (SnapshotsService ::isWritingToRepository )
2159
- && deletionsInProgress .hasExecutingDeletion (repoName ) == false
2196
+ && deletionsInProgress .hasExecutingDeletion (repositoryName ) == false
2160
2197
? SnapshotDeletionsInProgress .State .STARTED
2161
2198
: SnapshotDeletionsInProgress .State .WAITING
2162
2199
);
@@ -2193,7 +2230,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
2193
2230
return ;
2194
2231
}
2195
2232
if (newDelete .state () == SnapshotDeletionsInProgress .State .STARTED ) {
2196
- if (tryEnterRepoLoop (repoName )) {
2233
+ if (tryEnterRepoLoop (repositoryName )) {
2197
2234
deleteSnapshotsFromRepository (newDelete , repositoryData , newState .nodes ().getMinNodeVersion ());
2198
2235
} else {
2199
2236
logger .trace ("Delete [{}] could not execute directly and was queued" , newDelete );
@@ -2208,52 +2245,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
2208
2245
}, "delete snapshot [" + repository + "]" + Arrays .toString (snapshotNames ), listener ::onFailure );
2209
2246
}
2210
2247
2211
- private static List <SnapshotId > matchingSnapshotIds (
2212
- List <SnapshotId > inProgress ,
2213
- RepositoryData repositoryData ,
2214
- String [] snapshotsOrPatterns ,
2215
- String repositoryName
2216
- ) {
2217
- final Map <String , SnapshotId > allSnapshotIds = repositoryData .getSnapshotIds ()
2218
- .stream ()
2219
- .collect (Collectors .toMap (SnapshotId ::getName , Function .identity ()));
2220
- final Set <SnapshotId > foundSnapshots = new HashSet <>(inProgress );
2221
- for (String snapshotOrPattern : snapshotsOrPatterns ) {
2222
- if (Regex .isSimpleMatchPattern (snapshotOrPattern )) {
2223
- for (Map .Entry <String , SnapshotId > entry : allSnapshotIds .entrySet ()) {
2224
- if (Regex .simpleMatch (snapshotOrPattern , entry .getKey ())) {
2225
- foundSnapshots .add (entry .getValue ());
2226
- }
2227
- }
2228
- } else {
2229
- final SnapshotId foundId = allSnapshotIds .get (snapshotOrPattern );
2230
- if (foundId == null ) {
2231
- if (inProgress .stream ().noneMatch (snapshotId -> snapshotId .getName ().equals (snapshotOrPattern ))) {
2232
- throw new SnapshotMissingException (repositoryName , snapshotOrPattern );
2233
- }
2234
- } else {
2235
- foundSnapshots .add (allSnapshotIds .get (snapshotOrPattern ));
2236
- }
2237
- }
2238
- }
2239
- return List .copyOf (foundSnapshots );
2240
- }
2241
-
2242
- // Return in-progress snapshot entries by name and repository in the given cluster state or null if none is found
2243
- private static List <SnapshotsInProgress .Entry > findInProgressSnapshots (
2244
- SnapshotsInProgress snapshots ,
2245
- String [] snapshotNames ,
2246
- String repositoryName
2247
- ) {
2248
- List <SnapshotsInProgress .Entry > entries = new ArrayList <>();
2249
- for (SnapshotsInProgress .Entry entry : snapshots .entries ()) {
2250
- if (entry .repository ().equals (repositoryName ) && Regex .simpleMatch (snapshotNames , entry .snapshot ().getSnapshotId ().getName ())) {
2251
- entries .add (entry );
2252
- }
2253
- }
2254
- return entries ;
2255
- }
2256
-
2257
2248
/**
2258
2249
* Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository.
2259
2250
*
0 commit comments