99
99
import java .util .List ;
100
100
import java .util .Locale ;
101
101
import java .util .Map ;
102
+ import java .util .Objects ;
102
103
import java .util .Optional ;
103
104
import java .util .Set ;
104
105
import java .util .concurrent .ConcurrentHashMap ;
@@ -1141,7 +1142,13 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis
1141
1142
1142
1143
private boolean reusedExistingDelete = false ;
1143
1144
1144
- private final Collection <SnapshotsInProgress .Entry > completedSnapshots = new ArrayList <>();
1145
+ // Snapshots that had all of their shard snapshots in queued state and thus were removed from the
1146
+ // cluster state right away
1147
+ private final Collection <Snapshot > completedNoCleanup = new ArrayList <>();
1148
+
1149
+ // Snapshots that were aborted and that already wrote data to the repository and now have to be deleted
1150
+ // from the repository after the cluster state update
1151
+ private final Collection <SnapshotsInProgress .Entry > completedWithCleanup = new ArrayList <>();
1145
1152
1146
1153
@ Override
1147
1154
public ClusterState execute (ClusterState currentState ) {
@@ -1172,18 +1179,34 @@ public ClusterState execute(ClusterState currentState) {
1172
1179
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]" );
1173
1180
}
1174
1181
}
1182
+ // Snapshot ids that will have to be physically deleted from the repository
1183
+ final Set <SnapshotId > snapshotIdsRequiringCleanup = new HashSet <>(snapshotIds );
1175
1184
final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress .of (snapshots .entries ().stream ()
1176
1185
.map (existing -> {
1177
- // snapshot is started - mark every non completed shard as aborted
1178
- if (existing .state () == State .STARTED && snapshotIds .contains (existing .snapshot ().getSnapshotId ())) {
1186
+ if (existing .state () == State .STARTED &&
1187
+ snapshotIdsRequiringCleanup .contains (existing .snapshot ().getSnapshotId ())) {
1188
+ // snapshot is started - mark every non completed shard as aborted
1179
1189
final SnapshotsInProgress .Entry abortedEntry = existing .abort ();
1180
- if (abortedEntry .state ().completed ()) {
1181
- completedSnapshots .add (abortedEntry );
1190
+ if (abortedEntry == null ) {
1191
+ // No work has been done for this snapshot yet so we remove it from the cluster state directly
1192
+ final Snapshot existingNotYetStartedSnapshot = existing .snapshot ();
1193
+ // Adding the snapshot to #endingSnapshots since we still have to resolve its listeners to not trip
1194
+ // any leaked listener assertions
1195
+ if (endingSnapshots .add (existingNotYetStartedSnapshot )) {
1196
+ completedNoCleanup .add (existingNotYetStartedSnapshot );
1197
+ }
1198
+ snapshotIdsRequiringCleanup .remove (existingNotYetStartedSnapshot .getSnapshotId ());
1199
+ } else if (abortedEntry .state ().completed ()) {
1200
+ completedWithCleanup .add (abortedEntry );
1182
1201
}
1183
1202
return abortedEntry ;
1184
1203
}
1185
1204
return existing ;
1186
- }).collect (Collectors .toUnmodifiableList ()));
1205
+ }).filter (Objects ::nonNull ).collect (Collectors .toUnmodifiableList ()));
1206
+ if (snapshotIdsRequiringCleanup .isEmpty ()) {
1207
+ // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions
1208
+ return updateWithSnapshots (currentState , updatedSnapshots , null );
1209
+ }
1187
1210
// add the snapshot deletion to the cluster state
1188
1211
final SnapshotDeletionsInProgress .Entry replacedEntry = deletionsInProgress .getEntries ().stream ().filter (entry ->
1189
1212
entry .repository ().equals (repoName ) && entry .state () == SnapshotDeletionsInProgress .State .WAITING )
@@ -1198,9 +1221,10 @@ public ClusterState execute(ClusterState currentState) {
1198
1221
reusedExistingDelete = true ;
1199
1222
return currentState ;
1200
1223
}
1201
- ensureBelowConcurrencyLimit (repoName , snapshotIds .get (0 ).getName (), snapshots , deletionsInProgress );
1224
+ final List <SnapshotId > toDelete = List .copyOf (snapshotIdsRequiringCleanup );
1225
+ ensureBelowConcurrencyLimit (repoName , toDelete .get (0 ).getName (), snapshots , deletionsInProgress );
1202
1226
newDelete = new SnapshotDeletionsInProgress .Entry (
1203
- snapshotIds ,
1227
+ toDelete ,
1204
1228
repoName ,
1205
1229
threadPool .absoluteTimeInMillis (),
1206
1230
repositoryData .getGenId (),
@@ -1210,7 +1234,7 @@ public ClusterState execute(ClusterState currentState) {
1210
1234
repoName .equals (entry .repository ()) && entry .state () == SnapshotDeletionsInProgress .State .STARTED )
1211
1235
? SnapshotDeletionsInProgress .State .STARTED : SnapshotDeletionsInProgress .State .WAITING );
1212
1236
} else {
1213
- newDelete = replacedEntry .withAddedSnapshots (snapshotIds );
1237
+ newDelete = replacedEntry .withAddedSnapshots (snapshotIdsRequiringCleanup );
1214
1238
}
1215
1239
return updateWithSnapshots (currentState , updatedSnapshots ,
1216
1240
(replacedEntry == null ? deletionsInProgress : deletionsInProgress .withRemovedEntry (replacedEntry .uuid ()))
@@ -1219,11 +1243,16 @@ public ClusterState execute(ClusterState currentState) {
1219
1243
1220
1244
@ Override
1221
1245
public void onFailure (String source , Exception e ) {
1246
+ endingSnapshots .removeAll (completedNoCleanup );
1222
1247
listener .onFailure (e );
1223
1248
}
1224
1249
1225
1250
@ Override
1226
1251
public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
1252
+ for (Snapshot snapshot : completedNoCleanup ) {
1253
+ failSnapshotCompletionListeners (snapshot ,
1254
+ new SnapshotException (snapshot , SnapshotsInProgress .ABORTED_FAILURE_TEXT ));
1255
+ }
1227
1256
if (newDelete == null ) {
1228
1257
listener .onResponse (null );
1229
1258
} else {
@@ -1238,7 +1267,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
1238
1267
logger .trace ("Delete [{}] could not execute directly and was queued" , newDelete );
1239
1268
}
1240
1269
} else {
1241
- for (SnapshotsInProgress .Entry completedSnapshot : completedSnapshots ) {
1270
+ for (SnapshotsInProgress .Entry completedSnapshot : completedWithCleanup ) {
1242
1271
endSnapshot (completedSnapshot , newState .metadata (), repositoryData );
1243
1272
}
1244
1273
}
0 commit comments