24
24
import org .elasticsearch .Version ;
25
25
import org .elasticsearch .action .ActionListener ;
26
26
import org .elasticsearch .action .support .replication .ReplicationResponse ;
27
+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
27
28
import org .elasticsearch .cluster .routing .AllocationId ;
28
29
import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
29
30
import org .elasticsearch .cluster .routing .ShardRouting ;
@@ -217,10 +218,22 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
217
218
// the primary calculates the non-expired retention leases and syncs them to replicas
218
219
final long currentTimeMillis = currentTimeMillisSupplier .getAsLong ();
219
220
final long retentionLeaseMillis = indexSettings .getRetentionLeaseMillis ();
221
+ final Set <String > leaseIdsForCurrentPeers
222
+ = routingTable .assignedShards ().stream ().map (ReplicationTracker ::getPeerRecoveryRetentionLeaseId ).collect (Collectors .toSet ());
220
223
final Map <Boolean , List <RetentionLease >> partitionByExpiration = retentionLeases
221
224
.leases ()
222
225
.stream ()
223
- .collect (Collectors .groupingBy (lease -> currentTimeMillis - lease .timestamp () > retentionLeaseMillis ));
226
+ .collect (Collectors .groupingBy (lease -> {
227
+ if (lease .source ().equals (PEER_RECOVERY_RETENTION_LEASE_SOURCE )) {
228
+ if (leaseIdsForCurrentPeers .contains (lease .id ())) {
229
+ return false ;
230
+ }
231
+ if (routingTable .allShardsStarted ()) {
232
+ return true ;
233
+ }
234
+ }
235
+ return currentTimeMillis - lease .timestamp () > retentionLeaseMillis ;
236
+ }));
224
237
final Collection <RetentionLease > expiredLeases = partitionByExpiration .get (true );
225
238
if (expiredLeases == null ) {
226
239
// early out as no retention leases have expired
@@ -242,7 +255,7 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
242
255
* @param source the source of the retention lease
243
256
* @param listener the callback when the retention lease is successfully added and synced to replicas
244
257
* @return the new retention lease
245
- * @throws IllegalArgumentException if the specified retention lease already exists
258
+ * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
246
259
*/
247
260
public RetentionLease addRetentionLease (
248
261
final String id ,
@@ -253,30 +266,46 @@ public RetentionLease addRetentionLease(
253
266
final RetentionLease retentionLease ;
254
267
final RetentionLeases currentRetentionLeases ;
255
268
synchronized (this ) {
256
- assert primaryMode ;
257
- if (retentionLeases .contains (id )) {
258
- throw new RetentionLeaseAlreadyExistsException (id );
259
- }
260
- retentionLease = new RetentionLease (id , retainingSequenceNumber , currentTimeMillisSupplier .getAsLong (), source );
261
- logger .debug ("adding new retention lease [{}] to current retention leases [{}]" , retentionLease , retentionLeases );
262
- retentionLeases = new RetentionLeases (
263
- operationPrimaryTerm ,
264
- retentionLeases .version () + 1 ,
265
- Stream .concat (retentionLeases .leases ().stream (), Stream .of (retentionLease )).collect (Collectors .toList ()));
269
+ retentionLease = innerAddRetentionLease (id , retainingSequenceNumber , source );
266
270
currentRetentionLeases = retentionLeases ;
267
271
}
268
272
onSyncRetentionLeases .accept (currentRetentionLeases , listener );
269
273
return retentionLease ;
270
274
}
271
275
276
+ /**
277
+ * Adds a new retention lease, but does not synchronise it with the rest of the replication group.
278
+ *
279
+ * @param id the identifier of the retention lease
280
+ * @param retainingSequenceNumber the retaining sequence number
281
+ * @param source the source of the retention lease
282
+ * @return the new retention lease
283
+ * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists
284
+ */
285
+ private RetentionLease innerAddRetentionLease (String id , long retainingSequenceNumber , String source ) {
286
+ assert Thread .holdsLock (this );
287
+ assert primaryMode : id + "/" + retainingSequenceNumber + "/" + source ;
288
+ if (retentionLeases .contains (id )) {
289
+ throw new RetentionLeaseAlreadyExistsException (id );
290
+ }
291
+ final RetentionLease retentionLease
292
+ = new RetentionLease (id , retainingSequenceNumber , currentTimeMillisSupplier .getAsLong (), source );
293
+ logger .debug ("adding new retention lease [{}] to current retention leases [{}]" , retentionLease , retentionLeases );
294
+ retentionLeases = new RetentionLeases (
295
+ operationPrimaryTerm ,
296
+ retentionLeases .version () + 1 ,
297
+ Stream .concat (retentionLeases .leases ().stream (), Stream .of (retentionLease )).collect (Collectors .toList ()));
298
+ return retentionLease ;
299
+ }
300
+
272
301
/**
273
302
* Renews an existing retention lease.
274
303
*
275
304
* @param id the identifier of the retention lease
276
305
* @param retainingSequenceNumber the retaining sequence number
277
306
* @param source the source of the retention lease
278
307
* @return the renewed retention lease
279
- * @throws IllegalArgumentException if the specified retention lease does not exist
308
+ * @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
280
309
*/
281
310
public synchronized RetentionLease renewRetentionLease (final String id , final long retainingSequenceNumber , final String source ) {
282
311
assert primaryMode ;
@@ -390,6 +419,51 @@ public boolean assertRetentionLeasesPersisted(final Path path) throws IOExceptio
390
419
return true ;
391
420
}
392
421
422
+
423
+ /**
424
+ * Retention leases for peer recovery have source {@link ReplicationTracker#PEER_RECOVERY_RETENTION_LEASE_SOURCE}, a lease ID
425
+ * containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
426
+ * with sequence numbers strictly greater than the given global checkpoint.
427
+ */
428
+ public void addPeerRecoveryRetentionLease (String nodeId , long globalCheckpoint , ActionListener <ReplicationResponse > listener ) {
429
+ addRetentionLease (getPeerRecoveryRetentionLeaseId (nodeId ), globalCheckpoint + 1 , PEER_RECOVERY_RETENTION_LEASE_SOURCE , listener );
430
+ }
431
+
432
+ /**
433
+ * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}, which passes this
+ * value as the source of every lease it adds. Other code in this class compares a lease's source against this constant to
+ * recognise peer recovery retention leases.
434
+ */
435
+ public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery" ;
436
+
437
+ /**
438
+ * Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
439
+ */
440
+ static String getPeerRecoveryRetentionLeaseId (String nodeId ) {
441
+ return "peer_recovery/" + nodeId ;
442
+ }
443
+
444
+ /**
445
+ * Id for a peer recovery retention lease for the given {@link ShardRouting}.
446
+ * See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
447
+ */
448
+ public static String getPeerRecoveryRetentionLeaseId (ShardRouting shardRouting ) {
449
+ return getPeerRecoveryRetentionLeaseId (shardRouting .currentNodeId ());
450
+ }
451
+
452
+ /**
453
+ * Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
454
+ * properly. TODO remove this.
455
+ */
456
+ public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints () {
457
+ assert primaryMode ;
458
+ for (ShardRouting shardRouting : routingTable ) {
459
+ if (shardRouting .assignedToNode ()) {
460
+ final CheckpointState checkpointState = checkpoints .get (shardRouting .allocationId ().getId ());
461
+ renewRetentionLease (getPeerRecoveryRetentionLeaseId (shardRouting ), checkpointState .globalCheckpoint + 1 ,
462
+ PEER_RECOVERY_RETENTION_LEASE_SOURCE );
463
+ }
464
+ }
465
+ }
466
+
393
467
public static class CheckpointState implements Writeable {
394
468
395
469
/**
@@ -616,6 +690,23 @@ private boolean invariant() {
616
690
assert checkpoints .get (aId ) != null : "aId [" + aId + "] is pending in sync but isn't tracked" ;
617
691
}
618
692
693
+ if (primaryMode
694
+ && indexSettings .isSoftDeleteEnabled ()
695
+ && indexSettings .getIndexMetaData ().getState () == IndexMetaData .State .OPEN
696
+ && indexSettings .getIndexVersionCreated ().onOrAfter (Version .V_8_0_0 )) {
697
+ // all tracked shard copies have a corresponding peer-recovery retention lease
698
+ for (final ShardRouting shardRouting : routingTable .assignedShards ()) {
699
+ if (checkpoints .get (shardRouting .allocationId ().getId ()).tracked ) {
700
+ assert retentionLeases .contains (getPeerRecoveryRetentionLeaseId (shardRouting ))
701
+ : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases ;
702
+ assert PEER_RECOVERY_RETENTION_LEASE_SOURCE .equals (
703
+ retentionLeases .get (getPeerRecoveryRetentionLeaseId (shardRouting )).source ())
704
+ : "incorrect source [" + retentionLeases .get (getPeerRecoveryRetentionLeaseId (shardRouting )).source ()
705
+ + "] for [" + shardRouting + "] in " + retentionLeases ;
706
+ }
707
+ }
708
+ }
709
+
619
710
return true ;
620
711
}
621
712
@@ -669,6 +760,7 @@ public ReplicationTracker(
669
760
this .pendingInSync = new HashSet <>();
670
761
this .routingTable = null ;
671
762
this .replicationGroup = null ;
763
+ assert Version .V_EMPTY .equals (indexSettings .getIndexVersionCreated ()) == false ;
672
764
assert invariant ();
673
765
}
674
766
@@ -772,6 +864,31 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
772
864
primaryMode = true ;
773
865
updateLocalCheckpoint (shardAllocationId , checkpoints .get (shardAllocationId ), localCheckpoint );
774
866
updateGlobalCheckpointOnPrimary ();
867
+
868
+ if (indexSettings .isSoftDeleteEnabled ()) {
869
+ final ShardRouting primaryShard = routingTable .primaryShard ();
870
+ final String leaseId = getPeerRecoveryRetentionLeaseId (primaryShard );
871
+ if (retentionLeases .get (leaseId ) == null ) {
872
+ /*
873
+ * We might have got here via a rolling upgrade from an older version that doesn't create peer recovery retention
874
+ * leases for every shard copy, but in this case we do not expect any leases to exist.
875
+ */
876
+ if (indexSettings .getIndexVersionCreated ().onOrAfter (Version .V_8_0_0 )) {
877
+ // We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then
878
+ // this copy must already be in-sync and active and therefore holds a retention lease for itself.
879
+ assert routingTable .activeShards ().equals (Collections .singletonList (primaryShard )) : routingTable .activeShards ();
880
+ assert primaryShard .allocationId ().getId ().equals (shardAllocationId )
881
+ : routingTable .activeShards () + " vs " + shardAllocationId ;
882
+ assert replicationGroup .getReplicationTargets ().equals (Collections .singletonList (primaryShard ));
883
+
884
+ // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
885
+ // group.
886
+ innerAddRetentionLease (leaseId , Math .max (0L , checkpoints .get (shardAllocationId ).globalCheckpoint + 1 ),
887
+ PEER_RECOVERY_RETENTION_LEASE_SOURCE );
888
+ }
889
+ }
890
+ }
891
+
775
892
assert invariant ();
776
893
}
777
894
0 commit comments