Skip to content

Commit c8c596c

Browse files
authored
Introduce retention lease expiration (#37195)
This commit implements a straightforward approach to retention lease expiration. Namely, we check which leases have expired whenever the current leases are obtained through the replication tracker. At that moment, we remove the expired leases from the in-memory map that holds the retention leases.
1 parent c5aac47 commit c8c596c

File tree

13 files changed

+245
-29
lines changed

13 files changed

+245
-29
lines changed

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
133133
IndexSettings.INDEX_GC_DELETES_SETTING,
134134
IndexSettings.INDEX_SOFT_DELETES_SETTING,
135135
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
136+
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING,
136137
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
137138
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
138139
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,

server/src/main/java/org/elasticsearch/index/IndexSettings.java

+24
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,17 @@ public final class IndexSettings {
256256
Setting.longSetting("index.soft_deletes.retention.operations", 0, 0,
257257
Property.IndexScope, Property.Dynamic);
258258

259+
/**
260+
* Controls the maximum length of time since a retention lease is created or renewed before it is considered expired.
261+
*/
262+
public static final Setting<TimeValue> INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING =
263+
Setting.timeSetting(
264+
"index.soft_deletes.retention.lease",
265+
TimeValue.timeValueHours(12),
266+
TimeValue.ZERO,
267+
Property.Dynamic,
268+
Property.IndexScope);
269+
259270
/**
260271
* The maximum number of refresh listeners allowed on this shard.
261272
*/
@@ -316,6 +327,18 @@ public final class IndexSettings {
316327
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
317328
private final boolean softDeleteEnabled;
318329
private volatile long softDeleteRetentionOperations;
330+
331+
private volatile long retentionLeaseMillis;
332+
333+
/**
334+
* The maximum age of a retention lease before it is considered expired.
335+
*
336+
* @return the maximum age
337+
*/
338+
public long getRetentionLeaseMillis() {
339+
return retentionLeaseMillis;
340+
}
341+
319342
private volatile boolean warmerEnabled;
320343
private volatile int maxResultWindow;
321344
private volatile int maxInnerResultWindow;
@@ -431,6 +454,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
431454
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
432455
softDeleteEnabled = version.onOrAfter(Version.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
433456
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
457+
retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING).millis();
434458
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
435459
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
436460
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING);

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.index.shard.ShardId;
3636

3737
import java.io.IOException;
38-
import java.util.ArrayList;
3938
import java.util.Collection;
4039
import java.util.Collections;
4140
import java.util.HashMap;
@@ -137,6 +136,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
137136
*/
138137
private final LongConsumer onGlobalCheckpointUpdated;
139138

139+
/**
140+
* A supplier of the current time. This supplier is used to add a timestamp to retention leases, and to determine retention lease
141+
* expiration.
142+
*/
143+
private final LongSupplier currentTimeMillisSupplier;
144+
140145
/**
141146
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
142147
* current global checkpoint.
@@ -151,12 +156,21 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
151156
private final Map<String, RetentionLease> retentionLeases = new HashMap<>();
152157

153158
/**
154-
* Get all retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
159+
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
155160
*
156161
* @return the retention leases
157162
*/
158163
public synchronized Collection<RetentionLease> getRetentionLeases() {
159-
return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
164+
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
165+
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
166+
final Collection<RetentionLease> nonExpiredRetentionLeases = retentionLeases
167+
.values()
168+
.stream()
169+
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis)
170+
.collect(Collectors.toList());
171+
retentionLeases.clear();
172+
retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease)));
173+
return Collections.unmodifiableCollection(nonExpiredRetentionLeases);
160174
}
161175

162176
/**
@@ -168,7 +182,7 @@ public synchronized Collection<RetentionLease> getRetentionLeases() {
168182
*/
169183
public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
170184
assert primaryMode;
171-
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, source));
185+
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source));
172186
}
173187

