@@ -290,6 +290,36 @@ public RetentionLease addRetentionLease(
290
290
return retentionLease ;
291
291
}
292
292
293
+ /**
294
+ * Atomically clones an existing retention lease to a new ID.
295
+ *
296
+ * @param sourceLeaseId the identifier of the source retention lease
297
+ * @param targetLeaseId the identifier of the retention lease to create
298
+ * @param listener the callback when the retention lease is successfully added and synced to replicas
299
+ * @return the new retention lease
300
+ * @throws RetentionLeaseNotFoundException if the specified source retention lease does not exist
301
+ * @throws RetentionLeaseAlreadyExistsException if the specified target retention lease already exists
302
+ */
303
+ RetentionLease cloneRetentionLease (String sourceLeaseId , String targetLeaseId , ActionListener <ReplicationResponse > listener ) {
304
+ Objects .requireNonNull (listener );
305
+ final RetentionLease retentionLease ;
306
+ final RetentionLeases currentRetentionLeases ;
307
+ synchronized (this ) {
308
+ assert primaryMode ;
309
+ if (getRetentionLeases ().contains (sourceLeaseId ) == false ) {
310
+ throw new RetentionLeaseNotFoundException (sourceLeaseId );
311
+ }
312
+ final RetentionLease sourceLease = getRetentionLeases ().get (sourceLeaseId );
313
+ retentionLease = innerAddRetentionLease (targetLeaseId , sourceLease .retainingSequenceNumber (), sourceLease .source ());
314
+ currentRetentionLeases = retentionLeases ;
315
+ }
316
+
317
+ // Syncing here may not be strictly necessary, because this new lease isn't retaining any extra history that wasn't previously
318
+ // retained by the source lease; however we prefer to sync anyway since we expect to do so whenever creating a new lease.
319
+ onSyncRetentionLeases .accept (currentRetentionLeases , listener );
320
+ return retentionLease ;
321
+ }
322
+
293
323
/**
294
324
* Adds a new retention lease, but does not synchronise it with the rest of the replication group.
295
325
*
@@ -442,8 +472,16 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio
442
472
* containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
443
473
* with sequence numbers strictly greater than the given global checkpoint.
444
474
*/
445
- public void addPeerRecoveryRetentionLease (String nodeId , long globalCheckpoint , ActionListener <ReplicationResponse > listener ) {
446
- addRetentionLease (getPeerRecoveryRetentionLeaseId (nodeId ), globalCheckpoint + 1 , PEER_RECOVERY_RETENTION_LEASE_SOURCE , listener );
475
+ public RetentionLease addPeerRecoveryRetentionLease (String nodeId , long globalCheckpoint ,
476
+ ActionListener <ReplicationResponse > listener ) {
477
+ return addRetentionLease (getPeerRecoveryRetentionLeaseId (nodeId ), globalCheckpoint + 1 ,
478
+ PEER_RECOVERY_RETENTION_LEASE_SOURCE , listener );
479
+ }
480
+
481
+ public RetentionLease cloneLocalPeerRecoveryRetentionLease (String nodeId , ActionListener <ReplicationResponse > listener ) {
482
+ return cloneRetentionLease (
483
+ getPeerRecoveryRetentionLeaseId (routingTable .primaryShard ()),
484
+ getPeerRecoveryRetentionLeaseId (nodeId ), listener );
447
485
}
448
486
449
487
public void removePeerRecoveryRetentionLease (String nodeId , ActionListener <ReplicationResponse > listener ) {
0 commit comments