24
24
import org .elasticsearch .Version ;
25
25
import org .elasticsearch .action .ActionListener ;
26
26
import org .elasticsearch .action .support .replication .ReplicationResponse ;
27
+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
27
28
import org .elasticsearch .cluster .routing .AllocationId ;
28
29
import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
29
30
import org .elasticsearch .cluster .routing .ShardRouting ;
@@ -217,10 +218,22 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
217
218
// the primary calculates the non-expired retention leases and syncs them to replicas
218
219
final long currentTimeMillis = currentTimeMillisSupplier .getAsLong ();
219
220
final long retentionLeaseMillis = indexSettings .getRetentionLeaseMillis ();
221
+ final Set <String > leaseIdsForCurrentPeers
222
+ = routingTable .assignedShards ().stream ().map (ReplicationTracker ::getPeerRecoveryRetentionLeaseId ).collect (Collectors .toSet ());
220
223
final Map <Boolean , List <RetentionLease >> partitionByExpiration = retentionLeases
221
224
.leases ()
222
225
.stream ()
223
- .collect (Collectors .groupingBy (lease -> currentTimeMillis - lease .timestamp () > retentionLeaseMillis ));
226
+ .collect (Collectors .groupingBy (lease -> {
227
+ if (lease .source ().equals (PEER_RECOVERY_RETENTION_LEASE_SOURCE )) {
228
+ if (leaseIdsForCurrentPeers .contains (lease .id ())) {
229
+ return false ;
230
+ }
231
+ if (routingTable .allShardsStarted ()) {
232
+ return true ;
233
+ }
234
+ }
235
+ return currentTimeMillis - lease .timestamp () > retentionLeaseMillis ;
236
+ }));
224
237
final Collection <RetentionLease > expiredLeases = partitionByExpiration .get (true );
225
238
if (expiredLeases == null ) {
226
239
// early out as no retention leases have expired
@@ -242,7 +255,7 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
242
255
* @param source the source of the retention lease
243
256
* @param listener the callback when the retention lease is successfully added and synced to replicas
244
257
* @return the new retention lease
245
- * @throws IllegalArgumentException if the specified retention lease already exists
258
+ * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
246
259
*/
247
260
public RetentionLease addRetentionLease (
248
261
final String id ,
@@ -253,30 +266,46 @@ public RetentionLease addRetentionLease(
253
266
final RetentionLease retentionLease ;
254
267
final RetentionLeases currentRetentionLeases ;
255
268
synchronized (this ) {
256
- assert primaryMode ;
257
- if (retentionLeases .contains (id )) {
258
- throw new RetentionLeaseAlreadyExistsException (id );
259
- }
260
- retentionLease = new RetentionLease (id , retainingSequenceNumber , currentTimeMillisSupplier .getAsLong (), source );
261
- logger .debug ("adding new retention lease [{}] to current retention leases [{}]" , retentionLease , retentionLeases );
262
- retentionLeases = new RetentionLeases (
263
- operationPrimaryTerm ,
264
- retentionLeases .version () + 1 ,
265
- Stream .concat (retentionLeases .leases ().stream (), Stream .of (retentionLease )).collect (Collectors .toList ()));
269
+ retentionLease = innerAddRetentionLease (id , retainingSequenceNumber , source );
266
270
currentRetentionLeases = retentionLeases ;
267
271
}
268
272
onSyncRetentionLeases .accept (currentRetentionLeases , listener );
269
273
return retentionLease ;
270
274
}
271
275
276
+ /**
277
+ * Adds a new retention lease, but does not synchronise it with the rest of the replication group.
+ * The caller must already hold the monitor on this tracker, and the tracker must be in primary mode; both conditions are
+ * asserted. On success the current retention leases are replaced by a new collection that carries the operation primary
+ * term, a version one greater than the previous version, and the previous leases followed by the new lease. The new lease
+ * is timestamped with the current value of the time supplier.
278
+ *
279
+ * @param id the identifier of the retention lease
280
+ * @param retainingSequenceNumber the retaining sequence number
281
+ * @param source the source of the retention lease
282
+ * @return the new retention lease
283
+ * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
284
+ */
285
+ private RetentionLease innerAddRetentionLease (String id , long retainingSequenceNumber , String source ) {
286
+ assert Thread .holdsLock (this );
287
+ assert primaryMode : id + "/" + retainingSequenceNumber + "/" + source ;
288
+ if (retentionLeases .contains (id )) {
289
+ throw new RetentionLeaseAlreadyExistsException (id );
290
+ }
291
+ final RetentionLease retentionLease
292
+ = new RetentionLease (id , retainingSequenceNumber , currentTimeMillisSupplier .getAsLong (), source );
293
+ logger .debug ("adding new retention lease [{}] to current retention leases [{}]" , retentionLease , retentionLeases );
294
+ retentionLeases = new RetentionLeases (
295
+ operationPrimaryTerm ,
296
+ retentionLeases .version () + 1 ,
297
+ Stream .concat (retentionLeases .leases ().stream (), Stream .of (retentionLease )).collect (Collectors .toList ()));
298
+ return retentionLease ;
299
+ }
300
+
272
301
/**
273
302
* Renews an existing retention lease.
274
303
*
275
304
* @param id the identifier of the retention lease
276
305
* @param retainingSequenceNumber the retaining sequence number
277
306
* @param source the source of the retention lease
278
307
* @return the renewed retention lease
279
- * @throws IllegalArgumentException if the specified retention lease does not exist
308
+ * @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
280
309
*/
281
310
public synchronized RetentionLease renewRetentionLease (final String id , final long retainingSequenceNumber , final String source ) {
282
311
assert primaryMode ;
@@ -390,6 +419,45 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio
390
419
return true ;
391
420
}
392
421
422
+
423
+ /**
424
+ * Retention leases for peer recovery have source {@link ReplicationTracker#PEER_RECOVERY_RETENTION_LEASE_SOURCE}, a lease ID
425
+ * containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
426
+ * with sequence numbers strictly greater than the given global checkpoint.
+ * This method delegates to {@code addRetentionLease} and discards the lease that it returns.
+ *
+ * @param nodeId the node ID from which the lease ID is derived
+ * @param globalCheckpoint the global checkpoint; the lease's retaining sequence number is {@code globalCheckpoint + 1}
+ * @param listener the callback passed through to {@code addRetentionLease}
427
+ */
428
+ public void addPeerRecoveryRetentionLease (String nodeId , long globalCheckpoint , ActionListener <ReplicationResponse > listener ) {
429
+ addRetentionLease (getPeerRecoveryRetentionLeaseId (nodeId ), globalCheckpoint + 1 , PEER_RECOVERY_RETENTION_LEASE_SOURCE , listener );
430
+ }
431
+
432
+ /**
433
+ * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
434
+ */
435
+ public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery" ;
436
+
437
+ /**
438
+ * Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
+ *
+ * @param nodeId the node ID to embed in the lease ID
+ * @return the string {@code "peer_recovery/"} followed by the given node ID
439
+ */
440
+ static String getPeerRecoveryRetentionLeaseId (String nodeId ) {
441
+ return "peer_recovery/" + nodeId ;
442
+ }
443
+
444
+ /**
445
+ * Id for a peer recovery retention lease for the given {@link ShardRouting}.
446
+ * See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
+ *
+ * @param shardRouting the shard routing whose current node ID is used to build the lease ID
+ * @return the lease ID derived from {@code shardRouting.currentNodeId()}
447
+ */
448
+ public static String getPeerRecoveryRetentionLeaseId (ShardRouting shardRouting ) {
449
+ return getPeerRecoveryRetentionLeaseId (shardRouting .currentNodeId ());
450
+ }
451
+
452
+ /**
453
+ * Renew the peer-recovery retention lease for the given shard, advancing the retained sequence number to discard operations up to the
454
+ * given global checkpoint. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
+ * This method asserts that the tracker is in primary mode, then delegates to {@code renewRetentionLease} and discards the
+ * lease that it returns.
+ *
+ * @param shardRouting the shard routing from which the lease ID is derived
+ * @param globalCheckpoint the global checkpoint; the renewed retaining sequence number is {@code globalCheckpoint + 1}
455
+ */
456
+ public void renewPeerRecoveryRetentionLease (ShardRouting shardRouting , long globalCheckpoint ) {
457
+ assert primaryMode ;
458
+ renewRetentionLease (getPeerRecoveryRetentionLeaseId (shardRouting ), globalCheckpoint + 1 , PEER_RECOVERY_RETENTION_LEASE_SOURCE );
459
+ }
460
+
393
461
public static class CheckpointState implements Writeable {
394
462
395
463
/**
@@ -616,6 +684,22 @@ private boolean invariant() {
616
684
assert checkpoints .get (aId ) != null : "aId [" + aId + "] is pending in sync but isn't tracked" ;
617
685
}
618
686
687
+ if (primaryMode
688
+ && indexSettings .isSoftDeleteEnabled ()
689
+ && indexSettings .getIndexMetaData ().getState () == IndexMetaData .State .OPEN
690
+ && indexSettings .getIndexVersionCreated ().onOrAfter (Version .V_8_0_0 )) {
691
+ // all tracked shard copies have a corresponding peer-recovery retention lease
692
+ for (final ShardRouting shardRouting : routingTable .assignedShards ()) {
693
+ assert checkpoints .get (shardRouting .allocationId ().getId ()).tracked == false
694
+ || retentionLeases .contains (getPeerRecoveryRetentionLeaseId (shardRouting )) :
695
+ "no retention lease for tracked shard " + shardRouting + " in " + retentionLeases ;
696
+ assert shardRouting .relocating () == false
697
+ || checkpoints .get (shardRouting .allocationId ().getRelocationId ()).tracked == false
698
+ || retentionLeases .contains (getPeerRecoveryRetentionLeaseId (shardRouting .getTargetRelocatingShard ()))
699
+ : "no retention lease for relocation target " + shardRouting + " in " + retentionLeases ;
700
+ }
701
+ }
702
+
619
703
return true ;
620
704
}
621
705
@@ -669,6 +753,7 @@ public ReplicationTracker(
669
753
this .pendingInSync = new HashSet <>();
670
754
this .routingTable = null ;
671
755
this .replicationGroup = null ;
756
+ assert Version .V_EMPTY .equals (indexSettings .getIndexVersionCreated ()) == false ;
672
757
assert invariant ();
673
758
}
674
759
@@ -772,6 +857,31 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
772
857
primaryMode = true ;
773
858
updateLocalCheckpoint (shardAllocationId , checkpoints .get (shardAllocationId ), localCheckpoint );
774
859
updateGlobalCheckpointOnPrimary ();
860
+
861
+ if (indexSettings .isSoftDeleteEnabled ()) {
862
+ final ShardRouting primaryShard = routingTable .primaryShard ();
863
+ final String leaseId = getPeerRecoveryRetentionLeaseId (primaryShard );
864
+ if (retentionLeases .get (leaseId ) == null ) {
865
+ /*
866
+ * We might have got here via a rolling upgrade from an older version that doesn't create peer recovery retention
867
+ * leases for every shard copy, but in this case we do not expect any leases to exist.
868
+ */
869
+ if (indexSettings .getIndexVersionCreated ().onOrAfter (Version .V_8_0_0 )) {
870
+ // We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then
871
+ // this copy must already be in-sync and active and therefore holds a retention lease for itself.
872
+ assert routingTable .activeShards ().equals (Collections .singletonList (primaryShard )) : routingTable .activeShards ();
873
+ assert primaryShard .allocationId ().getId ().equals (shardAllocationId )
874
+ : routingTable .activeShards () + " vs " + shardAllocationId ;
875
+ assert replicationGroup .getReplicationTargets ().equals (Collections .singletonList (primaryShard ));
876
+
877
+ // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
878
+ // group.
879
+ innerAddRetentionLease (leaseId , Math .max (0L , checkpoints .get (shardAllocationId ).globalCheckpoint + 1 ),
880
+ PEER_RECOVERY_RETENTION_LEASE_SOURCE );
881
+ }
882
+ }
883
+ }
884
+
775
885
assert invariant ();
776
886
}
777
887
0 commit comments