
Commit 5fb8bda

Create new PRRL using global checkpoint
By cloning the primary's lease we need not worry about creating a lease that retains history which has already been discarded.
1 parent e9f8ff6 commit 5fb8bda
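To make the commit message concrete, here is a small standalone sketch (illustration only, not Elasticsearch code; every name is hypothetical, and the hard failure below merely stands in for the real hazard of a lease that cannot actually be satisfied): copying an existing lease's retaining sequence number can never reach below the history the primary still retains, whereas a lease created from a guessed checkpoint can.

import java.util.HashMap;
import java.util.Map;

final class LeaseCloningSketch {
    static final Map<String, Long> leases = new HashMap<>(); // lease id -> retaining seq no
    static final long oldestRetainedSeqNo = 50;              // operations below this have been discarded

    static void addLease(String id, long retainingSeqNo) {
        if (retainingSeqNo < oldestRetainedSeqNo) {
            // toy stand-in for the real problem: such a lease would promise history that no longer exists
            throw new IllegalArgumentException("lease at " + retainingSeqNo + " would retain already-discarded history");
        }
        leases.put(id, retainingSeqNo);
    }

    static void cloneLease(String sourceId, String targetId) {
        // copying the source's retaining seq no cannot fail the check above: the source lease already passed it
        addLease(targetId, leases.get(sourceId));
    }

    public static void main(String[] args) {
        addLease("primary", 60);          // the primary's own lease
        cloneLease("primary", "replica"); // always safe, regardless of what has been discarded
        try {
            addLease("guessed", 10);      // a lease based on a stale estimate may reach below retained history
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}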

4 files changed: +173 -12 lines

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java (+35)
@@ -290,6 +290,35 @@ public RetentionLease addRetentionLease(
         return retentionLease;
     }
 
+    /**
+     * Atomically clones an existing retention lease to a new ID.
+     *
+     * @param sourceLeaseId the identifier of the source retention lease
+     * @param targetLeaseId the identifier of the retention lease to create
+     * @param listener      the callback when the retention lease is successfully added and synced to replicas
+     * @return the new retention lease
+     * @throws RetentionLeaseNotFoundException      if the specified source retention lease does not exist
+     * @throws RetentionLeaseAlreadyExistsException if the specified target retention lease already exists
+     */
+    RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, ActionListener<ReplicationResponse> listener) {
+        Objects.requireNonNull(listener);
+        final RetentionLease retentionLease;
+        final RetentionLeases currentRetentionLeases;
+        synchronized (this) {
+            if (getRetentionLeases().contains(sourceLeaseId) == false) {
+                throw new RetentionLeaseNotFoundException(sourceLeaseId);
+            }
+            final RetentionLease sourceLease = getRetentionLeases().get(sourceLeaseId);
+            retentionLease = innerAddRetentionLease(targetLeaseId, sourceLease.retainingSequenceNumber(), sourceLease.source());
+            currentRetentionLeases = retentionLeases;
+        }
+
+        // Syncing here may not be strictly necessary, because this new lease isn't retaining any extra history that wasn't previously
+        // retained by the source lease; however we prefer to sync anyway since we expect to do so whenever creating a new lease.
+        onSyncRetentionLeases.accept(currentRetentionLeases, listener);
+        return retentionLease;
+    }
+
     /**
      * Adds a new retention lease, but does not synchronise it with the rest of the replication group.
      *
@@ -446,6 +475,12 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
     }
 
+    public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
+        return cloneRetentionLease(
+            getPeerRecoveryRetentionLeaseId(routingTable.primaryShard().currentNodeId()),
+            getPeerRecoveryRetentionLeaseId(nodeId), listener);
+    }
+
     public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
         removeRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), listener);
     }
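For reference, a minimal caller of the new method might look like the sketch below. It assumes a ReplicationTracker that is already in primary mode and holds a lease with id "source-lease"; since cloneRetentionLease is package-private, the sketch lives in the same package, as the tests added later in this commit do. Only the method, listener, and future types come from this commit or existing Elasticsearch code; the class and ids are illustrative.

package org.elasticsearch.index.seqno; // same package as ReplicationTracker, because cloneRetentionLease is package-private

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationResponse;

final class CloneLeaseUsageSketch {
    /**
     * Clones "source-lease" to "target-lease" and blocks until the updated leases have been
     * synced to the replication group, then returns the new lease.
     */
    static RetentionLease cloneAndWait(ReplicationTracker tracker) {
        final PlainActionFuture<ReplicationResponse> syncedFuture = new PlainActionFuture<>();
        final RetentionLease cloned = tracker.cloneRetentionLease("source-lease", "target-lease", syncedFuture);
        syncedFuture.actionGet(); // completes once onSyncRetentionLeases has run
        return cloned;            // carries the source lease's retaining seq no and source string
    }
}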

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java (+8)
@@ -40,6 +40,7 @@
 import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
@@ -2591,9 +2592,16 @@ public boolean isRelocatedPrimary() {
 
     public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
         assert assertPrimaryMode();
+        // only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs:
+        assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
         replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
     }
 
+    public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
+        assert assertPrimaryMode();
+        return replicationTracker.cloneLocalPeerRecoveryRetentionLease(nodeId, listener);
+    }
+
     public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
         assert assertPrimaryMode();
         replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java (+31 -12)
@@ -31,6 +31,7 @@
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.ThreadedActionListener;
@@ -177,6 +178,11 @@ && isTargetSameHistory()
             && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
             && (useRetentionLeases == false
                 || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
+        // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
+        // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
+        // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.
+        // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
+        // without having a complete history.
 
         if (isSequenceNumberBasedRecovery && useRetentionLeases) {
             // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
@@ -271,19 +277,32 @@ && isTargetSameHistory()
             if (useRetentionLeases && isSequenceNumberBasedRecovery == false) {
                 // We can in general use retention leases for peer recovery, but there is no lease for the target node right now.
                 runUnderPrimaryPermit(() -> {
-                    // Start the lease off retaining all the history needed from the local checkpoint of the safe commit that we've
-                    // just established on the replica. This primary is certainly retaining such history, but other replicas might
-                    // not be. No big deal if this recovery succeeds, but if this primary fails then the new primary might have to
-                    // repeat phase 1 to recover this replica.
-                    final long localCheckpointOfSafeCommit = startingSeqNo - 1;
-                    logger.trace("creating new retention lease at [{}]", localCheckpointOfSafeCommit);
-                    shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), localCheckpointOfSafeCommit,
-                        new ThreadedActionListener<>(logger, shard.getThreadPool(),
-                            ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
+                    // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the
+                    // local checkpoint of the safe commit we're creating and this lease's retained seqno with the retention
+                    // lock, and by cloning an existing lease we (approximately) know that all our peers are also retaining history
+                    // as requested by the cloned lease. If the recovery now fails before copying enough history over then a
+                    // subsequent attempt will find this lease, determine it is not enough, and fall back to a file-based recovery.
+                    //
+                    // (approximately) because we do not guarantee to be able to satisfy every lease on every peer.
+                    logger.trace("cloning primary's retention lease");
+                    try {
+                        final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease(request.targetNode().getId(),
+                            new ThreadedActionListener<>(logger, shard.getThreadPool(),
+                                ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
+                        logger.trace("cloned primary's retention lease as [{}]", clonedLease);
+                    } catch (RetentionLeaseNotFoundException e) {
+                        // it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a
+                        // version before 7.4, and in that case we just create a lease using the local checkpoint of the safe commit
+                        // which we're using for recovery as a conservative estimate for the global checkpoint.
+                        assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0);
+                        final long estimatedGlobalCheckpoint = startingSeqNo - 1;
+                        shard.addPeerRecoveryRetentionLease(request.targetNode().getId(),
+                            estimatedGlobalCheckpoint, new ThreadedActionListener<>(logger, shard.getThreadPool(),
+                                ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
+                        logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);
+                    }
                 }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",
                     shard, cancellableThreads, logger);
-                // all the history we need is now retained by a retention lease so we can close the retention lock
-                retentionLock.close();
             } else {
                 establishRetentionLeaseStep.onResponse(null);
             }
@@ -313,7 +332,7 @@ && isTargetSameHistory()
             final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
             resources.add(phase2Snapshot);
 
-            if (useRetentionLeases == false) {
+            if (useRetentionLeases == false || isSequenceNumberBasedRecovery == false) {
                 // we can release the retention lock here because the snapshot itself will retain the required operations.
                 retentionLock.close();
            }
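One consequence of cloning, visible in the last hunk, is that a file-based recovery now holds the retention lock until the phase-2 snapshot is open, because the cloned lease alone may not retain all the history the target will need. The helper below is hypothetical (not part of RecoverySourceHandler); it only restates when the lock is released at that point, under the assumption described in the diff's comments.

// Hypothetical helper spelling out the condition in the last hunk above.
final class RetentionLockRule {
    /**
     * Once the phase-2 translog snapshot is open it retains the operations it will replay, so the
     * separate retention lock can be released in every case except a sequence-number-based recovery
     * with retention leases, where (per the earlier comment) the lock was never needed at all because
     * an existing lease already covers the required history.
     */
    static boolean releaseRetentionLockAfterPhase2Snapshot(boolean useRetentionLeases,
                                                           boolean isSequenceNumberBasedRecovery) {
        return useRetentionLeases == false || isSequenceNumberBasedRecovery == false;
    }
}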

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java (+99)
@@ -20,6 +20,8 @@
 package org.elasticsearch.index.seqno;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
@@ -247,6 +249,103 @@ public void testRemoveRetentionLease() {
         }
     }
 
+    public void testCloneRetentionLease() {
+        final AllocationId allocationId = AllocationId.newInitializing();
+        final AtomicReference<ReplicationTracker> replicationTrackerRef = new AtomicReference<>();
+        final AtomicLong timeReference = new AtomicLong();
+        final AtomicBoolean synced = new AtomicBoolean();
+        final ReplicationTracker replicationTracker = new ReplicationTracker(
+            new ShardId("test", "_na", 0),
+            allocationId.getId(),
+            IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
+            randomLongBetween(1, Long.MAX_VALUE),
+            UNASSIGNED_SEQ_NO,
+            value -> {},
+            timeReference::get,
+            (leases, listener) -> {
+                assertFalse(Thread.holdsLock(replicationTrackerRef.get()));
+                assertTrue(synced.compareAndSet(false, true));
+                listener.onResponse(new ReplicationResponse());
+            });
+        replicationTrackerRef.set(replicationTracker);
+        replicationTracker.updateFromMaster(
+            randomNonNegativeLong(),
+            Collections.singleton(allocationId.getId()),
+            routingTable(Collections.emptySet(), allocationId));
+        replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
+
+        final long addTime = randomLongBetween(timeReference.get(), Long.MAX_VALUE);
+        timeReference.set(addTime);
+        final long minimumRetainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
+        final PlainActionFuture<ReplicationResponse> addFuture = new PlainActionFuture<>();
+        replicationTracker.addRetentionLease("source", minimumRetainingSequenceNumber, "test-source", addFuture);
+        addFuture.actionGet();
+        assertTrue(synced.get());
+        synced.set(false);
+
+        final long cloneTime = randomLongBetween(timeReference.get(), Long.MAX_VALUE);
+        timeReference.set(cloneTime);
+        final PlainActionFuture<ReplicationResponse> cloneFuture = new PlainActionFuture<>();
+        final RetentionLease clonedLease = replicationTracker.cloneRetentionLease("source", "target", cloneFuture);
+        cloneFuture.actionGet();
+        assertTrue(synced.get());
+        synced.set(false);
+
+        assertThat(clonedLease.id(), equalTo("target"));
+        assertThat(clonedLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumber));
+        assertThat(clonedLease.timestamp(), equalTo(cloneTime));
+        assertThat(clonedLease.source(), equalTo("test-source"));
+
+        assertThat(replicationTracker.getRetentionLeases().get("target"), equalTo(clonedLease));
+    }
+
+    public void testCloneNonexistentRetentionLease() {
+        final AllocationId allocationId = AllocationId.newInitializing();
+        final ReplicationTracker replicationTracker = new ReplicationTracker(
+            new ShardId("test", "_na", 0),
+            allocationId.getId(),
+            IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
+            randomLongBetween(1, Long.MAX_VALUE),
+            UNASSIGNED_SEQ_NO,
+            value -> {},
+            () -> 0L,
+            (leases, listener) -> { });
+        replicationTracker.updateFromMaster(
+            randomNonNegativeLong(),
+            Collections.singleton(allocationId.getId()),
+            routingTable(Collections.emptySet(), allocationId));
+        replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
+
+        assertThat(expectThrows(RetentionLeaseNotFoundException.class,
+            () -> replicationTracker.cloneRetentionLease("nonexistent-lease-id", "target", ActionListener.wrap(() -> {}))).getMessage(),
+            equalTo("retention lease with ID [nonexistent-lease-id] not found"));
+    }
+
+    public void testCloneDuplicateRetentionLease() {
+        final AllocationId allocationId = AllocationId.newInitializing();
+        final ReplicationTracker replicationTracker = new ReplicationTracker(
+            new ShardId("test", "_na", 0),
+            allocationId.getId(),
+            IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
+            randomLongBetween(1, Long.MAX_VALUE),
+            UNASSIGNED_SEQ_NO,
+            value -> {},
+            () -> 0L,
+            (leases, listener) -> { });
+        replicationTracker.updateFromMaster(
+            randomNonNegativeLong(),
+            Collections.singleton(allocationId.getId()),
+            routingTable(Collections.emptySet(), allocationId));
+        replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
+
+        replicationTracker.addRetentionLease("source", randomLongBetween(0L, Long.MAX_VALUE), "test-source", ActionListener.wrap(() -> {}));
+        replicationTracker.addRetentionLease("exists", randomLongBetween(0L, Long.MAX_VALUE), "test-source", ActionListener.wrap(() -> {}));
+
+        assertThat(expectThrows(RetentionLeaseAlreadyExistsException.class,
+            () -> replicationTracker.cloneRetentionLease("source", "exists", ActionListener.wrap(() -> {}))).getMessage(),
+            equalTo("retention lease with ID [exists] already exists"));
+    }
+
     public void testRemoveNotFound() {
         final AllocationId allocationId = AllocationId.newInitializing();
         long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
