@@ -992,17 +992,19 @@ public void applyClusterState(ClusterChangedEvent event) {
992
992
newMaster || removedNodesCleanupNeeded (snapshotsInProgress , event .nodesDelta ().removedNodes ()),
993
993
event .routingTableChanged () && waitingShardsStartedOrUnassigned (snapshotsInProgress , event )
994
994
);
995
- triggerSnapshotsPendingDeletions (event .state ());
996
- } else if (snapshotCompletionListeners .isEmpty () == false ) {
997
- // We have snapshot listeners but are not the master any more. Fail all waiting listeners except for those that already
998
- // have their snapshots finalizing (those that are already finalizing will fail on their own from to update the cluster
999
- // state).
1000
- for (Snapshot snapshot : Set .copyOf (snapshotCompletionListeners .keySet ())) {
1001
- if (endingSnapshots .add (snapshot )) {
1002
- failSnapshotCompletionListeners (snapshot , new SnapshotException (snapshot , "no longer master" ));
995
+ } else {
996
+ if (snapshotCompletionListeners .isEmpty () == false ) {
997
+ // We have snapshot listeners but are not the master any more. Fail all waiting listeners except for those that already
998
+ // have their snapshots finalizing (those that are already finalizing will fail on their own when they fail to update the cluster
999
+ // state).
1000
+ for (Snapshot snapshot : Set .copyOf (snapshotCompletionListeners .keySet ())) {
1001
+ if (endingSnapshots .add (snapshot )) {
1002
+ failSnapshotCompletionListeners (snapshot , new SnapshotException (snapshot , "no longer master" ));
1003
+ }
1003
1004
}
1004
1005
}
1005
1006
}
1007
+ triggerSnapshotsPendingDeletions (event );
1006
1008
} catch (Exception e ) {
1007
1009
assert false : new AssertionError (e );
1008
1010
logger .warn ("Failed to update snapshot state " , e );
@@ -1295,6 +1297,13 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
1295
1297
*/
1296
1298
private final Set <SnapshotId > ongoingSnapshotsDeletions = ConcurrentCollections .newConcurrentSet ();
1297
1299
1300
+ /**
1301
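+ * Sets of pending snapshot deletions that cannot be triggered yet because they conflict with an on-going restore, an on-going clone, or the state of their repository (clean-up in progress, repository not found or read-only)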
1302
+ */
1303
+ private final Set <SnapshotId > pendingDeletionsWithConflictingRestores = ConcurrentCollections .newConcurrentSet ();
1304
+ private final Set <SnapshotId > pendingDeletionsWithConflictingClones = ConcurrentCollections .newConcurrentSet ();
1305
+ private final Set <SnapshotId > pendingDeletionsWithConflictingRepos = ConcurrentCollections .newConcurrentSet ();
1306
+
1298
1307
/**
1299
1308
* Finds snapshots to delete in the cluster state and triggers explicit snapshot delete requests. This method attempts to detect
1300
1309
* conflicting situations where triggering the snapshot deletion would likely fail due to a concurrent snapshot operation. In such
@@ -1307,77 +1316,165 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
1307
1316
* name. If the repo uuid was not known at the time the snapshot was added to {@link SnapshotDeletionsPending}, we try to find a
1308
1317
* repository with the same name.
1309
1318
*
1310
- * @param state the current {@link ClusterState }
1319
+ * @param event the current {@link ClusterChangedEvent }
1311
1320
*/
1312
- private void triggerSnapshotsPendingDeletions (final ClusterState state ) {
1313
- final RepositoriesMetadata repositories = state .metadata ().custom (RepositoriesMetadata .TYPE , RepositoriesMetadata .EMPTY );
1314
- final SnapshotDeletionsPending snapshotDeletionsPending = state .custom (SnapshotDeletionsPending .TYPE );
1315
- if (snapshotDeletionsPending == null
1316
- || snapshotDeletionsPending .isEmpty ()
1317
- || repositories .repositories ().isEmpty ()
1318
- || state .nodes ().isLocalNodeElectedMaster () == false
1319
- || state .custom (RepositoryCleanupInProgress .TYPE , RepositoryCleanupInProgress .EMPTY ).hasCleanupInProgress ()) {
1321
+ private void triggerSnapshotsPendingDeletions (final ClusterChangedEvent event ) {
1322
+ if (event .localNodeMaster () == false ) {
1323
+ if (event .previousState ().nodes ().isLocalNodeElectedMaster ()) {
1324
+ clearPendingDeletionsWithConflicts ();
1325
+ }
1320
1326
return ;
1321
1327
}
1322
- final Set <SnapshotId > currentDeletions = deletionsSources (state );
1323
- final Set <SnapshotId > currentRestores = restoreSources (state );
1324
- final Set <SnapshotId > currentClones = cloneSources (state );
1325
1328
1326
- // the list of snapshot ids to trigger deletion for, per repository
1327
- final Map <RepositoryMetadata , Set <SnapshotId >> snapshotsToDelete = new HashMap <>();
1329
+ if (pendingDeletionsChanged (event ) || pendingDeletionsWithConflictsChanged (event )) {
1330
+ final ClusterState state = event .state ();
1331
+ final SnapshotDeletionsPending snapshotDeletionsPending = state .custom (SnapshotDeletionsPending .TYPE );
1332
+ if (snapshotDeletionsPending == null || snapshotDeletionsPending .isEmpty ()) {
1333
+ clearPendingDeletionsWithConflicts ();
1334
+ return ;
1335
+ }
1328
1336
1329
- for (SnapshotDeletionsPending .Entry snapshot : snapshotDeletionsPending .entries ()) {
1330
- final SnapshotId snapshotId = snapshot .getSnapshotId ();
1337
+ final RepositoriesMetadata repositories = state .metadata ().custom (RepositoriesMetadata .TYPE , RepositoriesMetadata .EMPTY );
1331
1338
1332
- if (currentRestores .contains (snapshotId )) {
1333
- logger .trace ("snapshot to delete [{}] is being restored, waiting for restore to complete" , snapshotId );
1334
- continue ;
1335
- } else if (currentClones .contains (snapshotId )) {
1336
- logger .trace ("snapshot to delete [{}] is being cloned, waiting for cloning to complete" , snapshotId );
1337
- continue ;
1338
- } else if (currentDeletions .contains (snapshotId )) {
1339
- logger .trace ("snapshot to delete [{}] is already queued" , snapshotId );
1340
- continue ;
1341
- }
1339
+ final Set <SnapshotId > currentDeletions = deletionsSources (state );
1340
+ final Set <SnapshotId > currentRestores = restoreSources (state );
1341
+ final Set <SnapshotId > currentClones = cloneSources (state );
1342
1342
1343
- final Optional <RepositoryMetadata > optionalRepository = findRepositoryForPendingDeletion (
1344
- repositories ,
1345
- snapshot .getRepositoryName (),
1346
- snapshot .getRepositoryUuid ()
1347
- );
1348
- if (optionalRepository .isEmpty ()) {
1349
- logger .debug (
1350
- "repository [{}/{}] not found, cannot delete pending snapshot [{}] created at {}" ,
1343
+ // the list of snapshot ids to trigger deletion for, per repository
1344
+ final Map <RepositoryMetadata , Set <SnapshotId >> snapshotsToDelete = new HashMap <>();
1345
+
1346
+ for (SnapshotDeletionsPending .Entry snapshot : snapshotDeletionsPending .entries ()) {
1347
+ final SnapshotId snapshotId = snapshot .getSnapshotId ();
1348
+
1349
+ if (currentRestores .contains (snapshotId )) {
1350
+ logger .trace ("snapshot to delete [{}] is being restored, waiting for restore to complete" , snapshotId );
1351
+ pendingDeletionsWithConflictingRestores .add (snapshotId );
1352
+ continue ;
1353
+ }
1354
+ pendingDeletionsWithConflictingRestores .remove (snapshotId );
1355
+
1356
+ if (currentClones .contains (snapshotId )) {
1357
+ logger .trace ("snapshot to delete [{}] is being cloned, waiting for cloning to complete" , snapshotId );
1358
+ pendingDeletionsWithConflictingClones .add (snapshotId );
1359
+ continue ;
1360
+ }
1361
+ pendingDeletionsWithConflictingClones .remove (snapshotId );
1362
+
1363
+ if (currentDeletions .contains (snapshotId )) {
1364
+ logger .trace ("snapshot to delete [{}] is already queued" , snapshotId );
1365
+ pendingDeletionsWithConflictingRepos .remove (snapshotId );
1366
+ continue ;
1367
+ }
1368
+
1369
+ if (state .custom (RepositoryCleanupInProgress .TYPE , RepositoryCleanupInProgress .EMPTY ).hasCleanupInProgress ()) {
1370
+ if (pendingDeletionsWithConflictingRepos .add (snapshotId )) {
1371
+ logger .debug (
1372
+ "a repository clean-up is in progress, cannot delete pending snapshot [{}] created at {}" ,
1373
+ snapshotId ,
1374
+ Instant .ofEpochMilli (snapshot .getIndexDeletionTime ()).atZone (ZoneOffset .UTC )
1375
+ );
1376
+ }
1377
+ continue ;
1378
+ }
1379
+
1380
+ final Optional <RepositoryMetadata > optionalRepository = findRepositoryForPendingDeletion (
1381
+ repositories ,
1351
1382
snapshot .getRepositoryName (),
1352
- snapshot .getRepositoryUuid (),
1353
- snapshotId ,
1354
- Instant .ofEpochMilli (snapshot .getIndexDeletionTime ()).atZone (ZoneOffset .UTC )
1383
+ snapshot .getRepositoryUuid ()
1355
1384
);
1356
- continue ;
1385
+ if (optionalRepository .isEmpty ()) {
1386
+ if (pendingDeletionsWithConflictingRepos .add (snapshotId )) {
1387
+ logger .debug (
1388
+ "repository [{}/{}] not found, cannot delete pending snapshot [{}] created at {}" ,
1389
+ snapshot .getRepositoryName (),
1390
+ snapshot .getRepositoryUuid (),
1391
+ snapshotId ,
1392
+ Instant .ofEpochMilli (snapshot .getIndexDeletionTime ()).atZone (ZoneOffset .UTC )
1393
+ );
1394
+ }
1395
+ continue ;
1396
+ }
1397
+ final RepositoryMetadata repository = optionalRepository .get ();
1398
+ if (repository .settings ().getAsBoolean (READONLY_SETTING_KEY , false )) {
1399
+ if (pendingDeletionsWithConflictingRepos .add (snapshotId )) {
1400
+ logger .debug (
1401
+ "repository [{}/{}] is read-only, cannot delete pending snapshot [{}] created at {}" ,
1402
+ repository .name (),
1403
+ repository .uuid (),
1404
+ snapshotId ,
1405
+ Instant .ofEpochMilli (snapshot .getIndexDeletionTime ()).atZone (ZoneOffset .UTC )
1406
+ );
1407
+ }
1408
+ continue ;
1409
+ }
1410
+ pendingDeletionsWithConflictingRepos .remove (snapshotId );
1411
+
1412
+ // should we add some throttling to not always retry?
1413
+ if (ongoingSnapshotsDeletions .add (snapshotId )) {
1414
+ logger .info ("triggering snapshot deletion for [{}]" , snapshotId );
1415
+ final boolean added = snapshotsToDelete .computeIfAbsent (repository , r -> new HashSet <>()).add (snapshotId );
1416
+ assert added : snapshotId ;
1417
+ }
1357
1418
}
1419
+ snapshotsToDelete .forEach (
1420
+ (repo , snapshots ) -> threadPool .generic ().execute (new SnapshotsToDeleteRunnable (repo .name (), repo .uuid (), snapshots ))
1421
+ );
1422
+ }
1423
+ }
1358
1424
1359
- final RepositoryMetadata repository = optionalRepository .get ();
1360
- if (repository .settings ().getAsBoolean (READONLY_SETTING_KEY , false )) {
1361
- logger .debug (
1362
- "repository [{}/{}] is read-only, cannot delete pending snapshot [{}] created at {}" ,
1363
- repository .name (),
1364
- repository .uuid (),
1365
- snapshotId ,
1366
- Instant .ofEpochMilli (snapshot .getIndexDeletionTime ()).atZone (ZoneOffset .UTC )
1367
- );
1368
- continue ;
1425
+ private void clearPendingDeletionsWithConflicts () {
1426
+ pendingDeletionsWithConflictingRestores .clear ();
1427
+ pendingDeletionsWithConflictingClones .clear ();
1428
+ pendingDeletionsWithConflictingRepos .clear ();
1429
+ }
1430
+
1431
+ private static boolean pendingDeletionsChanged (ClusterChangedEvent event ) {
1432
+ SnapshotDeletionsPending previous = event .previousState ().custom (SnapshotDeletionsPending .TYPE , SnapshotDeletionsPending .EMPTY );
1433
+ SnapshotDeletionsPending currents = event .state ().custom (SnapshotDeletionsPending .TYPE , SnapshotDeletionsPending .EMPTY );
1434
+ return Objects .equals (previous , currents ) == false ;
1435
+ }
1436
+
1437
+ private boolean pendingDeletionsWithConflictsChanged (ClusterChangedEvent event ) {
1438
+ if (pendingDeletionsWithConflictingRestores .isEmpty () == false ) {
1439
+ RestoreInProgress previous = event .previousState ().custom (RestoreInProgress .TYPE , RestoreInProgress .EMPTY );
1440
+ RestoreInProgress currents = event .state ().custom (RestoreInProgress .TYPE , RestoreInProgress .EMPTY );
1441
+ if (Objects .equals (previous , currents ) == false ) {
1442
+ return true ;
1443
+ }
1444
+ }
1445
+ if (pendingDeletionsWithConflictingClones .isEmpty () == false ) {
1446
+ Set <SnapshotsInProgress .Entry > previous = event .previousState ()
1447
+ .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY )
1448
+ .asStream ()
1449
+ .filter (SnapshotsInProgress .Entry ::isClone )
1450
+ .collect (Collectors .toSet ());
1451
+ Set <SnapshotsInProgress .Entry > currents = event .state ()
1452
+ .custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY )
1453
+ .asStream ()
1454
+ .filter (SnapshotsInProgress .Entry ::isClone )
1455
+ .collect (Collectors .toSet ());
1456
+ if (Objects .equals (previous , currents ) == false ) {
1457
+ return true ;
1458
+ }
1459
+ }
1460
+ if (pendingDeletionsWithConflictingRepos .isEmpty () == false ) {
1461
+ boolean previousCleanUp = event .previousState ()
1462
+ .custom (RepositoryCleanupInProgress .TYPE , RepositoryCleanupInProgress .EMPTY )
1463
+ .hasCleanupInProgress ();
1464
+ boolean currentCleanUp = event .state ()
1465
+ .custom (RepositoryCleanupInProgress .TYPE , RepositoryCleanupInProgress .EMPTY )
1466
+ .hasCleanupInProgress ();
1467
+ if (previousCleanUp != currentCleanUp ) {
1468
+ return true ;
1369
1469
}
1370
1470
1371
- // should we add some throttling to not always retry?
1372
- if (ongoingSnapshotsDeletions .add (snapshotId )) {
1373
- logger .trace ("triggering snapshot deletion for [{}]" , snapshotId );
1374
- final boolean added = snapshotsToDelete .computeIfAbsent (repository , r -> new HashSet <>()).add (snapshotId );
1375
- assert added : snapshotId ;
1471
+ RepositoriesMetadata previous = event .previousState ().metadata ().custom (RepositoriesMetadata .TYPE , RepositoriesMetadata .EMPTY );
1472
+ RepositoriesMetadata current = event .state ().metadata ().custom (RepositoriesMetadata .TYPE , RepositoriesMetadata .EMPTY );
1473
+ if (previous .equals (current ) == false ) {
1474
+ return true ;
1376
1475
}
1377
1476
}
1378
- snapshotsToDelete .forEach (
1379
- (repo , snapshots ) -> threadPool .generic ().execute (new SnapshotsToDeleteRunnable (repo .name (), repo .uuid (), snapshots ))
1380
- );
1477
+ return false ;
1381
1478
}
1382
1479
1383
1480
/**
@@ -1426,6 +1523,7 @@ public void onFailure(Exception e) {
1426
1523
shouldRetry = RepositoryData .MISSING_UUID .equals (repositoryUuid ) == false ;
1427
1524
1428
1525
} else if (e instanceof ConcurrentSnapshotExecutionException ) {
1526
+ assert false : e ;
1429
1527
logger .debug (
1430
1528
"[{}] failed to delete snapshot [{}]: a concurrent operation is running" ,
1431
1529
repositoryName ,
0 commit comments