Skip to content

Commit c54f6b0

Browse files
committed
Introduce retention lease persistence (#37375)
This commit introduces the persistence of retention leases by persisting them in index commits and recovering them when recovering a shard from store.
1 parent fcecaee commit c54f6b0

File tree

10 files changed

+383
-30
lines changed

10 files changed

+383
-30
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public abstract class Engine implements Closeable {
113113
public static final String SYNC_COMMIT_ID = "sync_id";
114114
public static final String HISTORY_UUID_KEY = "history_uuid";
115115
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
116+
public static final String RETENTION_LEASES = "retention_leases";
116117
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
117118

118119
protected final ShardId shardId;

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.action.index.IndexRequest;
5252
import org.elasticsearch.common.Nullable;
5353
import org.elasticsearch.common.SuppressForbidden;
54+
import org.elasticsearch.common.collect.Tuple;
5455
import org.elasticsearch.common.lease.Releasable;
5556
import org.elasticsearch.common.lucene.LoggerInfoStream;
5657
import org.elasticsearch.common.lucene.Lucene;
@@ -75,6 +76,7 @@
7576
import org.elasticsearch.index.merge.MergeStats;
7677
import org.elasticsearch.index.merge.OnGoingMerge;
7778
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
79+
import org.elasticsearch.index.seqno.RetentionLease;
7880
import org.elasticsearch.index.seqno.SeqNoStats;
7981
import org.elasticsearch.index.seqno.SequenceNumbers;
8082
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -2417,7 +2419,13 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
24172419
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
24182420
commitData.put(HISTORY_UUID_KEY, historyUUID);
24192421
if (softDeleteEnabled) {
2420-
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
2422+
/*
2423+
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
2424+
* retained sequence number, and the retention leases.
2425+
*/
2426+
final Tuple<Long, Collection<RetentionLease>> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
2427+
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
2428+
commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2()));
24212429
}
24222430
logger.trace("committing writer with commit data [{}]", commitData);
24232431
return commitData.entrySet().iterator();

server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.lucene.document.LongPoint;
2323
import org.apache.lucene.search.Query;
24+
import org.elasticsearch.common.collect.Tuple;
2425
import org.elasticsearch.common.lease.Releasable;
2526
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
2627
import org.elasticsearch.index.seqno.RetentionLease;
@@ -45,6 +46,7 @@ final class SoftDeletesPolicy {
4546
private long retentionOperations;
4647
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
4748
private long minRetainedSeqNo;
49+
private Collection<RetentionLease> retentionLeases;
4850
// provides the retention leases used to calculate the minimum sequence number to retain
4951
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
5052

@@ -57,6 +59,7 @@ final class SoftDeletesPolicy {
5759
this.retentionOperations = retentionOperations;
5860
this.minRetainedSeqNo = minRetainedSeqNo;
5961
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
62+
retentionLeases = retentionLeasesSupplier.get();
6063
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
6164
this.retentionLockCount = 0;
6265
}
@@ -106,7 +109,11 @@ private synchronized void releaseRetentionLock() {
106109
* Operations whose seq# is least this value should exist in the Lucene index.
107110
*/
108111
synchronized long getMinRetainedSeqNo() {
109-
// Do not advance if the retention lock is held
112+
return getRetentionPolicy().v1();
113+
}
114+
115+
public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
116+
// do not advance if the retention lock is held
110117
if (retentionLockCount == 0) {
111118
/*
112119
* This policy retains operations for two purposes: peer-recovery and querying changes history.
@@ -119,8 +126,8 @@ synchronized long getMinRetainedSeqNo() {
119126
*/
120127

121128
// calculate the minimum sequence number to retain based on retention leases
122-
final long minimumRetainingSequenceNumber = retentionLeasesSupplier
123-
.get()
129+
retentionLeases = retentionLeasesSupplier.get();
130+
final long minimumRetainingSequenceNumber = retentionLeases
124131
.stream()
125132
.mapToLong(RetentionLease::retainingSequenceNumber)
126133
.min()
@@ -139,7 +146,7 @@ synchronized long getMinRetainedSeqNo() {
139146
*/
140147
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
141148
}
142-
return minRetainedSeqNo;
149+
return Tuple.tuple(minRetainedSeqNo, retentionLeases);
143150
}
144151

145152
/**

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,17 @@ public synchronized void addOrUpdateRetentionLease(final String id, final long r
185185
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source));
186186
}
187187

188+
/**
189+
* Updates retention leases on a replica.
190+
*
191+
* @param retentionLeases the retention leases
192+
*/
193+
public synchronized void updateRetentionLeasesOnReplica(final Collection<RetentionLease> retentionLeases) {
194+
assert primaryMode == false;
195+
this.retentionLeases.clear();
196+
this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())));
197+
}
198+
188199
public static class CheckpointState implements Writeable {
189200

190201
/**

server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919

2020
package org.elasticsearch.index.seqno;
2121

22+
import java.util.Arrays;
23+
import java.util.Collection;
24+
import java.util.Collections;
25+
import java.util.Locale;
26+
import java.util.Objects;
27+
import java.util.stream.Collectors;
28+
2229
/**
2330
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
2431
* that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
@@ -81,18 +88,118 @@ public String source() {
8188
* @param source the source of the retention lease
8289
*/
8390
public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) {
91+
Objects.requireNonNull(id);
92+
if (id.isEmpty()) {
93+
throw new IllegalArgumentException("retention lease ID can not be empty");
94+
}
95+
if (id.contains(":") || id.contains(";") || id.contains(",")) {
96+
// retention lease IDs can not contain these characters because they are used in encoding retention leases
97+
throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
98+
}
8499
if (retainingSequenceNumber < SequenceNumbers.UNASSIGNED_SEQ_NO) {
85100
throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
86101
}
87102
if (timestamp < 0) {
88103
throw new IllegalArgumentException("retention lease timestamp [" + timestamp + "] out of range");
89104
}
105+
Objects.requireNonNull(source);
106+
if (source.isEmpty()) {
107+
throw new IllegalArgumentException("retention lease source can not be empty");
108+
}
109+
if (source.contains(":") || source.contains(";") || source.contains(",")) {
110+
// retention lease sources can not contain these characters because they are used in encoding retention leases
111+
throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
112+
}
90113
this.id = id;
91114
this.retainingSequenceNumber = retainingSequenceNumber;
92115
this.timestamp = timestamp;
93116
this.source = source;
94117
}
95118

