Skip to content

Commit 24b5806

Browse files
committed
Ensure that commit remains safe
1 parent 825fc8f commit 24b5806

File tree

2 files changed

+19
-17
lines changed

2 files changed

+19
-17
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

+17-15
Original file line numberDiff line numberDiff line change
@@ -264,15 +264,14 @@ && isTargetSameHistory()
264264
deleteRetentionLeaseStep.whenComplete(ignored -> {
265265
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
266266

267-
final Consumer<ActionListener<Long>> getGlobalCheckpoint;
267+
final Consumer<ActionListener<Long>> getInitialGlobalCheckpoint;
268268
if (useRetentionLeases) {
269-
getGlobalCheckpoint = l -> createRetentionLease(startingSeqNo, l);
269+
getInitialGlobalCheckpoint = l -> createRetentionLease(startingSeqNo, l);
270270
} else {
271-
final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint();
272-
getGlobalCheckpoint = l -> l.onResponse(globalCheckpoint);
271+
getInitialGlobalCheckpoint = l -> l.onResponse(SequenceNumbers.UNASSIGNED_SEQ_NO);
273272
}
274273

275-
phase1(safeCommitRef.getIndexCommit(), getGlobalCheckpoint, () -> estimateNumOps, sendFileStep);
274+
phase1(safeCommitRef.getIndexCommit(), getInitialGlobalCheckpoint, () -> estimateNumOps, sendFileStep);
276275
}, onFailure);
277276

278277
} catch (final Exception e) {
@@ -428,7 +427,7 @@ static final class SendFileResult {
428427
* segments that are missing. Only segments that have the same size and
429428
* checksum can be reused
430429
*/
431-
void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getGlobalCheckpoint,
430+
void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getInitialGlobalCheckpoint,
432431
IntSupplier translogOps, ActionListener<SendFileResult> listener) {
433432
cancellableThreads.checkForCancel();
434433
// Total size of segment files that are recovered
@@ -492,7 +491,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getGlobalCheckp
492491
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
493492
final StepListener<Void> sendFileInfoStep = new StepListener<>();
494493
final StepListener<Void> sendFilesStep = new StepListener<>();
495-
final StepListener<Long> getGlobalCheckpointStep = new StepListener<>();
494+
final StepListener<Long> getInitialGlobalCheckpointStep = new StepListener<>();
496495
final StepListener<Void> cleanFilesStep = new StepListener<>();
497496
cancellableThreads.execute(() ->
498497
recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
@@ -501,10 +500,13 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getGlobalCheckp
501500
sendFileInfoStep.whenComplete(r ->
502501
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
503502

504-
sendFilesStep.whenComplete(r -> getGlobalCheckpoint.accept(getGlobalCheckpointStep), listener::onFailure);
503+
sendFilesStep.whenComplete(r -> getInitialGlobalCheckpoint.accept(getInitialGlobalCheckpointStep), listener::onFailure);
505504

506-
getGlobalCheckpointStep.whenComplete(globalCheckpoint ->
507-
cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep), listener::onFailure);
505+
getInitialGlobalCheckpointStep.whenComplete(initialGlobalCheckpoint ->
506+
cleanFiles(store, recoverySourceMetadata, translogOps,
507+
Math.max(initialGlobalCheckpoint, Long.parseLong(snapshot.getUserData().get(SequenceNumbers.MAX_SEQ_NO))),
508+
cleanFilesStep),
509+
listener::onFailure);
508510

509511
final long totalSize = totalSizeInBytes;
510512
final long existingTotalSize = existingTotalSizeInBytes;
@@ -527,7 +529,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getGlobalCheckp
527529
}
528530
}
529531

530-
private void createRetentionLease(final long startingSeqNo, ActionListener<Long> listener) {
532+
private void createRetentionLease(final long startingSeqNo, ActionListener<Long> initialGlobalCheckpointListener) {
531533
runUnderPrimaryPermit(() -> {
532534
// Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the the local
533535
// checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an
@@ -545,8 +547,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener<Long>
545547
ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false));
546548
logger.trace("cloned primary's retention lease as [{}]", clonedLease);
547549
cloneRetentionLeaseStep.whenComplete(
548-
rr -> listener.onResponse(clonedLease.retainingSequenceNumber() - 1),
549-
listener::onFailure);
550+
rr -> initialGlobalCheckpointListener.onResponse(clonedLease.retainingSequenceNumber() - 1),
551+
initialGlobalCheckpointListener::onFailure);
550552
} catch (RetentionLeaseNotFoundException e) {
551553
// it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before
552554
// 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for
@@ -558,8 +560,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener<Long>
558560
estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(),
559561
ThreadPool.Names.GENERIC, addRetentionLeaseStep, false));
560562
addRetentionLeaseStep.whenComplete(
561-
rr -> listener.onResponse(estimatedGlobalCheckpoint),
562-
listener::onFailure);
563+
rr -> initialGlobalCheckpointListener.onResponse(estimatedGlobalCheckpoint),
564+
initialGlobalCheckpointListener::onFailure);
563565
logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
564566
}
565567
}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -467,10 +467,10 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
467467
between(1, 8)) {
468468

469469
@Override
470-
void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getGlobalCheckpoint,
470+
void phase1(IndexCommit snapshot, Consumer<ActionListener<Long>> getInitialGlobalCheckpoint,
471471
IntSupplier translogOps, ActionListener<SendFileResult> listener) {
472472
phase1Called.set(true);
473-
super.phase1(snapshot, getGlobalCheckpoint, translogOps, listener);
473+
super.phase1(snapshot, getInitialGlobalCheckpoint, translogOps, listener);
474474
}
475475

476476
@Override

0 commit comments

Comments
 (0)