67
67
import org .elasticsearch .repositories .IndexId ;
68
68
import org .elasticsearch .repositories .Repository ;
69
69
import org .elasticsearch .threadpool .ThreadPool ;
70
+ import org .elasticsearch .transport .EmptyTransportResponseHandler ;
71
+ import org .elasticsearch .transport .TransportException ;
72
+ import org .elasticsearch .transport .TransportRequestDeduplicator ;
73
+ import org .elasticsearch .transport .TransportResponse ;
70
74
import org .elasticsearch .transport .TransportService ;
71
75
72
76
import java .io .IOException ;
85
89
import static java .util .Collections .emptyMap ;
86
90
import static java .util .Collections .unmodifiableMap ;
87
91
import static org .elasticsearch .cluster .SnapshotsInProgress .completed ;
88
- import static org .elasticsearch .transport .EmptyTransportResponseHandler .INSTANCE_SAME ;
89
92
90
93
/**
91
94
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
@@ -112,6 +115,10 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
112
115
113
116
private volatile Map <Snapshot , Map <ShardId , IndexShardSnapshotStatus >> shardSnapshots = emptyMap ();
114
117
118
+ // A map of snapshots to the shardIds that we already reported to the master as failed
119
+ private final TransportRequestDeduplicator <UpdateIndexShardSnapshotStatusRequest > remoteFailedRequestDeduplicator =
120
+ new TransportRequestDeduplicator <>();
121
+
115
122
private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor ();
116
123
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler ;
117
124
@@ -272,12 +279,11 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
272
279
// Abort all running shards for this snapshot
273
280
Map <ShardId , IndexShardSnapshotStatus > snapshotShards = shardSnapshots .get (entry .snapshot ());
274
281
if (snapshotShards != null ) {
275
- final String failure = "snapshot has been aborted" ;
276
282
for (ObjectObjectCursor <ShardId , ShardSnapshotStatus > shard : entry .shards ()) {
277
-
278
283
final IndexShardSnapshotStatus snapshotStatus = snapshotShards .get (shard .key );
279
284
if (snapshotStatus != null ) {
280
- final IndexShardSnapshotStatus .Copy lastSnapshotStatus = snapshotStatus .abortIfNotCompleted (failure );
285
+ final IndexShardSnapshotStatus .Copy lastSnapshotStatus =
286
+ snapshotStatus .abortIfNotCompleted ("snapshot has been aborted" );
281
287
final Stage stage = lastSnapshotStatus .getStage ();
282
288
if (stage == Stage .FINALIZE ) {
283
289
logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " +
@@ -295,6 +301,15 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
295
301
}
296
302
}
297
303
}
304
+ } else {
305
+ final Snapshot snapshot = entry .snapshot ();
306
+ for (ObjectObjectCursor <ShardId , ShardSnapshotStatus > curr : entry .shards ()) {
307
+ // due to CS batching we might have missed the INIT state and straight went into ABORTED
308
+ // notify master that abort has completed by moving to FAILED
309
+ if (curr .value .state () == State .ABORTED ) {
310
+ notifyFailedSnapshotShard (snapshot , curr .key , localNodeId , curr .value .reason ());
311
+ }
312
+ }
298
313
}
299
314
}
300
315
}
@@ -515,12 +530,33 @@ void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, f
515
530
516
531
/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
517
532
void sendSnapshotShardUpdate (final Snapshot snapshot , final ShardId shardId , final ShardSnapshotStatus status ) {
518
- try {
519
- UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status );
520
- transportService .sendRequest (transportService .getLocalNode (), UPDATE_SNAPSHOT_STATUS_ACTION_NAME , request , INSTANCE_SAME );
521
- } catch (Exception e ) {
522
- logger .warn (() -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , snapshot , status ), e );
523
- }
533
+ remoteFailedRequestDeduplicator .executeOnce (
534
+ new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status ),
535
+ new ActionListener <Void >() {
536
+ @ Override
537
+ public void onResponse (Void aVoid ) {
538
+ logger .trace ("[{}] [{}] updated snapshot state" , snapshot , status );
539
+ }
540
+
541
+ @ Override
542
+ public void onFailure (Exception e ) {
543
+ logger .warn (
544
+ () -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , snapshot , status ), e );
545
+ }
546
+ },
547
+ (req , reqListener ) -> transportService .sendRequest (transportService .getLocalNode (), UPDATE_SNAPSHOT_STATUS_ACTION_NAME , req ,
548
+ new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
549
+ @ Override
550
+ public void handleResponse (TransportResponse .Empty response ) {
551
+ reqListener .onResponse (null );
552
+ }
553
+
554
+ @ Override
555
+ public void handleException (TransportException exp ) {
556
+ reqListener .onFailure (exp );
557
+ }
558
+ })
559
+ );
524
560
}
525
561
526
562
/**
0 commit comments