119+
/**
120+
* Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
121+
* encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}</code>.
122+
*
123+
* @param retentionLease the retention lease
124+
* @return the encoding of the retention lease
125+
*/
126+
static String encodeRetentionLease(final RetentionLease retentionLease) {
127+
Objects.requireNonNull(retentionLease);
128+
return String.format(
129+
Locale.ROOT,
130+
"id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
131+
retentionLease.id(),
132+
retentionLease.retainingSequenceNumber(),
133+
retentionLease.timestamp(),
134+
retentionLease.source());
135+
}
136+
137+
/**
138+
* Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The
139+
* encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}.
140+
*
141+
* @param retentionLeases the retention leases
142+
* @return the encoding of the retention leases
143+
*/
144+
public static String encodeRetentionLeases(final Collection<RetentionLease> retentionLeases) {
145+
Objects.requireNonNull(retentionLeases);
146+
return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(","));
147+
}
148+
149+
/**
150+
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
151+
*
152+
* @param encodedRetentionLease an encoded retention lease
153+
* @return the decoded retention lease
154+
*/
155+
static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
156+
Objects.requireNonNull(encodedRetentionLease);
157+
final String[] fields = encodedRetentionLease.split(";");
158+
assert fields.length == 4 : Arrays.toString(fields);
159+
assert fields[0].matches("id:[^:;,]+") : fields[0];
160+
final String id = fields[0].substring("id:".length());
161+
assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
162+
final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
163+
assert fields[2].matches("timestamp:\\d+") : fields[2];
164+
final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
165+
assert fields[3].matches("source:[^:;,]+") : fields[3];
166+
final String source = fields[3].substring("source:".length());
167+
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
168+
}
169+
170+
/**
171+
* Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}.
172+
*
173+
* @param encodedRetentionLeases an encoded collection of retention leases
174+
* @return the decoded retention leases
175+
*/
176+
public static Collection<RetentionLease> decodeRetentionLeases(final String encodedRetentionLeases) {
177+
Objects.requireNonNull(encodedRetentionLeases);
178+
if (encodedRetentionLeases.isEmpty()) {
179+
return Collections.emptyList();
180+
}
181+
assert Arrays.stream(encodedRetentionLeases.split(","))
182+
.allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
183+
: encodedRetentionLeases;
184+
return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList());
185+
}
186+
187+
@Override
188+
public boolean equals(final Object o) {
189+
if (this == o) return true;
190+
if (o == null || getClass() != o.getClass()) return false;
191+
final RetentionLease that = (RetentionLease) o;
192+
return Objects.equals(id, that.id) &&
193+
retainingSequenceNumber == that.retainingSequenceNumber &&
194+
timestamp == that.timestamp &&
195+
Objects.equals(source, that.source);
196+
}
197+
198+
@Override
199+
public int hashCode() {
200+
return Objects.hash(id, retainingSequenceNumber, timestamp, source);
201+
}
202+
96203
@Override
97204
public String toString() {
98205
return "RetentionLease{" +

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import org.elasticsearch.index.search.stats.SearchStats;
109109
import org.elasticsearch.index.search.stats.ShardSearchStats;
110110
import org.elasticsearch.index.seqno.ReplicationTracker;
111+
import org.elasticsearch.index.seqno.RetentionLease;
111112
import org.elasticsearch.index.seqno.SeqNoStats;
112113
import org.elasticsearch.index.seqno.SequenceNumbers;
113114
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
@@ -142,6 +143,7 @@
142143
import java.nio.channels.ClosedByInterruptException;
143144
import java.nio.charset.StandardCharsets;
144145
import java.util.ArrayList;
146+
import java.util.Collection;
145147
import java.util.Collections;
146148
import java.util.EnumSet;
147149
import java.util.List;
@@ -1448,6 +1450,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
14481450
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
14491451
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
14501452
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
1453+
replicationTracker.updateRetentionLeasesOnReplica(getRetentionLeases(store.readLastCommittedSegmentsInfo()));
14511454
trimUnsafeCommits();
14521455
synchronized (mutex) {
14531456
verifyNotClosed();
@@ -1467,6 +1470,14 @@ private void innerOpenEngineAndTranslog() throws IOException {
14671470
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
14681471
}
14691472

1473+
static Collection<RetentionLease> getRetentionLeases(final SegmentInfos segmentInfos) {
1474+
final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES);
1475+
if (committedRetentionLeases == null) {
1476+
return Collections.emptyList();
1477+
}
1478+
return RetentionLease.decodeRetentionLeases(committedRetentionLeases);
1479+
}
1480+
14701481
private void trimUnsafeCommits() throws IOException {
14711482
assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
14721483
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.elasticsearch.index.mapper.SourceFieldMapper;
118118
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
119119
import org.elasticsearch.index.seqno.ReplicationTracker;
120+
import org.elasticsearch.index.seqno.RetentionLease;
120121
import org.elasticsearch.index.seqno.SeqNoStats;
121122
import org.elasticsearch.index.seqno.SequenceNumbers;
122123
import org.elasticsearch.index.shard.IndexSearcherWrapper;
@@ -140,6 +141,7 @@
140141
import java.util.ArrayList;
141142
import java.util.Arrays;
142143
import java.util.Base64;
144+
import java.util.Collection;
143145
import java.util.Collections;
144146
import java.util.Comparator;
145147
import java.util.HashMap;
@@ -5266,13 +5268,14 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
52665268
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
52675269
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
52685270
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
5271+
final AtomicReference<Collection<RetentionLease>> leasesHolder = new AtomicReference<>(Collections.emptyList());
52695272
final List<Engine.Operation> operations = generateSingleDocHistory(true,
52705273
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300, "2");
52715274
Randomness.shuffle(operations);
52725275
Set<Long> existingSeqNos = new HashSet<>();
52735276
store = createStore();
5274-
engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null,
5275-
globalCheckpoint::get));
5277+
engine = createEngine(
5278+
config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, leasesHolder::get));
52765279
assertThat(engine.getMinRetainedSeqNo(), equalTo(0L));
52775280
long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo();
52785281
for (Engine.Operation op : operations) {
@@ -5286,6 +5289,18 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
52865289
if (randomBoolean()) {
52875290
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint()));
52885291
}
5292+
if (randomBoolean()) {
5293+
final int length = randomIntBetween(0, 8);
5294+
final List<RetentionLease> leases = new ArrayList<>(length);
5295+
for (int i = 0; i < length; i++) {
5296+
final String id = randomAlphaOfLength(8);
5297+
final long retainingSequenceNumber = randomLongBetween(0L, Math.max(0L, globalCheckpoint.get()));
5298+
final long timestamp = randomLongBetween(0L, Long.MAX_VALUE);
5299+
final String source = randomAlphaOfLength(8);
5300+
leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source));
5301+
}
5302+
leasesHolder.set(leases);
5303+
}
52895304
if (rarely()) {
52905305
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
52915306
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
@@ -5298,6 +5313,14 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
52985313
engine.flush(true, true);
52995314
assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)),
53005315
equalTo(engine.getMinRetainedSeqNo()));
5316+
final Collection<RetentionLease> leases = leasesHolder.get();
5317+
if (leases.isEmpty()) {
5318+
assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo(""));
5319+
} else {
5320+
assertThat(
5321+
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
5322+
equalTo(RetentionLease.encodeRetentionLeases(leases)));
5323+
}
53015324
}
53025325
if (rarely()) {
53035326
engine.forceMerge(randomBoolean());

0 commit comments

Comments
 (0)