@@ -331,7 +331,6 @@ public void onFailure(final Exception e) {
331
331
public TimeValue timeout () {
332
332
return request .masterNodeTimeout ();
333
333
}
334
-
335
334
});
336
335
}
337
336
@@ -394,6 +393,8 @@ private void beginSnapshot(final ClusterState clusterState,
394
393
395
394
boolean snapshotCreated ;
396
395
396
+ boolean hadAbortedInitializations ;
397
+
397
398
@ Override
398
399
protected void doRun () {
399
400
assert initializingSnapshots .contains (snapshot .snapshot ());
@@ -433,6 +434,8 @@ public ClusterState execute(ClusterState currentState) {
433
434
434
435
if (entry .state () == State .ABORTED ) {
435
436
entries .add (entry );
437
+ assert entry .shards ().isEmpty ();
438
+ hadAbortedInitializations = true ;
436
439
} else {
437
440
// Replace the snapshot that was just initialized
438
441
ImmutableOpenMap <ShardId , ShardSnapshotStatus > shards =
@@ -491,6 +494,14 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
491
494
// completion listener in this method. For the snapshot completion to work properly, the snapshot
492
495
// should still exist when listener is registered.
493
496
userCreateSnapshotListener .onResponse (snapshot .snapshot ());
497
+
498
+ if (hadAbortedInitializations ) {
499
+ final SnapshotsInProgress snapshotsInProgress = newState .custom (SnapshotsInProgress .TYPE );
500
+ assert snapshotsInProgress != null ;
501
+ final SnapshotsInProgress .Entry entry = snapshotsInProgress .snapshot (snapshot .snapshot ());
502
+ assert entry != null ;
503
+ endSnapshot (entry );
504
+ }
494
505
}
495
506
});
496
507
}
@@ -701,8 +712,8 @@ public void applyClusterState(ClusterChangedEvent event) {
701
712
// 3. Snapshots in any other state that have all their shard tasks completed
702
713
snapshotsInProgress .entries ().stream ().filter (
703
714
entry -> entry .state ().completed ()
704
- || entry . state () == State . INIT && initializingSnapshots .contains (entry .snapshot ()) == false
705
- || entry .state () != State .INIT && completed (entry .shards ().values ())
715
+ || initializingSnapshots .contains (entry .snapshot ()) == false
716
+ && ( entry .state () == State .INIT || completed (entry .shards ().values () ))
706
717
).forEach (this ::endSnapshot );
707
718
}
708
719
if (newMaster ) {
0 commit comments