22
22
import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
23
23
import org .apache .logging .log4j .message .ParameterizedMessage ;
24
24
import org .apache .logging .log4j .util .Supplier ;
25
+ import org .apache .lucene .util .SetOnce ;
25
26
import org .elasticsearch .ExceptionsHelper ;
26
27
import org .elasticsearch .cluster .ClusterChangedEvent ;
27
28
import org .elasticsearch .cluster .ClusterState ;
@@ -146,7 +147,6 @@ protected void doStop() {
146
147
} finally {
147
148
shutdownLock .unlock ();
148
149
}
149
-
150
150
}
151
151
152
152
@ Override
@@ -157,14 +157,16 @@ protected void doClose() {
157
157
@ Override
158
158
public void applyClusterState (ClusterChangedEvent event ) {
159
159
try {
160
- SnapshotsInProgress prev = event .previousState ().custom (SnapshotsInProgress .TYPE );
161
- SnapshotsInProgress curr = event .state ().custom (SnapshotsInProgress .TYPE );
162
-
163
- if (( prev == null && curr != null ) || (prev != null && prev .equals (curr ) == false )) {
160
+ SnapshotsInProgress previousSnapshots = event .previousState ().custom (SnapshotsInProgress .TYPE );
161
+ SnapshotsInProgress currentSnapshots = event .state ().custom (SnapshotsInProgress .TYPE );
162
+ if (( previousSnapshots == null && currentSnapshots != null )
163
+ || (previousSnapshots != null && previousSnapshots .equals (currentSnapshots ) == false )) {
164
164
processIndexShardSnapshots (event );
165
165
}
166
- String masterNodeId = event .state ().nodes ().getMasterNodeId ();
167
- if (masterNodeId != null && masterNodeId .equals (event .previousState ().nodes ().getMasterNodeId ()) == false ) {
166
+
167
+ String previousMasterNodeId = event .previousState ().nodes ().getMasterNodeId ();
168
+ String currentMasterNodeId = event .state ().nodes ().getMasterNodeId ();
169
+ if (currentMasterNodeId != null && currentMasterNodeId .equals (previousMasterNodeId ) == false ) {
168
170
syncShardStatsOnNewMaster (event );
169
171
}
170
172
@@ -281,17 +283,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
281
283
snapshotStatus .abort ();
282
284
break ;
283
285
case FINALIZE :
284
- logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish" , entry .snapshot (), shard .key );
286
+ logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " +
287
+ "letting it finish" , entry .snapshot (), shard .key );
285
288
break ;
286
289
case DONE :
287
- logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master" , entry . snapshot (), shard . key );
288
- updateIndexShardSnapshotStatus ( entry .snapshot (), shard .key ,
289
- new ShardSnapshotStatus ( localNodeId , State . SUCCESS ) , masterNode );
290
+ logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that is already done, " +
291
+ "updating status on the master" , entry .snapshot (), shard .key );
292
+ notifySuccessfulSnapshotShard ( entry . snapshot (), shard . key , localNodeId , masterNode );
290
293
break ;
291
294
case FAILURE :
292
- logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master" , entry . snapshot (), shard . key );
293
- updateIndexShardSnapshotStatus ( entry .snapshot (), shard .key ,
294
- new ShardSnapshotStatus ( localNodeId , State . FAILED , snapshotStatus .failure () ), masterNode );
295
+ logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " +
296
+ "updating status on the master" , entry .snapshot (), shard .key );
297
+ notifyFailedSnapshotShard ( entry . snapshot (), shard . key , localNodeId , snapshotStatus .failure (), masterNode );
295
298
break ;
296
299
default :
297
300
throw new IllegalStateException ("Unknown snapshot shard stage " + snapshotStatus .stage ());
@@ -320,34 +323,47 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
320
323
if (newSnapshots .isEmpty () == false ) {
321
324
Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT );
322
325
for (final Map .Entry <Snapshot , Map <ShardId , IndexShardSnapshotStatus >> entry : newSnapshots .entrySet ()) {
323
- Map <String , IndexId > indicesMap = snapshotIndices .get (entry .getKey ());
326
+ final Snapshot snapshot = entry .getKey ();
327
+ final Map <String , IndexId > indicesMap = snapshotIndices .get (snapshot );
324
328
assert indicesMap != null ;
329
+
325
330
for (final Map .Entry <ShardId , IndexShardSnapshotStatus > shardEntry : entry .getValue ().entrySet ()) {
326
331
final ShardId shardId = shardEntry .getKey ();
327
- try {
328
- final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
329
- final IndexId indexId = indicesMap .get (shardId .getIndexName ());
330
- assert indexId != null ;
331
- executor .execute (new AbstractRunnable () {
332
- @ Override
333
- public void doRun () {
334
- snapshot (indexShard , entry .getKey (), indexId , shardEntry .getValue ());
335
- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
336
- new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
337
- }
332
+ final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
333
+ final IndexId indexId = indicesMap .get (shardId .getIndexName ());
334
+ assert indexId != null ;
335
+ executor .execute (new AbstractRunnable () {
338
336
339
- @ Override
340
- public void onFailure (Exception e ) {
341
- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to create snapshot" , shardId , entry .getKey ()), e );
342
- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
343
- new ShardSnapshotStatus (localNodeId , State .FAILED , ExceptionsHelper .detailedMessage (e )), masterNode );
344
- }
337
+ final SetOnce <Exception > failure = new SetOnce <>();
345
338
346
- });
347
- } catch (Exception e ) {
348
- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
349
- new ShardSnapshotStatus (localNodeId , State .FAILED , ExceptionsHelper .detailedMessage (e )), masterNode );
350
- }
339
+ @ Override
340
+ public void doRun () {
341
+ snapshot (indexShard , snapshot , indexId , shardEntry .getValue ());
342
+ }
343
+
344
+ @ Override
345
+ public void onFailure (Exception e ) {
346
+ logger .warn ((Supplier <?>) () ->
347
+ new ParameterizedMessage ("[{}][{}] failed to snapshot shard" , shardId , snapshot ), e );
348
+ failure .set (e );
349
+ }
350
+
351
+ @ Override
352
+ public void onRejection (Exception e ) {
353
+ failure .set (e );
354
+ }
355
+
356
+ @ Override
357
+ public void onAfter () {
358
+ final Exception exception = failure .get ();
359
+ if (exception != null ) {
360
+ final String failure = ExceptionsHelper .detailedMessage (exception );
361
+ notifyFailedSnapshotShard (snapshot , shardId , localNodeId , failure , masterNode );
362
+ } else {
363
+ notifySuccessfulSnapshotShard (snapshot , shardId , localNodeId , masterNode );
364
+ }
365
+ }
366
+ });
351
367
}
352
368
}
353
369
}
@@ -360,34 +376,36 @@ public void onFailure(Exception e) {
360
376
* @param snapshotStatus snapshot status
361
377
*/
362
378
private void snapshot (final IndexShard indexShard , final Snapshot snapshot , final IndexId indexId , final IndexShardSnapshotStatus snapshotStatus ) {
363
- Repository repository = snapshotsService .getRepositoriesService ().repository (snapshot .getRepository ());
364
- ShardId shardId = indexShard .shardId ();
365
- if (!indexShard .routingEntry ().primary ()) {
379
+ final ShardId shardId = indexShard .shardId ();
380
+ if (indexShard .routingEntry ().primary () == false ) {
366
381
throw new IndexShardSnapshotFailedException (shardId , "snapshot should be performed only on primary" );
367
382
}
368
383
if (indexShard .routingEntry ().relocating ()) {
369
384
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
370
385
throw new IndexShardSnapshotFailedException (shardId , "cannot snapshot while relocating" );
371
386
}
372
- if (indexShard .state () == IndexShardState .CREATED || indexShard .state () == IndexShardState .RECOVERING ) {
387
+
388
+ final IndexShardState indexShardState = indexShard .state ();
389
+ if (indexShardState == IndexShardState .CREATED || indexShardState == IndexShardState .RECOVERING ) {
373
390
// shard has just been created, or still recovering
374
391
throw new IndexShardSnapshotFailedException (shardId , "shard didn't fully recover yet" );
375
392
}
376
393
394
+ final Repository repository = snapshotsService .getRepositoriesService ().repository (snapshot .getRepository ());
377
395
try {
378
396
// we flush first to make sure we get the latest writes snapshotted
379
397
try (Engine .IndexCommitRef snapshotRef = indexShard .acquireIndexCommit (true )) {
380
398
repository .snapshotShard (indexShard , snapshot .getSnapshotId (), indexId , snapshotRef .getIndexCommit (), snapshotStatus );
381
399
if (logger .isDebugEnabled ()) {
382
- StringBuilder sb = new StringBuilder ();
383
- sb .append (" index : version [" ).append (snapshotStatus .indexVersion ()).append ("], number_of_files [" ).append (snapshotStatus .numberOfFiles ()).append ("] with total_size [" ).append (new ByteSizeValue (snapshotStatus .totalSize ())).append ("]\n " );
400
+ StringBuilder details = new StringBuilder ();
401
+ details .append (" index : version [" ).append (snapshotStatus .indexVersion ());
402
+ details .append ("], number_of_files [" ).append (snapshotStatus .numberOfFiles ());
403
+ details .append ("] with total_size [" ).append (new ByteSizeValue (snapshotStatus .totalSize ())).append ("]\n " );
384
404
logger .debug ("snapshot ({}) completed to {}, took [{}]\n {}" , snapshot , repository ,
385
- TimeValue .timeValueMillis (snapshotStatus .time ()), sb );
405
+ TimeValue .timeValueMillis (snapshotStatus .time ()), details );
386
406
}
387
407
}
388
- } catch (SnapshotFailedEngineException e ) {
389
- throw e ;
390
- } catch (IndexShardSnapshotFailedException e ) {
408
+ } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e ) {
391
409
throw e ;
392
410
} catch (Exception e ) {
393
411
throw new IndexShardSnapshotFailedException (shardId , "Failed to snapshot" , e );
@@ -402,6 +420,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
402
420
if (snapshotsInProgress == null ) {
403
421
return ;
404
422
}
423
+
405
424
final String localNodeId = event .state ().nodes ().getLocalNodeId ();
406
425
final DiscoveryNode masterNode = event .state ().nodes ().getMasterNode ();
407
426
for (SnapshotsInProgress .Entry snapshot : snapshotsInProgress .entries ()) {
@@ -417,15 +436,16 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
417
436
// Master knows about the shard and thinks it has not completed
418
437
if (localShardStatus .stage () == Stage .DONE ) {
419
438
// but we think the shard is done - we need to make new master know that the shard is done
420
- logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master" , snapshot .snapshot (), shardId );
421
- updateIndexShardSnapshotStatus (snapshot .snapshot (), shardId ,
422
- new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
439
+ logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " +
440
+ "updating status on the master" , snapshot .snapshot (), shardId );
441
+ notifySuccessfulSnapshotShard (snapshot .snapshot (), shardId , localNodeId , masterNode );
442
+
423
443
} else if (localShard .getValue ().stage () == Stage .FAILURE ) {
424
444
// but we think the shard failed - we need to make new master know that the shard failed
425
- logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master" , snapshot . snapshot (), shardId );
426
- updateIndexShardSnapshotStatus ( snapshot .snapshot (), shardId ,
427
- new ShardSnapshotStatus ( localNodeId , State . FAILED , localShardStatus .failure ()), masterNode );
428
-
445
+ logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " +
446
+ "updating status on master" , snapshot .snapshot (), shardId );
447
+ final String failure = localShardStatus .failure ();
448
+ notifyFailedSnapshotShard ( snapshot . snapshot (), shardId , localNodeId , failure , masterNode );
429
449
}
430
450
}
431
451
}
@@ -445,7 +465,6 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) {
445
465
}
446
466
}
447
467
448
-
449
468
/**
450
469
* Internal request that is used to send changes in snapshot status to master
451
470
*/
@@ -498,15 +517,33 @@ public String toString() {
498
517
}
499
518
}
500
519
501
- /**
502
- * Updates the shard status
503
- */
504
- public void updateIndexShardSnapshotStatus (Snapshot snapshot , ShardId shardId , ShardSnapshotStatus status , DiscoveryNode master ) {
505
- UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status );
520
+ /** Notify the master node that the given shard has been successfully snapshotted **/
521
+ void notifySuccessfulSnapshotShard (final Snapshot snapshot ,
522
+ final ShardId shardId ,
523
+ final String localNodeId ,
524
+ final DiscoveryNode masterNode ) {
525
+ sendSnapshotShardUpdate (snapshot , shardId , new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
526
+ }
527
+
528
+ /** Notify the master node that the given shard failed to be snapshotted **/
529
+ void notifyFailedSnapshotShard (final Snapshot snapshot ,
530
+ final ShardId shardId ,
531
+ final String localNodeId ,
532
+ final String failure ,
533
+ final DiscoveryNode masterNode ) {
534
+ sendSnapshotShardUpdate (snapshot , shardId , new ShardSnapshotStatus (localNodeId , State .FAILED , failure ), masterNode );
535
+ }
536
+
537
+ /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
538
+ void sendSnapshotShardUpdate (final Snapshot snapshot ,
539
+ final ShardId shardId ,
540
+ final ShardSnapshotStatus status ,
541
+ final DiscoveryNode masterNode ) {
506
542
try {
507
- transportService .sendRequest (master , UPDATE_SNAPSHOT_ACTION_NAME , request , EmptyTransportResponseHandler .INSTANCE_SAME );
543
+ final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status );
544
+ transportService .sendRequest (masterNode , UPDATE_SNAPSHOT_ACTION_NAME , request , EmptyTransportResponseHandler .INSTANCE_SAME );
508
545
} catch (Exception e ) {
509
- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , request . snapshot (), request . status () ), e );
546
+ logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , snapshot , status ), e );
510
547
}
511
548
}
512
549
0 commit comments