23
23
import org .apache .logging .log4j .LogManager ;
24
24
import org .apache .logging .log4j .Logger ;
25
25
import org .apache .logging .log4j .message .ParameterizedMessage ;
26
- import org .apache .lucene .util .SetOnce ;
27
26
import org .elasticsearch .ExceptionsHelper ;
28
27
import org .elasticsearch .action .ActionListener ;
29
28
import org .elasticsearch .action .ActionRequestValidationException ;
53
52
import org .elasticsearch .common .io .stream .StreamOutput ;
54
53
import org .elasticsearch .common .settings .Settings ;
55
54
import org .elasticsearch .common .unit .TimeValue ;
56
- import org .elasticsearch .common . util . concurrent . AbstractRunnable ;
55
+ import org .elasticsearch .core . internal . io . IOUtils ;
57
56
import org .elasticsearch .index .engine .Engine ;
58
- import org .elasticsearch .index .engine .SnapshotFailedEngineException ;
59
57
import org .elasticsearch .index .shard .IndexEventListener ;
60
58
import org .elasticsearch .index .shard .IndexShard ;
61
59
import org .elasticsearch .index .shard .IndexShardState ;
80
78
import java .util .Iterator ;
81
79
import java .util .List ;
82
80
import java .util .Map ;
83
- import java .util .concurrent .Executor ;
84
81
import java .util .function .Function ;
85
82
import java .util .stream .Collectors ;
86
83
@@ -298,46 +295,33 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
298
295
}
299
296
300
297
private void startNewShards (SnapshotsInProgress .Entry entry , Map <ShardId , IndexShardSnapshotStatus > startedShards ) {
301
- final Snapshot snapshot = entry .snapshot ();
302
- final Map <String , IndexId > indicesMap = entry .indices ().stream ().collect (Collectors .toMap (IndexId ::getName , Function .identity ()));
303
- final Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT );
304
- for (final Map .Entry <ShardId , IndexShardSnapshotStatus > shardEntry : startedShards .entrySet ()) {
305
- final ShardId shardId = shardEntry .getKey ();
306
- final IndexId indexId = indicesMap .get (shardId .getIndexName ());
307
- assert indexId != null ;
308
- executor .execute (new AbstractRunnable () {
309
-
310
- private final SetOnce <Exception > failure = new SetOnce <>();
311
-
312
- @ Override
313
- public void doRun () {
314
- final IndexShard indexShard =
315
- indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
316
- snapshot (indexShard , snapshot , indexId , shardEntry .getValue ());
317
- }
318
-
319
- @ Override
320
- public void onFailure (Exception e ) {
321
- logger .warn (() -> new ParameterizedMessage ("[{}][{}] failed to snapshot shard" , shardId , snapshot ), e );
322
- failure .set (e );
323
- }
324
-
325
- @ Override
326
- public void onRejection (Exception e ) {
327
- failure .set (e );
328
- }
329
-
330
- @ Override
331
- public void onAfter () {
332
- final Exception exception = failure .get ();
333
- if (exception != null ) {
334
- notifyFailedSnapshotShard (snapshot , shardId , ExceptionsHelper .detailedMessage (exception ));
335
- } else {
298
+ threadPool .executor (ThreadPool .Names .SNAPSHOT ).execute (() -> {
299
+ final Snapshot snapshot = entry .snapshot ();
300
+ final Map <String , IndexId > indicesMap =
301
+ entry .indices ().stream ().collect (Collectors .toMap (IndexId ::getName , Function .identity ()));
302
+ for (final Map .Entry <ShardId , IndexShardSnapshotStatus > shardEntry : startedShards .entrySet ()) {
303
+ final ShardId shardId = shardEntry .getKey ();
304
+ final IndexShardSnapshotStatus snapshotStatus = shardEntry .getValue ();
305
+ final IndexId indexId = indicesMap .get (shardId .getIndexName ());
306
+ assert indexId != null ;
307
+ snapshot (shardId , snapshot , indexId , snapshotStatus , new ActionListener <>() {
308
+ @ Override
309
+ public void onResponse (final Void aVoid ) {
310
+ if (logger .isDebugEnabled ()) {
311
+ final IndexShardSnapshotStatus .Copy lastSnapshotStatus = snapshotStatus .asCopy ();
312
+ logger .debug ("snapshot ({}) completed to {} with {}" , snapshot , snapshot .getRepository (), lastSnapshotStatus );
313
+ }
336
314
notifySuccessfulSnapshotShard (snapshot , shardId );
337
315
}
338
- }
339
- });
340
- }
316
+
317
+ @ Override
318
+ public void onFailure (Exception e ) {
319
+ logger .warn (() -> new ParameterizedMessage ("[{}][{}] failed to snapshot shard" , shardId , snapshot ), e );
320
+ notifyFailedSnapshotShard (snapshot , shardId , ExceptionsHelper .detailedMessage (e ));
321
+ }
322
+ });
323
+ }
324
+ });
341
325
}
342
326
343
327
/**
@@ -346,38 +330,37 @@ public void onAfter() {
346
330
* @param snapshot snapshot
347
331
* @param snapshotStatus snapshot status
348
332
*/
349
- private void snapshot (final IndexShard indexShard , final Snapshot snapshot , final IndexId indexId ,
350
- final IndexShardSnapshotStatus snapshotStatus ) {
351
- final ShardId shardId = indexShard .shardId ();
352
- if (indexShard .routingEntry ().primary () == false ) {
353
- throw new IndexShardSnapshotFailedException (shardId , "snapshot should be performed only on primary" );
354
- }
355
- if (indexShard .routingEntry ().relocating ()) {
356
- // do not snapshot when in the process of relocation of primaries so we won't get conflicts
357
- throw new IndexShardSnapshotFailedException (shardId , "cannot snapshot while relocating" );
358
- }
333
+ private void snapshot (final ShardId shardId , final Snapshot snapshot , final IndexId indexId ,
334
+ final IndexShardSnapshotStatus snapshotStatus , ActionListener <Void > listener ) {
335
+ try {
336
+ final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
337
+ if (indexShard .routingEntry ().primary () == false ) {
338
+ throw new IndexShardSnapshotFailedException (shardId , "snapshot should be performed only on primary" );
339
+ }
340
+ if (indexShard .routingEntry ().relocating ()) {
341
+ // do not snapshot when in the process of relocation of primaries so we won't get conflicts
342
+ throw new IndexShardSnapshotFailedException (shardId , "cannot snapshot while relocating" );
343
+ }
359
344
360
- final IndexShardState indexShardState = indexShard .state ();
361
- if (indexShardState == IndexShardState .CREATED || indexShardState == IndexShardState .RECOVERING ) {
362
- // shard has just been created, or still recovering
363
- throw new IndexShardSnapshotFailedException (shardId , "shard didn't fully recover yet" );
364
- }
345
+ final IndexShardState indexShardState = indexShard .state ();
346
+ if (indexShardState == IndexShardState .CREATED || indexShardState == IndexShardState .RECOVERING ) {
347
+ // shard has just been created, or still recovering
348
+ throw new IndexShardSnapshotFailedException (shardId , "shard didn't fully recover yet" );
349
+ }
365
350
366
- final Repository repository = repositoriesService .repository (snapshot .getRepository ());
367
- try {
368
- // we flush first to make sure we get the latest writes snapshotted
369
- try (Engine .IndexCommitRef snapshotRef = indexShard .acquireLastIndexCommit (true )) {
351
+ final Repository repository = repositoriesService .repository (snapshot .getRepository ());
352
+ Engine .IndexCommitRef snapshotRef = null ;
353
+ try {
354
+ // we flush first to make sure we get the latest writes snapshotted
355
+ snapshotRef = indexShard .acquireLastIndexCommit (true );
370
356
repository .snapshotShard (indexShard .store (), indexShard .mapperService (), snapshot .getSnapshotId (), indexId ,
371
- snapshotRef .getIndexCommit (), snapshotStatus );
372
- if (logger .isDebugEnabled ()) {
373
- final IndexShardSnapshotStatus .Copy lastSnapshotStatus = snapshotStatus .asCopy ();
374
- logger .debug ("snapshot ({}) completed to {} with {}" , snapshot , repository , lastSnapshotStatus );
375
- }
357
+ snapshotRef .getIndexCommit (), snapshotStatus , ActionListener .runBefore (listener , snapshotRef ::close ));
358
+ } catch (Exception e ) {
359
+ IOUtils .close (snapshotRef );
360
+ throw e ;
376
361
}
377
- } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e ) {
378
- throw e ;
379
362
} catch (Exception e ) {
380
- throw new IndexShardSnapshotFailedException ( shardId , "Failed to snapshot" , e );
363
+ listener . onFailure ( e );
381
364
}
382
365
}
383
366
0 commit comments