@@ -264,14 +264,14 @@ && isTargetSameHistory()
264
264
deleteRetentionLeaseStep .whenComplete (ignored -> {
265
265
assert Transports .assertNotTransportThread (RecoverySourceHandler .this + "[phase1]" );
266
266
267
- final Consumer <ActionListener <Long >> getInitialGlobalCheckpoint ;
267
+ final Consumer <ActionListener <RetentionLease >> createRetentionLeaseAsync ;
268
268
if (useRetentionLeases ) {
269
- getInitialGlobalCheckpoint = l -> createRetentionLease (startingSeqNo , l );
269
+ createRetentionLeaseAsync = l -> createRetentionLease (startingSeqNo , l );
270
270
} else {
271
- getInitialGlobalCheckpoint = l -> l .onResponse (SequenceNumbers . UNASSIGNED_SEQ_NO );
271
+ createRetentionLeaseAsync = l -> l .onResponse (null );
272
272
}
273
273
274
- phase1 (safeCommitRef .getIndexCommit (), getInitialGlobalCheckpoint , () -> estimateNumOps , sendFileStep );
274
+ phase1 (safeCommitRef .getIndexCommit (), createRetentionLeaseAsync , () -> estimateNumOps , sendFileStep );
275
275
}, onFailure );
276
276
277
277
} catch (final Exception e ) {
@@ -427,7 +427,7 @@ static final class SendFileResult {
427
427
* segments that are missing. Only segments that have the same size and
428
428
* checksum can be reused
429
429
*/
430
- void phase1 (IndexCommit snapshot , Consumer <ActionListener <Long >> getInitialGlobalCheckpoint ,
430
+ void phase1 (IndexCommit snapshot , Consumer <ActionListener <RetentionLease >> createRetentionLease ,
431
431
IntSupplier translogOps , ActionListener <SendFileResult > listener ) {
432
432
cancellableThreads .checkForCancel ();
433
433
// Total size of segment files that are recovered
@@ -491,7 +491,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getInitialGloba
491
491
phase1ExistingFileNames .size (), new ByteSizeValue (existingTotalSizeInBytes ));
492
492
final StepListener <Void > sendFileInfoStep = new StepListener <>();
493
493
final StepListener <Void > sendFilesStep = new StepListener <>();
494
- final StepListener <Long > getInitialGlobalCheckpointStep = new StepListener <>();
494
+ final StepListener <RetentionLease > createRetentionLeaseStep = new StepListener <>();
495
495
final StepListener <Void > cleanFilesStep = new StepListener <>();
496
496
cancellableThreads .execute (() ->
497
497
recoveryTarget .receiveFileInfo (phase1FileNames , phase1FileSizes , phase1ExistingFileNames ,
@@ -500,12 +500,19 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getInitialGloba
500
500
sendFileInfoStep .whenComplete (r ->
501
501
sendFiles (store , phase1Files .toArray (new StoreFileMetaData [0 ]), translogOps , sendFilesStep ), listener ::onFailure );
502
502
503
- sendFilesStep .whenComplete (r -> getInitialGlobalCheckpoint .accept (getInitialGlobalCheckpointStep ), listener ::onFailure );
504
-
505
- getInitialGlobalCheckpointStep .whenComplete (initialGlobalCheckpoint ->
506
- cleanFiles (store , recoverySourceMetadata , translogOps ,
507
- Math .max (initialGlobalCheckpoint , Long .parseLong (snapshot .getUserData ().get (SequenceNumbers .MAX_SEQ_NO ))),
508
- cleanFilesStep ),
503
+ sendFilesStep .whenComplete (r -> createRetentionLease .accept (createRetentionLeaseStep ), listener ::onFailure );
504
+
505
+ createRetentionLeaseStep .whenComplete (retentionLease ->
506
+ {
507
+ final long lastKnownGlobalCheckpoint = shard .getLastKnownGlobalCheckpoint ();
508
+ assert retentionLease == null || retentionLease .retainingSequenceNumber () - 1 <= lastKnownGlobalCheckpoint
509
+ : retentionLease + " vs " + lastKnownGlobalCheckpoint ;
510
+ // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want
511
+ // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica
512
+ // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on
513
+ // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.
514
+ cleanFiles (store , recoverySourceMetadata , translogOps , lastKnownGlobalCheckpoint , cleanFilesStep );
515
+ },
509
516
listener ::onFailure );
510
517
511
518
final long totalSize = totalSizeInBytes ;
@@ -529,7 +536,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getInitialGloba
529
536
}
530
537
}
531
538
532
- private void createRetentionLease (final long startingSeqNo , ActionListener <Long > initialGlobalCheckpointListener ) {
539
+ private void createRetentionLease (final long startingSeqNo , ActionListener <RetentionLease > listener ) {
533
540
runUnderPrimaryPermit (() -> {
534
541
// Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the local
535
542
// checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an
@@ -546,22 +553,18 @@ private void createRetentionLease(final long startingSeqNo, ActionListener<Long>
546
553
new ThreadedActionListener <>(logger , shard .getThreadPool (),
547
554
ThreadPool .Names .GENERIC , cloneRetentionLeaseStep , false ));
548
555
logger .trace ("cloned primary's retention lease as [{}]" , clonedLease );
549
- cloneRetentionLeaseStep .whenComplete (
550
- rr -> initialGlobalCheckpointListener .onResponse (clonedLease .retainingSequenceNumber () - 1 ),
551
- initialGlobalCheckpointListener ::onFailure );
556
+ cloneRetentionLeaseStep .whenComplete (rr -> listener .onResponse (clonedLease ), listener ::onFailure );
552
557
} catch (RetentionLeaseNotFoundException e ) {
553
558
// it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before
554
559
// 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for
555
560
// recovery as a conservative estimate for the global checkpoint.
556
561
assert shard .indexSettings ().getIndexVersionCreated ().before (Version .V_7_4_0 );
557
562
final StepListener <ReplicationResponse > addRetentionLeaseStep = new StepListener <>();
558
563
final long estimatedGlobalCheckpoint = startingSeqNo - 1 ;
559
- shard .addPeerRecoveryRetentionLease (request .targetNode ().getId (),
564
+ final RetentionLease newLease = shard .addPeerRecoveryRetentionLease (request .targetNode ().getId (),
560
565
estimatedGlobalCheckpoint , new ThreadedActionListener <>(logger , shard .getThreadPool (),
561
566
ThreadPool .Names .GENERIC , addRetentionLeaseStep , false ));
562
- addRetentionLeaseStep .whenComplete (
563
- rr -> initialGlobalCheckpointListener .onResponse (estimatedGlobalCheckpoint ),
564
- initialGlobalCheckpointListener ::onFailure );
567
+ addRetentionLeaseStep .whenComplete (rr -> listener .onResponse (newLease ), listener ::onFailure );
565
568
logger .trace ("created retention lease with estimated checkpoint of [{}]" , estimatedGlobalCheckpoint );
566
569
}
567
570
}, shardId + " establishing retention lease for [" + request .targetAllocationId () + "]" ,
0 commit comments