174188
public static class CheckpointState implements Writeable {
@@ -425,7 +439,8 @@ public ReplicationTracker(
425439
final String allocationId,
426440
final IndexSettings indexSettings,
427441
final long globalCheckpoint,
428-
final LongConsumer onGlobalCheckpointUpdated) {
442+
final LongConsumer onGlobalCheckpointUpdated,
443+
final LongSupplier currentTimeMillisSupplier) {
429444
super(shardId, indexSettings);
430445
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
431446
this.shardAllocationId = allocationId;
@@ -435,6 +450,7 @@ public ReplicationTracker(
435450
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
436451
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
437452
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
453+
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
438454
this.pendingInSync = new HashSet<>();
439455
this.routingTable = null;
440456
this.replicationGroup = null;

server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
2424
* that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
2525
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
26-
* number, and the source of the retention lease (e.g., "ccr").
26+
* number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
2727
*/
28-
public class RetentionLease {
28+
public final class RetentionLease {
2929

3030
private final String id;
3131

@@ -50,6 +50,17 @@ public long retainingSequenceNumber() {
5050
return retainingSequenceNumber;
5151
}
5252

53+
private final long timestamp;
54+
55+
/**
56+
* The timestamp of when this retention lease was created or renewed.
57+
*
58+
* @return the timestamp used as a basis for determining lease expiration
59+
*/
60+
public long timestamp() {
61+
return timestamp;
62+
}
63+
5364
private final String source;
5465

5566
/**
@@ -66,19 +77,22 @@ public String source() {
6677
*
6778
* @param id the identifier of the retention lease
6879
* @param retainingSequenceNumber the retaining sequence number
80+
* @param timestamp the timestamp of when the retention lease was created or renewed
6981
* @param source the source of the retention lease
7082
*/
71-
public RetentionLease(final String id, final long retainingSequenceNumber, final String source) {
83+
public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) {
7284
this.id = id;
7385
this.retainingSequenceNumber = retainingSequenceNumber;
86+
this.timestamp = timestamp;
7487
this.source = source;
7588
}
7689

7790
@Override
7891
public String toString() {
79-
return "ShardHistoryRetentionLease{" +
92+
return "RetentionLease{" +
8093
"id='" + id + '\'' +
8194
", retainingSequenceNumber=" + retainingSequenceNumber +
95+
", timestamp=" + timestamp +
8296
", source='" + source + '\'' +
8397
'}';
8498
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,13 @@ public IndexShard(
305305
this.globalCheckpointListeners =
306306
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
307307
this.replicationTracker =
308-
new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated);
308+
new ReplicationTracker(
309+
shardId,
310+
aId,
311+
indexSettings,
312+
UNASSIGNED_SEQ_NO,
313+
globalCheckpointListeners::globalCheckpointUpdated,
314+
threadPool::absoluteTimeInMillis);
309315

310316
// the query cache is a node-level thing, however we want the most popular filters
311317
// to be computed on a per-shard basis

server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void testSoftDeletesRetentionLock() {
5555
() -> {
5656
final Set<RetentionLease> leases = new HashSet<>(retainingSequenceNumbers.length);
5757
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
58-
leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), "test"));
58+
leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), 0L, "test"));
5959
}
6060
return leases;
6161
};

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

+67-4
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,23 @@
2121

2222
import org.elasticsearch.cluster.routing.AllocationId;
2323
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.common.unit.TimeValue;
25+
import org.elasticsearch.index.IndexSettings;
2426
import org.elasticsearch.index.shard.ShardId;
2527
import org.elasticsearch.test.IndexSettingsModule;
2628

2729
import java.util.Collection;
2830
import java.util.Collections;
2931
import java.util.HashMap;
3032
import java.util.Map;
33+
import java.util.concurrent.atomic.AtomicLong;
34+
import java.util.function.LongSupplier;
3135

3236
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
3337
import static org.hamcrest.Matchers.equalTo;
3438
import static org.hamcrest.Matchers.hasItem;
3539
import static org.hamcrest.Matchers.hasSize;
40+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3641

3742
public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase {
3843

@@ -43,7 +48,8 @@ public void testAddOrUpdateRetentionLease() {
4348
id.getId(),
4449
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
4550
UNASSIGNED_SEQ_NO,
46-
value -> {});
51+
value -> {},
52+
() -> 0L);
4753
replicationTracker.updateFromMaster(
4854
randomNonNegativeLong(),
4955
Collections.singleton(id.getId()),
@@ -55,19 +61,73 @@ public void testAddOrUpdateRetentionLease() {
5561
for (int i = 0; i < length; i++) {
5662
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
5763
replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
58-
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers);
64+
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
5965
}
6066

6167
for (int i = 0; i < length; i++) {
6268
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
6369
replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
64-
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers);
70+
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L);
6571
}
6672

