Commit 14fde1c

Base initial GCP on the cloned retention lease
1 parent 8e00661 commit 14fde1c


2 files changed: 63 additions & 42 deletions


server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 56 additions & 39 deletions
@@ -78,6 +78,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -196,7 +197,6 @@ && isTargetSameHistory()
         }
 
         final StepListener<SendFileResult> sendFileStep = new StepListener<>();
-        final StepListener<ReplicationResponse> establishRetentionLeaseStep = new StepListener<>();
         final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
         final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
         final StepListener<Void> finalizeStep = new StepListener<>();
@@ -264,7 +264,16 @@ && isTargetSameHistory()
 
             deleteRetentionLeaseStep.whenComplete(ignored -> {
                 assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
-                phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
+
+                final Consumer<ActionListener<Long>> getGlobalCheckpoint;
+                if (useRetentionLeases) {
+                    getGlobalCheckpoint = l -> createRetentionLease(startingSeqNo, l);
+                } else {
+                    final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint();
+                    getGlobalCheckpoint = l -> l.onResponse(globalCheckpoint);
+                }
+
+                phase1(safeCommitRef.getIndexCommit(), getGlobalCheckpoint, () -> estimateNumOps, sendFileStep);
             }, onFailure);
 
         } catch (final Exception e) {
@@ -274,41 +283,6 @@ && isTargetSameHistory()
         assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
 
         sendFileStep.whenComplete(r -> {
-            if (useRetentionLeases && isSequenceNumberBasedRecovery == false) {
-                // We can in general use retention leases for peer recovery, but there is no lease for the target node right now.
-                runUnderPrimaryPermit(() -> {
-                    // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the
-                    // the local checkpoint of the safe commit we're creating and this lease's retained seqno with the retention
-                    // lock, and by cloning an existing lease we (approximately) know that all our peers are also retaining history
-                    // as requested by the cloned lease. If the recovery now fails before copying enough history over then a
-                    // subsequent attempt will find this lease, determine it is not enough, and fall back to a file-based recovery.
-                    //
-                    // (approximately) because we do not guarantee to be able to satisfy every lease on every peer.
-                    logger.trace("cloning primary's retention lease");
-                    try {
-                        final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease(request.targetNode().getId(),
-                            new ThreadedActionListener<>(logger, shard.getThreadPool(),
-                                ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
-                        logger.trace("cloned primary's retention lease as [{}]", clonedLease);
-                    } catch (RetentionLeaseNotFoundException e) {
-                        // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a
-                        // version before 7.4, and in that case we just create a lease using the local checkpoint of the safe commit
-                        // which we're using for recovery as a conservative estimate for the global checkpoint.
-                        assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0);
-                        final long estimatedGlobalCheckpoint = startingSeqNo - 1;
-                        shard.addPeerRecoveryRetentionLease(request.targetNode().getId(),
-                            estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(),
-                                ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
-                        logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
-                    }
-                }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",
-                    shard, cancellableThreads, logger);
-            } else {
-                establishRetentionLeaseStep.onResponse(null);
-            }
-        }, onFailure);
-
-        establishRetentionLeaseStep.whenComplete(r -> {
             assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
             // For a sequence based recovery, the target can keep its local translog
             prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
@@ -455,7 +429,8 @@ static final class SendFileResult {
      * segments that are missing. Only segments that have the same size and
      * checksum can be reused
      */
-    void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
+    void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getGlobalCheckpoint,
+                IntSupplier translogOps, ActionListener<SendFileResult> listener) {
         cancellableThreads.checkForCancel();
         // Total size of segment files that are recovered
         long totalSizeInBytes = 0;
@@ -518,6 +493,7 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps
                 phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
             final StepListener<Void> sendFileInfoStep = new StepListener<>();
             final StepListener<Void> sendFilesStep = new StepListener<>();
+            final StepListener<Long> getGlobalCheckpointStep = new StepListener<>();
             final StepListener<Void> cleanFilesStep = new StepListener<>();
             cancellableThreads.execute(() ->
                 recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
@@ -526,7 +502,9 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps
             sendFileInfoStep.whenComplete(r ->
                 sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
 
-            sendFilesStep.whenComplete(r ->
+            sendFilesStep.whenComplete(r -> getGlobalCheckpoint.accept(getGlobalCheckpointStep), listener::onFailure);
+
+            getGlobalCheckpointStep.whenComplete(globalCheckpoint ->
                 cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep), listener::onFailure);
 
             final long totalSize = totalSizeInBytes;
@@ -550,6 +528,45 @@ void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps
             }
         }
 
+    private void createRetentionLease(final long startingSeqNo, ActionListener<Long> listener) {
+        runUnderPrimaryPermit(() -> {
+            // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the the local
+            // checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an
+            // existing lease we (approximately) know that all our peers are also retaining history as requested by the cloned lease. If
+            // the recovery now fails before copying enough history over then a subsequent attempt will find this lease, determine it is
+            // not enough, and fall back to a file-based recovery.
+            //
+            // (approximately) because we do not guarantee to be able to satisfy every lease on every peer.
+            logger.trace("cloning primary's retention lease");
+            try {
+                final StepListener<ReplicationResponse> cloneRetentionLeaseStep = new StepListener<>();
+                final RetentionLease clonedLease
+                    = shard.cloneLocalPeerRecoveryRetentionLease(request.targetNode().getId(),
+                        new ThreadedActionListener<>(logger, shard.getThreadPool(),
+                            ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false));
+                logger.trace("cloned primary's retention lease as [{}]", clonedLease);
+                cloneRetentionLeaseStep.whenComplete(
+                    rr -> listener.onResponse(clonedLease.retainingSequenceNumber() - 1),
+                    listener::onFailure);
+            } catch (RetentionLeaseNotFoundException e) {
+                // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before
+                // 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for
+                // recovery as a conservative estimate for the global checkpoint.
+                assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0);
+                final StepListener<ReplicationResponse> addRetentionLeaseStep = new StepListener<>();
+                final long estimatedGlobalCheckpoint = startingSeqNo - 1;
+                shard.addPeerRecoveryRetentionLease(request.targetNode().getId(),
+                    estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(),
+                        ThreadPool.Names.GENERIC, addRetentionLeaseStep, false));
+                addRetentionLeaseStep.whenComplete(
+                    rr -> listener.onResponse(estimatedGlobalCheckpoint),
+                    listener::onFailure);
+                logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
+            }
+        }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",
+            shard, cancellableThreads, logger);
+    }
+
     boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
         if (source.getSyncId() == null || source.getSyncId().equals(target.getSyncId()) == false) {
             return false;
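Taken together, the change moves the global-checkpoint lookup out of the caller and into a Consumer<ActionListener<Long>> that phase1 only invokes after the segment files have been sent: when retention leases are in use the value comes from the freshly cloned lease (retainingSequenceNumber() - 1), otherwise it falls back to shard.getLastKnownGlobalCheckpoint(). Below is a minimal, self-contained sketch of that deferral pattern; the LongListener interface, class name, and values are simplified stand-ins for illustration, not the Elasticsearch API.

import java.util.function.Consumer;

// Simplified stand-in for ActionListener<Long>; not the Elasticsearch interface.
interface LongListener {
    void onResponse(long value);
    void onFailure(Exception e);
}

class DeferredGlobalCheckpointSketch {

    // The file-copy step runs first; only then is the checkpoint consumer asked to
    // produce a value and hand it to the "clean files" listener.
    static void phase1(Runnable copyFiles, Consumer<LongListener> getGlobalCheckpoint, LongListener cleanFilesStep) {
        copyFiles.run();
        getGlobalCheckpoint.accept(cleanFilesStep);
    }

    public static void main(String[] args) {
        boolean useRetentionLeases = true;
        long lastKnownGlobalCheckpoint = 41;  // fallback when leases are not in use
        long clonedLeaseRetainingSeqNo = 43;  // pretend this came from a cloned retention lease

        final Consumer<LongListener> getGlobalCheckpoint;
        if (useRetentionLeases) {
            // In the real handler this is createRetentionLease(startingSeqNo, l), which
            // completes with clonedLease.retainingSequenceNumber() - 1.
            getGlobalCheckpoint = l -> l.onResponse(clonedLeaseRetainingSeqNo - 1);
        } else {
            final long globalCheckpoint = lastKnownGlobalCheckpoint;
            getGlobalCheckpoint = l -> l.onResponse(globalCheckpoint);
        }

        phase1(() -> System.out.println("sending segment files..."),
            getGlobalCheckpoint,
            new LongListener() {
                @Override
                public void onResponse(long globalCheckpoint) {
                    System.out.println("cleanFiles using global checkpoint " + globalCheckpoint);
                }

                @Override
                public void onFailure(Exception e) {
                    e.printStackTrace();
                }
            });
    }
}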

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import java.util.concurrent.atomic.AtomicBoolean;
102102
import java.util.concurrent.atomic.AtomicInteger;
103103
import java.util.concurrent.atomic.AtomicLong;
104+
import java.util.function.Consumer;
104105
import java.util.function.IntSupplier;
105106
import java.util.zip.CRC32;
106107

@@ -466,9 +467,10 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
             between(1, 8)) {
 
             @Override
-            void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
+            void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getGlobalCheckpoint,
+                        IntSupplier translogOps, ActionListener<SendFileResult> listener) {
                 phase1Called.set(true);
-                super.phase1(snapshot, globalCheckpoint, translogOps, listener);
+                super.phase1(snapshot, getGlobalCheckpoint, translogOps, listener);
             }
 
             @Override
@@ -683,7 +685,9 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
         final StepListener<RecoverySourceHandler.SendFileResult> phase1Listener = new StepListener<>();
         try {
             final CountDownLatch latch = new CountDownLatch(1);
-            handler.phase1(DirectoryReader.listCommits(dir).get(0), randomNonNegativeLong(), () -> 0,
+            handler.phase1(DirectoryReader.listCommits(dir).get(0),
+                l -> recoveryExecutor.execute(() -> l.onResponse(randomNonNegativeLong())),
+                () -> 0,
                 new LatchedActionListener<>(phase1Listener, latch));
             latch.await();
             phase1Listener.result();
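As the test change shows, a caller that already has the checkpoint in hand only needs to wrap it in a listener callback (the test additionally dispatches onto the recovery executor so the listener fires on the expected thread). A hedged one-line adapter, with illustrative variable names that are not from this commit:

long knownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
handler.phase1(indexCommit, l -> l.onResponse(knownGlobalCheckpoint), () -> estimatedOps, sendFileListener);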
