Commit b043d0a

Advance PRRLs to match GCP of tracked shards
This commit adjusts the behaviour of the retention lease sync to first renew any peer-recovery retention leases where either:

- the corresponding shard's global checkpoint has advanced, or
- the lease is older than half of its expiry time.

Relates elastic#41536
1 parent 00145cd commit b043d0a
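The renewal rule can be pictured with the following minimal sketch. It is illustrative only: the lease and checkpoint types are simplified stand-ins, not the real ReplicationTracker API, and the 12-hour expiry used in the example is assumed for the sake of the arithmetic.

// Illustrative only: a self-contained sketch of the renewal condition, using
// simplified stand-ins for the real lease/checkpoint classes.
final class PeerRecoveryLeaseRenewalSketch {

    static final class Lease {
        final long retainingSeqNo;   // lowest sequence number still retained by the lease
        final long timestampMillis;  // when the lease was last established or renewed

        Lease(long retainingSeqNo, long timestampMillis) {
            this.retainingSeqNo = retainingSeqNo;
            this.timestampMillis = timestampMillis;
        }
    }

    // Renew if the shard copy's global checkpoint has caught up with (or passed) the
    // retained sequence number, or if the lease is older than half its expiry time.
    static boolean shouldRenew(Lease lease, long globalCheckpoint, long nowMillis, long expiryMillis) {
        final long renewalCutoffMillis = nowMillis - expiryMillis / 2;
        return lease.retainingSeqNo <= globalCheckpoint || lease.timestampMillis <= renewalCutoffMillis;
    }

    public static void main(String[] args) {
        final long expiryMillis = 12 * 60 * 60 * 1000L; // assume a 12-hour lease expiry for this example
        final Lease lease = new Lease(42L, 0L);

        // The global checkpoint has advanced past the retained sequence number: renew.
        System.out.println(shouldRenew(lease, 100L, 60_000L, expiryMillis));              // true

        // Checkpoint unchanged and only a minute has passed: leave the lease alone.
        System.out.println(shouldRenew(lease, 41L, 60_000L, expiryMillis));               // false

        // Checkpoint unchanged but more than half the expiry time has elapsed: renew anyway.
        System.out.println(shouldRenew(lease, 41L, 7 * 60 * 60 * 1000L, expiryMillis));   // true
    }
}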

8 files changed, +228 −44 lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+29 −5

@@ -457,16 +457,40 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
     }
 
     /**
-     * Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
-     * properly. TODO remove this.
+     * Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
+     * checkpoint, and renew any leases that are approaching expiry.
      */
-    public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
+    public synchronized void renewPeerRecoveryRetentionLeases() {
         assert primaryMode;
+
+        /*
+         * Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in
+         * case the associated shard is temporarily unassigned. However we must not renew them too often, since each renewal must be
+         * persisted and the resulting IO can be expensive on nodes with large numbers of shards (see #42299). We choose to renew them after
+         * half the expiry time, so that by default the cluster has at least 6 hours to recover before these leases start to expire.
+         */
+        final long renewalTimeMillis = currentTimeMillisSupplier.getAsLong() - indexSettings.getRetentionLeaseMillis() / 2;
+
         for (ShardRouting shardRouting : routingTable) {
             if (shardRouting.assignedToNode()) {
                 final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
-                renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), Math.max(0L, checkpointState.globalCheckpoint + 1L),
-                    PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
+                if (retentionLease == null) {
+                    if (checkpointState.tracked) {
+                        /*
+                         * BWC: We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
+                         * leases for every shard copy. TODO create leases lazily
+                         */
+                        assert indexSettings.getIndexVersionCreated().before(Version.V_8_0_0) : indexSettings.getIndexVersionCreated();
+                    }
+                } else {
+                    if (retentionLease.retainingSequenceNumber() <= checkpointState.globalCheckpoint
+                        || retentionLease.timestamp() <= renewalTimeMillis) {
+                        renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
+                            Math.max(0L, checkpointState.globalCheckpoint + 1L),
+                            PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    }
+                }
             }
         }
     }

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+1 −10

@@ -2038,6 +2038,7 @@ public void syncRetentionLeases() {
         assert assertPrimaryMode();
         verifyNotClosed();
         ensureSoftDeletesEnabled("retention leases");
+        replicationTracker.renewPeerRecoveryRetentionLeases();
         final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
         if (retentionLeases.v1()) {
             logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
@@ -2429,16 +2430,6 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
     }
 
-    /**
-     * Test-only method to advance the all shards' peer-recovery retention leases to their tracked global checkpoints so that operations
-     * can be discarded. TODO Remove this when retention leases are advanced by other mechanisms.
-     */
-    public void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
-        assert assertPrimaryMode();
-        replicationTracker.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
-        syncRetentionLeases();
-    }
-
     class ShardEventListener implements Engine.EventListener {
         private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java

+155
@@ -37,6 +37,7 @@
 import org.elasticsearch.test.IndexSettingsModule;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,6 +51,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
@@ -61,6 +63,9 @@
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 
 public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
@@ -975,4 +980,154 @@ private static void addPeerRecoveryRetentionLease(final ReplicationTracker track
         addPeerRecoveryRetentionLease(tracker, AllocationId.newInitializing(allocationId));
     }
 
+    public void testPeerRecoveryRetentionLeaseCreationAndRenewal() {
+
+        final int numberOfActiveAllocationsIds = randomIntBetween(1, 8);
+        final int numberOfInitializingIds = randomIntBetween(0, 8);
+        final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds =
+            randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
+        final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
+        final Set<AllocationId> initializingAllocationIds = activeAndInitializingAllocationIds.v2();
+
+        final AllocationId primaryId = activeAllocationIds.iterator().next();
+
+        final long initialClusterStateVersion = randomNonNegativeLong();
+
+        final AtomicLong currentTimeMillis = new AtomicLong(0L);
+        final ReplicationTracker tracker = newTracker(primaryId, updatedGlobalCheckpoint::set, currentTimeMillis::get);
+
+        final long retentionLeaseExpiryTimeMillis = tracker.indexSettings().getRetentionLeaseMillis();
+        final long peerRecoveryRetentionLeaseRenewalTimeMillis = retentionLeaseExpiryTimeMillis / 2;
+
+        final long maximumTestTimeMillis = 13 * retentionLeaseExpiryTimeMillis;
+        final long testStartTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - maximumTestTimeMillis);
+        currentTimeMillis.set(testStartTimeMillis);
+
+        final Function<AllocationId, RetentionLease> retentionLeaseFromAllocationId = allocationId
+            -> new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)),
+            0L, currentTimeMillis.get(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+
+        final List<RetentionLease> initialLeases = new ArrayList<>();
+        if (randomBoolean()) {
+            initialLeases.add(retentionLeaseFromAllocationId.apply(primaryId));
+        }
+        for (final AllocationId replicaId : initializingAllocationIds) {
+            if (randomBoolean()) {
+                initialLeases.add(retentionLeaseFromAllocationId.apply(replicaId));
+            }
+        }
+        for (int i = randomIntBetween(0, 5); i > 0; i--) {
+            initialLeases.add(retentionLeaseFromAllocationId.apply(AllocationId.newInitializing()));
+        }
+        tracker.updateRetentionLeasesOnReplica(new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), initialLeases));
+
+        IndexShardRoutingTable routingTable = routingTable(initializingAllocationIds, primaryId);
+        tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
+        tracker.activatePrimaryMode(NO_OPS_PERFORMED);
+        assertTrue("primary's retention lease should exist",
+            tracker.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(routingTable.primaryShard())));
+
+        final Consumer<Runnable> assertAsTimePasses = assertion -> {
+            final long startTime = currentTimeMillis.get();
+            while (currentTimeMillis.get() < startTime + retentionLeaseExpiryTimeMillis * 2) {
+                currentTimeMillis.addAndGet(randomLongBetween(0L, retentionLeaseExpiryTimeMillis * 2));
+                tracker.renewPeerRecoveryRetentionLeases();
+                tracker.getRetentionLeases(true);
+                assertion.run();
+            }
+        };
+
+        assertAsTimePasses.accept(() -> {
+            // Leases for assigned replicas do not expire
+            final RetentionLeases retentionLeases = tracker.getRetentionLeases();
+            for (final AllocationId replicaId : initializingAllocationIds) {
+                final String leaseId = retentionLeaseFromAllocationId.apply(replicaId).id();
+                assertTrue("should not have removed lease for " + replicaId + " in " + retentionLeases,
+                    initialLeases.stream().noneMatch(l -> l.id().equals(leaseId)) || retentionLeases.contains(leaseId));
+            }
+        });
+
+        // Leases that don't correspond to assigned replicas, however, are expired by this time.
+        final Set<String> expectedLeaseIds = Stream.concat(Stream.of(primaryId), initializingAllocationIds.stream())
+            .map(allocationId -> retentionLeaseFromAllocationId.apply(allocationId).id()).collect(Collectors.toSet());
+        for (final RetentionLease retentionLease : tracker.getRetentionLeases().leases()) {
+            assertThat(expectedLeaseIds, hasItem(retentionLease.id()));
+        }
+
+        for (AllocationId replicaId : initializingAllocationIds) {
+            markAsTrackingAndInSyncQuietly(tracker, replicaId.getId(), NO_OPS_PERFORMED);
+        }
+
+        assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+            equalTo(expectedLeaseIds));
+
+        assertAsTimePasses.accept(() -> {
+            // Leases still don't expire
+            assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+                equalTo(expectedLeaseIds));
+
+            // Also leases are renewed before reaching half the expiry time
+            //noinspection OptionalGetWithoutIsPresent
+            assertThat(tracker.getRetentionLeases() + " renewed before too long",
+                tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong(),
+                greaterThanOrEqualTo(currentTimeMillis.get() - peerRecoveryRetentionLeaseRenewalTimeMillis));
+        });
+
+        IndexShardRoutingTable.Builder routingTableBuilder = new IndexShardRoutingTable.Builder(routingTable);
+        for (ShardRouting replicaShard : routingTable.replicaShards()) {
+            routingTableBuilder.removeShard(replicaShard);
+            routingTableBuilder.addShard(replicaShard.moveToStarted());
+        }
+        routingTable = routingTableBuilder.build();
+        activeAllocationIds.addAll(initializingAllocationIds);
+
+        tracker.updateFromMaster(initialClusterStateVersion + randomLongBetween(1, 10), ids(activeAllocationIds), routingTable);
+
+        assertAsTimePasses.accept(() -> {
+            // Leases still don't expire
+            assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+                equalTo(expectedLeaseIds));
+            // ... and any extra peer recovery retention leases are expired immediately since the shard is fully active
+            tracker.addPeerRecoveryRetentionLease(randomAlphaOfLength(10), randomNonNegativeLong(), ActionListener.wrap(() -> {}));
+        });
+
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertTrue("expired extra lease", tracker.getRetentionLeases(true).v1());
+
+        for (RetentionLease retentionLease : tracker.getRetentionLeases().leases()) {
+            // update all leases' timestamps so they don't need a time-based renewal for a while
+            tracker.renewRetentionLease(retentionLease.id(), retentionLease.retainingSequenceNumber(), retentionLease.source());
+        }
+
+        final AllocationId advancingAllocationId
+            = initializingAllocationIds.isEmpty() || rarely() ? primaryId : randomFrom(initializingAllocationIds);
+        final String advancingLeaseId = retentionLeaseFromAllocationId.apply(advancingAllocationId).id();
+
+        final long initialGlobalCheckpoint
+            = Math.max(NO_OPS_PERFORMED, tracker.getTrackedLocalCheckpointForShard(advancingAllocationId.getId()).globalCheckpoint);
+        assertThat(tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(initialGlobalCheckpoint + 1));
+        final long newGlobalCheckpoint = initialGlobalCheckpoint + randomLongBetween(1, 1000);
+        tracker.updateGlobalCheckpointForShard(advancingAllocationId.getId(), newGlobalCheckpoint);
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("lease was renewed because the shard advanced its global checkpoint",
+            tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(newGlobalCheckpoint + 1));
+
+        final long initialVersion = tracker.getRetentionLeases().version();
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("immediate renewal is a no-op", tracker.getRetentionLeases().version(), equalTo(initialVersion));
+
+        final long shorterThanRenewalTime = randomLongBetween(0L, peerRecoveryRetentionLeaseRenewalTimeMillis - 1);
+        currentTimeMillis.addAndGet(shorterThanRenewalTime);
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("renewal is a no-op after a short time", tracker.getRetentionLeases().version(), equalTo(initialVersion));
+
+        currentTimeMillis.addAndGet(peerRecoveryRetentionLeaseRenewalTimeMillis - shorterThanRenewalTime);
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("renewal happens after a sufficiently long time", tracker.getRetentionLeases().version(), greaterThan(initialVersion));
+
+        assertThat("test ran for too long, potentially leading to overflow",
+            currentTimeMillis.get(), lessThanOrEqualTo(testStartTimeMillis + maximumTestTimeMillis));
+    }
+
 }

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

+2 −2

@@ -2954,7 +2954,7 @@ public void testDocStats() throws Exception {
                 indexShard.getLocalCheckpoint());
             indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
                 indexShard.getLocalCheckpoint());
-            indexShard.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
+            indexShard.syncRetentionLeases();
         } else {
             indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
 
@@ -3553,7 +3553,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {
             primary.updateGlobalCheckpointForShard(
                 primary.routingEntry().allocationId().getId(),
                 primary.getLastSyncedGlobalCheckpoint());
-            primary.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
+            primary.syncRetentionLeases();
             primary.sync();
             flushShard(primary);
         }

server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java

+11 −6

@@ -81,7 +81,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -1009,7 +1008,10 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) {
     }
 
     public void testFilterCacheStats() throws Exception {
-        Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
+        Settings settings = Settings.builder().put(indexSettings())
+            .put("number_of_replicas", 0)
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
+            .build();
         assertAcked(prepareCreate("index").setSettings(settings).get());
         indexRandom(false, true,
             client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
@@ -1053,10 +1055,13 @@ public void testFilterCacheStats() throws Exception {
         // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
         if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
             persistGlobalCheckpoint("index");
-            internalCluster().nodesInclude("index").stream()
-                .flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false))
-                .flatMap(n -> StreamSupport.stream(n.spliterator(), false))
-                .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints);
+            assertBusy(() -> {
+                for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) {
+                    final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
+                    assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
+                        .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
+                }
+            });
             flush("index");
         }
         ForceMergeResponse forceMergeResponse =

test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java

+1
@@ -50,6 +50,7 @@ public List<Setting<?>> getSettings() {
             PROVIDED_NAME_SETTING,
             TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
             IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
+            IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
             IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
         );
     }
