
Commit c4f042b

Advance PRRLs to match GCP of tracked shards (#43751)
This commit adjusts the behaviour of the retention lease sync to first renew any peer-recovery retention leases where either:
- the corresponding shard's global checkpoint has advanced, or
- the lease is older than half of its expiry time.

Relates #41536
1 parent a455b47 commit c4f042b
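The two renewal conditions in the commit message reduce to a simple predicate over each tracked shard copy's lease. The following standalone sketch is illustrative only: the class, method, and parameter names are invented for this example, and the 12-hour expiry used in main() is an assumed default rather than something stated in this commit.

import java.util.concurrent.TimeUnit;

// Illustration of the renewal decision; names here are not the ones used in ReplicationTracker.
public class RenewalConditionSketch {

    static boolean needsRenewal(long leaseTimestampMillis, long leaseRetainingSeqNo,
                                long shardGlobalCheckpoint, long nowMillis, long leaseExpiryMillis) {
        // Renew once the lease is older than half of its expiry time...
        final boolean approachingExpiry = leaseTimestampMillis <= nowMillis - leaseExpiryMillis / 2;
        // ...or once the shard's global checkpoint has advanced past the retained sequence number.
        final boolean checkpointAdvanced = leaseRetainingSeqNo <= shardGlobalCheckpoint;
        return approachingExpiry || checkpointAdvanced;
    }

    public static void main(String[] args) {
        final long expiry = TimeUnit.HOURS.toMillis(12); // assumed default expiry, not part of this commit
        final long now = System.currentTimeMillis();
        // Freshly renewed lease whose retained history is still needed: no renewal.
        System.out.println(needsRenewal(now, 43L, 42L, now, expiry));                              // false
        // The tracked global checkpoint has advanced past the retained sequence number: renew.
        System.out.println(needsRenewal(now, 43L, 100L, now, expiry));                             // true
        // The lease is older than half of its expiry time: renew.
        System.out.println(needsRenewal(now - TimeUnit.HOURS.toMillis(7), 43L, 42L, now, expiry)); // true
    }
}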

9 files changed, +252 −57 lines changed


server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+46-8
@@ -60,6 +60,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
@@ -457,18 +458,55 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
     }
 
     /**
-     * Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
-     * properly. TODO remove this.
+     * Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
+     * checkpoint, and renew any leases that are approaching expiry.
      */
-    public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
+    public synchronized void renewPeerRecoveryRetentionLeases() {
         assert primaryMode;
-        for (ShardRouting shardRouting : routingTable) {
-            if (shardRouting.assignedToNode()) {
-                final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
-                renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), Math.max(0L, checkpointState.globalCheckpoint + 1L),
-                    PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+        assert invariant();
+
+        /*
+         * Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in
+         * case the associated shard is temporarily unassigned. However we must not renew them too often, since each renewal must be
+         * persisted and the resulting IO can be expensive on nodes with large numbers of shards (see #42299). We choose to renew them after
+         * half the expiry time, so that by default the cluster has at least 6 hours to recover before these leases start to expire.
+         */
+        final long renewalTimeMillis = currentTimeMillisSupplier.getAsLong() - indexSettings.getRetentionLeaseMillis() / 2;
+
+        /*
+         * If any of the peer-recovery retention leases need renewal, it's a good opportunity to renew them all.
+         */
+        final boolean renewalNeeded = StreamSupport.stream(routingTable.spliterator(), false).filter(ShardRouting::assignedToNode)
+            .anyMatch(shardRouting -> {
+                final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
+                if (retentionLease == null) {
+                    /*
+                     * If this shard copy is tracked then we got here via a rolling upgrade from an older version that doesn't
+                     * create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation.
+                     */
+                    assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
+                        || indexSettings.getIndexVersionCreated().before(Version.V_7_3_0);
+                    return false;
+                }
+                return retentionLease.timestamp() <= renewalTimeMillis
+                    || retentionLease.retainingSequenceNumber() <= checkpoints.get(shardRouting.allocationId().getId()).globalCheckpoint;
+            });
+
+        if (renewalNeeded) {
+            for (ShardRouting shardRouting : routingTable) {
+                if (shardRouting.assignedToNode()) {
+                    final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
+                    if (retentionLease != null) {
+                        final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
+                        renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
+                            Math.max(0L, checkpointState.globalCheckpoint + 1L),
+                            PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    }
+                }
             }
         }
+
+        assert invariant();
     }
 
     public static class CheckpointState implements Writeable {
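The key arithmetic in the change above is that each lease's retaining sequence number is advanced to the tracked shard's global checkpoint plus one. A retention lease retains operations whose sequence number is at or above its retaining sequence number, so this allows history up to and including the global checkpoint to be discarded. A minimal illustration; the class name and example value are not from the commit:

public class RetainingSeqNoSketch {
    public static void main(String[] args) {
        final long globalCheckpoint = 41L; // example value for a tracked shard copy
        // As in renewPeerRecoveryRetentionLeases: retain everything above the global checkpoint.
        final long retainingSequenceNumber = Math.max(0L, globalCheckpoint + 1L);
        // Operations with seqNo below retainingSequenceNumber (i.e. up to the global checkpoint) may be discarded.
        System.out.println("retaining history from seqNo " + retainingSequenceNumber + " onwards");
    }
}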

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+1-10
@@ -2109,6 +2109,7 @@ public void syncRetentionLeases() {
         assert assertPrimaryMode();
         verifyNotClosed();
         ensureSoftDeletesEnabled("retention leases");
+        replicationTracker.renewPeerRecoveryRetentionLeases();
         final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
         if (retentionLeases.v1()) {
             logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
@@ -2500,16 +2501,6 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
     }
 
-    /**
-     * Test-only method to advance the all shards' peer-recovery retention leases to their tracked global checkpoints so that operations
-     * can be discarded. TODO Remove this when retention leases are advanced by other mechanisms.
-     */
-    public void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
-        assert assertPrimaryMode();
-        replicationTracker.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
-        syncRetentionLeases();
-    }
-
     class ShardEventListener implements Engine.EventListener {
         private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java

+159
@@ -37,6 +37,7 @@
 import org.elasticsearch.test.IndexSettingsModule;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,6 +51,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
@@ -61,6 +63,9 @@
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 
 public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
@@ -975,4 +980,158 @@ private static void addPeerRecoveryRetentionLease(final ReplicationTracker track
         addPeerRecoveryRetentionLease(tracker, AllocationId.newInitializing(allocationId));
     }
 
+    public void testPeerRecoveryRetentionLeaseCreationAndRenewal() {
+
+        final int numberOfActiveAllocationsIds = randomIntBetween(1, 8);
+        final int numberOfInitializingIds = randomIntBetween(0, 8);
+        final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds =
+            randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
+        final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
+        final Set<AllocationId> initializingAllocationIds = activeAndInitializingAllocationIds.v2();
+
+        final AllocationId primaryId = activeAllocationIds.iterator().next();
+
+        final long initialClusterStateVersion = randomNonNegativeLong();
+
+        final AtomicLong currentTimeMillis = new AtomicLong(0L);
+        final ReplicationTracker tracker = newTracker(primaryId, updatedGlobalCheckpoint::set, currentTimeMillis::get);
+
+        final long retentionLeaseExpiryTimeMillis = tracker.indexSettings().getRetentionLeaseMillis();
+        final long peerRecoveryRetentionLeaseRenewalTimeMillis = retentionLeaseExpiryTimeMillis / 2;
+
+        final long maximumTestTimeMillis = 13 * retentionLeaseExpiryTimeMillis;
+        final long testStartTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - maximumTestTimeMillis);
+        currentTimeMillis.set(testStartTimeMillis);
+
+        final Function<AllocationId, RetentionLease> retentionLeaseFromAllocationId = allocationId
+            -> new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)),
+                0L, currentTimeMillis.get(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+
+        final List<RetentionLease> initialLeases = new ArrayList<>();
+        if (randomBoolean()) {
+            initialLeases.add(retentionLeaseFromAllocationId.apply(primaryId));
+        }
+        for (final AllocationId replicaId : initializingAllocationIds) {
+            if (randomBoolean()) {
+                initialLeases.add(retentionLeaseFromAllocationId.apply(replicaId));
+            }
+        }
+        for (int i = randomIntBetween(0, 5); i > 0; i--) {
+            initialLeases.add(retentionLeaseFromAllocationId.apply(AllocationId.newInitializing()));
+        }
+        tracker.updateRetentionLeasesOnReplica(new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), initialLeases));
+
+        IndexShardRoutingTable routingTable = routingTable(initializingAllocationIds, primaryId);
+        tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
+        tracker.activatePrimaryMode(NO_OPS_PERFORMED);
+        assertTrue("primary's retention lease should exist",
+            tracker.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(routingTable.primaryShard())));
+
+        final Consumer<Runnable> assertAsTimePasses = assertion -> {
+            final long startTime = currentTimeMillis.get();
+            while (currentTimeMillis.get() < startTime + retentionLeaseExpiryTimeMillis * 2) {
+                currentTimeMillis.addAndGet(randomLongBetween(0L, retentionLeaseExpiryTimeMillis * 2));
+                tracker.renewPeerRecoveryRetentionLeases();
+                tracker.getRetentionLeases(true);
+                assertion.run();
+            }
+        };
+
+        assertAsTimePasses.accept(() -> {
+            // Leases for assigned replicas do not expire
+            final RetentionLeases retentionLeases = tracker.getRetentionLeases();
+            for (final AllocationId replicaId : initializingAllocationIds) {
+                final String leaseId = retentionLeaseFromAllocationId.apply(replicaId).id();
+                assertTrue("should not have removed lease for " + replicaId + " in " + retentionLeases,
+                    initialLeases.stream().noneMatch(l -> l.id().equals(leaseId)) || retentionLeases.contains(leaseId));
+            }
+        });
+
+        // Leases that don't correspond to assigned replicas, however, are expired by this time.
+        final Set<String> expectedLeaseIds = Stream.concat(Stream.of(primaryId), initializingAllocationIds.stream())
+            .map(allocationId -> retentionLeaseFromAllocationId.apply(allocationId).id()).collect(Collectors.toSet());
+        for (final RetentionLease retentionLease : tracker.getRetentionLeases().leases()) {
+            assertThat(expectedLeaseIds, hasItem(retentionLease.id()));
+        }
+
+        for (AllocationId replicaId : initializingAllocationIds) {
+            markAsTrackingAndInSyncQuietly(tracker, replicaId.getId(), NO_OPS_PERFORMED);
+        }
+
+        assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+            equalTo(expectedLeaseIds));
+
+        assertAsTimePasses.accept(() -> {
+            // Leases still don't expire
+            assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+                equalTo(expectedLeaseIds));
+
+            // Also leases are renewed before reaching half the expiry time
+            //noinspection OptionalGetWithoutIsPresent
+            assertThat(tracker.getRetentionLeases() + " renewed before too long",
+                tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong(),
+                greaterThanOrEqualTo(currentTimeMillis.get() - peerRecoveryRetentionLeaseRenewalTimeMillis));
+        });
+
+        IndexShardRoutingTable.Builder routingTableBuilder = new IndexShardRoutingTable.Builder(routingTable);
+        for (ShardRouting replicaShard : routingTable.replicaShards()) {
+            routingTableBuilder.removeShard(replicaShard);
+            routingTableBuilder.addShard(replicaShard.moveToStarted());
+        }
+        routingTable = routingTableBuilder.build();
+        activeAllocationIds.addAll(initializingAllocationIds);
+
+        tracker.updateFromMaster(initialClusterStateVersion + randomLongBetween(1, 10), ids(activeAllocationIds), routingTable);
+
+        assertAsTimePasses.accept(() -> {
+            // Leases still don't expire
+            assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+                equalTo(expectedLeaseIds));
+            // ... and any extra peer recovery retention leases are expired immediately since the shard is fully active
+            tracker.addPeerRecoveryRetentionLease(randomAlphaOfLength(10), randomNonNegativeLong(), ActionListener.wrap(() -> {}));
+        });
+
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertTrue("expired extra lease", tracker.getRetentionLeases(true).v1());
+
+        final AllocationId advancingAllocationId
+            = initializingAllocationIds.isEmpty() || rarely() ? primaryId : randomFrom(initializingAllocationIds);
+        final String advancingLeaseId = retentionLeaseFromAllocationId.apply(advancingAllocationId).id();
+
+        final long initialGlobalCheckpoint
+            = Math.max(NO_OPS_PERFORMED, tracker.getTrackedLocalCheckpointForShard(advancingAllocationId.getId()).globalCheckpoint);
+        assertThat(tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(initialGlobalCheckpoint + 1));
+        final long newGlobalCheckpoint = initialGlobalCheckpoint + randomLongBetween(1, 1000);
+        tracker.updateGlobalCheckpointForShard(advancingAllocationId.getId(), newGlobalCheckpoint);
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("lease was renewed because the shard advanced its global checkpoint",
+            tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(newGlobalCheckpoint + 1));
+
+        final long initialVersion = tracker.getRetentionLeases().version();
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("immediate renewal is a no-op", tracker.getRetentionLeases().version(), equalTo(initialVersion));
+
+        //noinspection OptionalGetWithoutIsPresent
+        final long millisUntilFirstRenewal
+            = tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong()
+            + peerRecoveryRetentionLeaseRenewalTimeMillis
+            - currentTimeMillis.get();
+
+        if (millisUntilFirstRenewal != 0) {
+            final long shorterThanRenewalTime = randomLongBetween(0L, millisUntilFirstRenewal - 1);
+            currentTimeMillis.addAndGet(shorterThanRenewalTime);
+            tracker.renewPeerRecoveryRetentionLeases();
+            assertThat("renewal is a no-op after a short time", tracker.getRetentionLeases().version(), equalTo(initialVersion));
+            currentTimeMillis.addAndGet(millisUntilFirstRenewal - shorterThanRenewalTime);
+        }
+
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("renewal happens after a sufficiently long time", tracker.getRetentionLeases().version(), greaterThan(initialVersion));
+        assertTrue("all leases were renewed",
+            tracker.getRetentionLeases().leases().stream().allMatch(l -> l.timestamp() == currentTimeMillis.get()));
+
+        assertThat("test ran for too long, potentially leading to overflow",
+            currentTimeMillis.get(), lessThanOrEqualTo(testStartTimeMillis + maximumTestTimeMillis));
+    }
+
 }

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+2-2
@@ -2932,7 +2932,7 @@ public void testDocStats() throws Exception {
                     indexShard.getLocalCheckpoint());
                 indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
                     indexShard.getLocalCheckpoint());
-                indexShard.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
+                indexShard.syncRetentionLeases();
             } else {
                 indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
 
@@ -3531,7 +3531,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {
             primary.updateGlobalCheckpointForShard(
                 primary.routingEntry().allocationId().getId(),
                 primary.getLastSyncedGlobalCheckpoint());
-            primary.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
+            primary.syncRetentionLeases();
             primary.sync();
             flushShard(primary);
         }

server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java

+11-6
@@ -81,7 +81,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -1009,7 +1008,10 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) {
     }
 
     public void testFilterCacheStats() throws Exception {
-        Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
+        Settings settings = Settings.builder().put(indexSettings())
+            .put("number_of_replicas", 0)
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
+            .build();
         assertAcked(prepareCreate("index").setSettings(settings).get());
         indexRandom(false, true,
             client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
@@ -1053,10 +1055,13 @@ public void testFilterCacheStats() throws Exception {
         // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
         if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
             persistGlobalCheckpoint("index");
-            internalCluster().nodesInclude("index").stream()
-                .flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false))
-                .flatMap(n -> StreamSupport.stream(n.spliterator(), false))
-                .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints);
+            assertBusy(() -> {
+                for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) {
+                    final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
+                    assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
+                        .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
+                }
+            });
             flush("index");
         }
         ForceMergeResponse forceMergeResponse =

test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java

+1
@@ -50,6 +50,7 @@ public List<Setting<?>> getSettings() {
             PROVIDED_NAME_SETTING,
             TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
             IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
+            IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
             IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
         );
     }
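Registering IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING here lets internal-cluster tests shorten the background retention lease sync that now drives lease renewal, as IndexStatsIT does above. A rough sketch of such usage; the test class, its name, and the index name are hypothetical and not part of this commit:

import java.util.Collection;
import java.util.Collections;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

// Hypothetical example test, not part of this commit.
public class RetentionLeaseSyncIntervalExampleIT extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        // InternalSettingsPlugin registers the sync-interval setting for test use.
        return Collections.singletonList(InternalSettingsPlugin.class);
    }

    public void testCreatesIndexWithShortSyncInterval() {
        final Settings settings = Settings.builder()
            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
            .build();
        assertAcked(prepareCreate("test-index").setSettings(settings).get());
        // Once the global checkpoint advances, the next background sync renews the
        // peer-recovery retention leases so that history below the checkpoint can be discarded.
    }
}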