6773
}
6874

75+
public void testExpiration() {
76+
final AllocationId id = AllocationId.newInitializing();
77+
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
78+
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
79+
final Settings settings = Settings
80+
.builder()
81+
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
82+
.build();
83+
final ReplicationTracker replicationTracker = new ReplicationTracker(
84+
new ShardId("test", "_na", 0),
85+
id.getId(),
86+
IndexSettingsModule.newIndexSettings("test", settings),
87+
UNASSIGNED_SEQ_NO,
88+
value -> {},
89+
currentTimeMillis::get);
90+
replicationTracker.updateFromMaster(
91+
randomNonNegativeLong(),
92+
Collections.singleton(id.getId()),
93+
routingTable(Collections.emptySet(), id),
94+
Collections.emptySet());
95+
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
96+
final long[] retainingSequenceNumbers = new long[1];
97+
retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
98+
replicationTracker.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0");
99+
100+
{
101+
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
102+
assertThat(retentionLeases, hasSize(1));
103+
final RetentionLease retentionLease = retentionLeases.iterator().next();
104+
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
105+
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
106+
}
107+
108+
// renew the lease
109+
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
110+
retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
111+
replicationTracker.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0");
112+
113+
{
114+
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
115+
assertThat(retentionLeases, hasSize(1));
116+
final RetentionLease retentionLease = retentionLeases.iterator().next();
117+
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
118+
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
119+
}
120+
121+
// now force the lease to expire
122+
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
123+
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get);
124+
}
125+
69126
private void assertRetentionLeases(
70-
final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers) {
127+
final ReplicationTracker replicationTracker,
128+
final int size,
129+
final long[] minimumRetainingSequenceNumbers,
130+
final LongSupplier currentTimeMillisSupplier) {
71131
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
72132
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
73133
for (final RetentionLease retentionLease : retentionLeases) {
@@ -79,6 +139,9 @@ private void assertRetentionLeases(
79139
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
80140
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
81141
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
142+
assertThat(
143+
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
144+
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
82145
assertThat(retentionLease.source(), equalTo("test-" + i));
83146
}
84147
}

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,23 @@
3131

3232
import java.util.Set;
3333
import java.util.function.LongConsumer;
34+
import java.util.function.LongSupplier;
3435

3536
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
3637

3738
public abstract class ReplicationTrackerTestCase extends ESTestCase {
3839

39-
ReplicationTracker newTracker(final AllocationId allocationId, final LongConsumer updatedGlobalCheckpoint) {
40+
ReplicationTracker newTracker(
41+
final AllocationId allocationId,
42+
final LongConsumer updatedGlobalCheckpoint,
43+
final LongSupplier currentTimeMillisSupplier) {
4044
return new ReplicationTracker(
4145
new ShardId("test", "_na_", 0),
4246
allocationId.getId(),
4347
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
4448
UNASSIGNED_SEQ_NO,
45-
updatedGlobalCheckpoint);
49+
updatedGlobalCheckpoint,
50+
currentTimeMillisSupplier);
4651
}
4752

4853
static IndexShardRoutingTable routingTable(final Set<AllocationId> initializingIds, final AllocationId primaryId) {

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception {
406406
private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);
407407

408408
private ReplicationTracker newTracker(final AllocationId allocationId) {
409-
return newTracker(allocationId, updatedGlobalCheckpoint::set);
409+
return newTracker(allocationId, updatedGlobalCheckpoint::set, () -> 0L);
410410
}
411411

412412
public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException {
@@ -683,10 +683,10 @@ public void testPrimaryContextHandoff() throws IOException {
683683
final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId();
684684
final LongConsumer onUpdate = updatedGlobalCheckpoint -> {};
685685
final long globalCheckpoint = UNASSIGNED_SEQ_NO;
686-
ReplicationTracker oldPrimary =
687-
new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate);
688-
ReplicationTracker newPrimary =
689-
new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate);
686+
ReplicationTracker oldPrimary = new ReplicationTracker(
687+
shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L);
688+
ReplicationTracker newPrimary = new ReplicationTracker(
689+
shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L);
690690

691691
Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));
692692

0 commit comments

Comments
 (0)