23
23
import org .apache .logging .log4j .message .ParameterizedMessage ;
24
24
import org .apache .logging .log4j .util .Supplier ;
25
25
import org .apache .lucene .index .IndexCommit ;
26
+ import org .apache .lucene .util .SetOnce ;
26
27
import org .elasticsearch .ExceptionsHelper ;
27
28
import org .elasticsearch .cluster .ClusterChangedEvent ;
28
29
import org .elasticsearch .cluster .ClusterState ;
@@ -147,7 +148,6 @@ protected void doStop() {
147
148
} finally {
148
149
shutdownLock .unlock ();
149
150
}
150
-
151
151
}
152
152
153
153
@ Override
@@ -282,17 +282,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
282
282
snapshotStatus .abort ();
283
283
break ;
284
284
case FINALIZE :
285
- logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish" , entry .snapshot (), shard .key );
285
+ logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " +
286
+ "letting it finish" , entry .snapshot (), shard .key );
286
287
break ;
287
288
case DONE :
288
- logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master" , entry . snapshot (), shard . key );
289
- updateIndexShardSnapshotStatus ( entry .snapshot (), shard .key ,
290
- new ShardSnapshotStatus ( localNodeId , State . SUCCESS ) , masterNode );
289
+ logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that is already done, " +
290
+ "updating status on the master" , entry .snapshot (), shard .key );
291
+ notifySuccessfulSnapshotShard ( entry . snapshot (), shard . key , localNodeId , masterNode );
291
292
break ;
292
293
case FAILURE :
293
- logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master" , entry . snapshot (), shard . key );
294
- updateIndexShardSnapshotStatus ( entry .snapshot (), shard .key ,
295
- new ShardSnapshotStatus ( localNodeId , State . FAILED , snapshotStatus .failure () ), masterNode );
294
+ logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " +
295
+ "updating status on the master" , entry .snapshot (), shard .key );
296
+ notifyFailedSnapshotShard ( entry . snapshot (), shard . key , localNodeId , snapshotStatus .failure (), masterNode );
296
297
break ;
297
298
default :
298
299
throw new IllegalStateException ("Unknown snapshot shard stage " + snapshotStatus .stage ());
@@ -321,34 +322,47 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
321
322
if (newSnapshots .isEmpty () == false ) {
322
323
Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT );
323
324
for (final Map .Entry <Snapshot , Map <ShardId , IndexShardSnapshotStatus >> entry : newSnapshots .entrySet ()) {
324
- Map <String , IndexId > indicesMap = snapshotIndices .get (entry .getKey ());
325
+ final Snapshot snapshot = entry .getKey ();
326
+ final Map <String , IndexId > indicesMap = snapshotIndices .get (snapshot );
325
327
assert indicesMap != null ;
328
+
326
329
for (final Map .Entry <ShardId , IndexShardSnapshotStatus > shardEntry : entry .getValue ().entrySet ()) {
327
330
final ShardId shardId = shardEntry .getKey ();
328
- try {
329
- final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
330
- final IndexId indexId = indicesMap .get (shardId .getIndexName ());
331
- assert indexId != null ;
332
- executor .execute (new AbstractRunnable () {
333
- @ Override
334
- public void doRun () {
335
- snapshot (indexShard , entry .getKey (), indexId , shardEntry .getValue ());
336
- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
337
- new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
338
- }
331
+ final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
332
+ final IndexId indexId = indicesMap .get (shardId .getIndexName ());
333
+ assert indexId != null ;
334
+ executor .execute (new AbstractRunnable () {
339
335
340
- @ Override
341
- public void onFailure (Exception e ) {
342
- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to create snapshot" , shardId , entry .getKey ()), e );
343
- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
344
- new ShardSnapshotStatus (localNodeId , State .FAILED , ExceptionsHelper .detailedMessage (e )), masterNode );
345
- }
336
+ final SetOnce <Exception > failure = new SetOnce <>();
346
337
347
- });
348
- } catch (Exception e ) {
349
- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
350
- new ShardSnapshotStatus (localNodeId , State .FAILED , ExceptionsHelper .detailedMessage (e )), masterNode );
351
- }
338
+ @ Override
339
+ public void doRun () {
340
+ snapshot (indexShard , snapshot , indexId , shardEntry .getValue ());
341
+ }
342
+
343
+ @ Override
344
+ public void onFailure (Exception e ) {
345
+ logger .warn ((Supplier <?>) () ->
346
+ new ParameterizedMessage ("[{}][{}] failed to snapshot shard" , shardId , snapshot ), e );
347
+ failure .set (e );
348
+ }
349
+
350
+ @ Override
351
+ public void onRejection (Exception e ) {
352
+ failure .set (e );
353
+ }
354
+
355
+ @ Override
356
+ public void onAfter () {
357
+ final Exception exception = failure .get ();
358
+ if (exception != null ) {
359
+ final String failure = ExceptionsHelper .detailedMessage (exception );
360
+ notifyFailedSnapshotShard (snapshot , shardId , localNodeId , failure , masterNode );
361
+ } else {
362
+ notifySuccessfulSnapshotShard (snapshot , shardId , localNodeId , masterNode );
363
+ }
364
+ }
365
+ });
352
366
}
353
367
}
354
368
}
@@ -361,37 +375,39 @@ public void onFailure(Exception e) {
361
375
* @param snapshotStatus snapshot status
362
376
*/
363
377
private void snapshot (final IndexShard indexShard , final Snapshot snapshot , final IndexId indexId , final IndexShardSnapshotStatus snapshotStatus ) {
364
- Repository repository = snapshotsService .getRepositoriesService ().repository (snapshot .getRepository ());
365
- ShardId shardId = indexShard .shardId ();
366
- if (!indexShard .routingEntry ().primary ()) {
378
+ final ShardId shardId = indexShard .shardId ();
379
+ if (indexShard .routingEntry ().primary () == false ) {
367
380
throw new IndexShardSnapshotFailedException (shardId , "snapshot should be performed only on primary" );
368
381
}
369
382
if (indexShard .routingEntry ().relocating ()) {
370
383
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
371
384
throw new IndexShardSnapshotFailedException (shardId , "cannot snapshot while relocating" );
372
385
}
373
- if (indexShard .state () == IndexShardState .CREATED || indexShard .state () == IndexShardState .RECOVERING ) {
386
+
387
+ final IndexShardState indexShardState = indexShard .state ();
388
+ if (indexShardState == IndexShardState .CREATED || indexShardState == IndexShardState .RECOVERING ) {
374
389
// shard has just been created, or still recovering
375
390
throw new IndexShardSnapshotFailedException (shardId , "shard didn't fully recover yet" );
376
391
}
377
392
393
+ final Repository repository = snapshotsService .getRepositoriesService ().repository (snapshot .getRepository ());
378
394
try {
379
395
// we flush first to make sure we get the latest writes snapshotted
380
396
IndexCommit snapshotIndexCommit = indexShard .acquireIndexCommit (true );
381
397
try {
382
398
repository .snapshotShard (indexShard , snapshot .getSnapshotId (), indexId , snapshotIndexCommit , snapshotStatus );
383
399
if (logger .isDebugEnabled ()) {
384
- StringBuilder sb = new StringBuilder ();
385
- sb .append (" index : version [" ).append (snapshotStatus .indexVersion ()).append ("], number_of_files [" ).append (snapshotStatus .numberOfFiles ()).append ("] with total_size [" ).append (new ByteSizeValue (snapshotStatus .totalSize ())).append ("]\n " );
400
+ StringBuilder details = new StringBuilder ();
401
+ details .append (" index : version [" ).append (snapshotStatus .indexVersion ());
402
+ details .append ("], number_of_files [" ).append (snapshotStatus .numberOfFiles ());
403
+ details .append ("] with total_size [" ).append (new ByteSizeValue (snapshotStatus .totalSize ())).append ("]\n " );
386
404
logger .debug ("snapshot ({}) completed to {}, took [{}]\n {}" , snapshot , repository ,
387
- TimeValue .timeValueMillis (snapshotStatus .time ()), sb );
405
+ TimeValue .timeValueMillis (snapshotStatus .time ()), details );
388
406
}
389
407
} finally {
390
408
indexShard .releaseIndexCommit (snapshotIndexCommit );
391
409
}
392
- } catch (SnapshotFailedEngineException e ) {
393
- throw e ;
394
- } catch (IndexShardSnapshotFailedException e ) {
410
+ } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e ) {
395
411
throw e ;
396
412
} catch (Exception e ) {
397
413
throw new IndexShardSnapshotFailedException (shardId , "Failed to snapshot" , e );
@@ -406,6 +422,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
406
422
if (snapshotsInProgress == null ) {
407
423
return ;
408
424
}
425
+
409
426
final String localNodeId = event .state ().nodes ().getLocalNodeId ();
410
427
final DiscoveryNode masterNode = event .state ().nodes ().getMasterNode ();
411
428
for (SnapshotsInProgress .Entry snapshot : snapshotsInProgress .entries ()) {
@@ -421,15 +438,16 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
421
438
// Master knows about the shard and thinks it has not completed
422
439
if (localShardStatus .stage () == Stage .DONE ) {
423
440
// but we think the shard is done - we need to make new master know that the shard is done
424
- logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master" , snapshot .snapshot (), shardId );
425
- updateIndexShardSnapshotStatus (snapshot .snapshot (), shardId ,
426
- new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
441
+ logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " +
442
+ "updating status on the master" , snapshot .snapshot (), shardId );
443
+ notifySuccessfulSnapshotShard (snapshot .snapshot (), shardId , localNodeId , masterNode );
444
+
427
445
} else if (localShard .getValue ().stage () == Stage .FAILURE ) {
428
446
// but we think the shard failed - we need to make new master know that the shard failed
429
- logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master" , snapshot . snapshot (), shardId );
430
- updateIndexShardSnapshotStatus ( snapshot .snapshot (), shardId ,
431
- new ShardSnapshotStatus ( localNodeId , State . FAILED , localShardStatus .failure ()), masterNode );
432
-
447
+ logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " +
448
+ "updating status on master" , snapshot .snapshot (), shardId );
449
+ final String failure = localShardStatus .failure ();
450
+ notifyFailedSnapshotShard ( snapshot . snapshot (), shardId , localNodeId , failure , masterNode );
433
451
}
434
452
}
435
453
}
@@ -449,7 +467,6 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) {
449
467
}
450
468
}
451
469
452
-
453
470
/**
454
471
* Internal request that is used to send changes in snapshot status to master
455
472
*/
@@ -512,15 +529,33 @@ public boolean isProcessed() {
512
529
}
513
530
}
514
531
515
- /**
516
- * Updates the shard status
517
- */
518
- public void updateIndexShardSnapshotStatus (Snapshot snapshot , ShardId shardId , ShardSnapshotStatus status , DiscoveryNode master ) {
519
- UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status );
532
+ /**
+  * Notify the master node that the given shard has been successfully snapshotted.
+  * <p>
+  * Wraps {@code localNodeId} in a {@code ShardSnapshotStatus} carrying {@code State.SUCCESS}
+  * and hands it to {@code sendSnapshotShardUpdate}, which performs the actual send.
+  *
+  * @param snapshot    the snapshot the shard belongs to
+  * @param shardId     the shard whose status is being reported
+  * @param localNodeId the node id recorded in the reported status
+  * @param masterNode  the node the status update is sent to
+  */
533
+ void notifySuccessfulSnapshotShard (final Snapshot snapshot ,
534
+ final ShardId shardId ,
535
+ final String localNodeId ,
536
+ final DiscoveryNode masterNode ) {
537
+ sendSnapshotShardUpdate (snapshot , shardId , new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
538
+ }
539
+
540
+ /**
+  * Notify the master node that the given shard failed to be snapshotted.
+  * <p>
+  * Wraps {@code localNodeId} and the {@code failure} text in a {@code ShardSnapshotStatus}
+  * carrying {@code State.FAILED} and hands it to {@code sendSnapshotShardUpdate}, which
+  * performs the actual send.
+  *
+  * @param snapshot    the snapshot the shard belongs to
+  * @param shardId     the shard whose status is being reported
+  * @param localNodeId the node id recorded in the reported status
+  * @param failure     textual description of the failure, stored in the reported status
+  * @param masterNode  the node the status update is sent to
+  */
541
+ void notifyFailedSnapshotShard (final Snapshot snapshot ,
542
+ final ShardId shardId ,
543
+ final String localNodeId ,
544
+ final String failure ,
545
+ final DiscoveryNode masterNode ) {
546
+ sendSnapshotShardUpdate (snapshot , shardId , new ShardSnapshotStatus (localNodeId , State .FAILED , failure ), masterNode );
547
+ }
548
+
549
+ /**
+  * Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node.
+  * <p>
+  * The request is built from {@code snapshot}, {@code shardId} and {@code status} and sent through
+  * {@code transportService} under {@code UPDATE_SNAPSHOT_ACTION_NAME}. Any {@link Exception} thrown
+  * while building or sending the request is caught and logged at warn level; it is not rethrown,
+  * so callers get no indication that the update was not sent.
+  *
+  * @param snapshot   the snapshot the shard belongs to
+  * @param shardId    the shard whose status is being reported
+  * @param status     the status to report for the shard
+  * @param masterNode the node the request is sent to
+  */
550
+ void sendSnapshotShardUpdate (final Snapshot snapshot ,
551
+ final ShardId shardId ,
552
+ final ShardSnapshotStatus status ,
553
+ final DiscoveryNode masterNode ) {
520
554
try {
521
- transportService .sendRequest (master , UPDATE_SNAPSHOT_ACTION_NAME , request , EmptyTransportResponseHandler .INSTANCE_SAME );
555
// The request is created inside the try block, so a failure to construct it is logged like a failure to send it.
+ final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status );
556
+ transportService .sendRequest (masterNode , UPDATE_SNAPSHOT_ACTION_NAME , request , EmptyTransportResponseHandler .INSTANCE_SAME );
522
557
} catch (Exception e ) {
523
- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , request . snapshot (), request . status () ), e );
558
// Log using the method parameters, since the request local is scoped to the try block and is not visible here.
+ logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , snapshot , status ), e );
524
559
}
525
560
}
526
561
0 commit comments