Skip to content

Commit 331ef9d

Browse files
authored
Introduce retention lease state file (#39004)
This commit moves retention leases from being persisted in the Lucene commit point to being persisted in a dedicated state file.
1 parent 2c90534 commit 331ef9d

24 files changed

+485
-358
lines changed

docs/reference/indices/flush.asciidoc

+1-3
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,7 @@ which returns something similar to:
103103
"max_seq_no" : "-1",
104104
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
105105
"max_unsafe_auto_id_timestamp" : "-1",
106-
"min_retained_seq_no" : "0",
107-
"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
106+
"min_retained_seq_no" : "0"
108107
},
109108
"num_docs" : 0
110109
}
@@ -119,7 +118,6 @@ which returns something similar to:
119118
// TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
120119
// TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
121120
// TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
122-
// TESTRESPONSE[s/"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
123121
<1> the `sync id` marker
124122

125123
[float]

server/src/main/java/org/elasticsearch/index/engine/Engine.java

-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ public abstract class Engine implements Closeable {
114114
public static final String SYNC_COMMIT_ID = "sync_id";
115115
public static final String HISTORY_UUID_KEY = "history_uuid";
116116
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
117-
public static final String RETENTION_LEASES = "retention_leases";
118117
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
119118

120119
protected final ShardId shardId;

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.elasticsearch.action.index.IndexRequest;
5252
import org.elasticsearch.common.Nullable;
5353
import org.elasticsearch.common.SuppressForbidden;
54-
import org.elasticsearch.common.collect.Tuple;
5554
import org.elasticsearch.common.lease.Releasable;
5655
import org.elasticsearch.common.lucene.LoggerInfoStream;
5756
import org.elasticsearch.common.lucene.Lucene;
@@ -75,7 +74,6 @@
7574
import org.elasticsearch.index.merge.MergeStats;
7675
import org.elasticsearch.index.merge.OnGoingMerge;
7776
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
78-
import org.elasticsearch.index.seqno.RetentionLeases;
7977
import org.elasticsearch.index.seqno.SeqNoStats;
8078
import org.elasticsearch.index.seqno.SequenceNumbers;
8179
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -2344,13 +2342,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
23442342
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
23452343
commitData.put(HISTORY_UUID_KEY, historyUUID);
23462344
if (softDeleteEnabled) {
2347-
/*
2348-
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
2349-
* retained sequence number, and the retention leases.
2350-
*/
2351-
final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
2352-
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
2353-
commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
2345+
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
23542346
}
23552347
logger.trace("committing writer with commit data [{}]", commitData);
23562348
return commitData.entrySet().iterator();

server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.lucene.document.LongPoint;
2323
import org.apache.lucene.search.Query;
24-
import org.elasticsearch.common.collect.Tuple;
2524
import org.elasticsearch.common.lease.Releasable;
2625
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
2726
import org.elasticsearch.index.seqno.RetentionLease;
@@ -107,10 +106,6 @@ private synchronized void releaseRetentionLock() {
107106
* Operations whose seq# is at least this value should exist in the Lucene index.
108107
*/
109108
synchronized long getMinRetainedSeqNo() {
110-
return getRetentionPolicy().v1();
111-
}
112-
113-
public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
114109
/*
115110
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
116111
* locked for peer recovery.
@@ -151,7 +146,7 @@ public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
151146
*/
152147
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
153148
}
154-
return Tuple.tuple(minRetainedSeqNo, retentionLeases);
149+
return minRetainedSeqNo;
155150
}
156151

157152
/**

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+37
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@
3232
import org.elasticsearch.common.io.stream.StreamInput;
3333
import org.elasticsearch.common.io.stream.StreamOutput;
3434
import org.elasticsearch.common.io.stream.Writeable;
35+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
36+
import org.elasticsearch.gateway.WriteStateException;
3537
import org.elasticsearch.index.IndexSettings;
3638
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
3739
import org.elasticsearch.index.shard.IndexShard;
3840
import org.elasticsearch.index.shard.ReplicationGroup;
3941
import org.elasticsearch.index.shard.ShardId;
4042

4143
import java.io.IOException;
44+
import java.nio.file.Path;
4245
import java.util.Collection;
4346
import java.util.Collections;
4447
import java.util.HashMap;
@@ -318,6 +321,40 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
318321
}
319322
}
320323

324+
/**
325+
* Loads the latest retention leases from their dedicated state file.
326+
*
327+
* @param path the path to the directory containing the state file
328+
* @return the retention leases
329+
* @throws IOException if an I/O exception occurs reading the retention leases
330+
*/
331+
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
332+
final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
333+
if (retentionLeases == null) {
334+
return RetentionLeases.EMPTY;
335+
}
336+
return retentionLeases;
337+
}
338+
339+
private final Object retentionLeasePersistenceLock = new Object();
340+
341+
/**
342+
* Persists the current retention leases to their dedicated state file.
343+
*
344+
* @param path the path to the directory containing the state file
345+
* @throws WriteStateException if an exception occurs writing the state file
346+
*/
347+
public void persistRetentionLeases(final Path path) throws WriteStateException {
348+
synchronized (retentionLeasePersistenceLock) {
349+
final RetentionLeases currentRetentionLeases;
350+
synchronized (this) {
351+
currentRetentionLeases = retentionLeases;
352+
}
353+
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
354+
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
355+
}
356+
}
357+
321358
public static class CheckpointState implements Writeable {
322359

323360
/**

server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java

+44-43
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919

2020
package org.elasticsearch.index.seqno;
2121

22+
import org.elasticsearch.common.ParseField;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.common.io.stream.Writeable;
26+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
27+
import org.elasticsearch.common.xcontent.ToXContent;
28+
import org.elasticsearch.common.xcontent.XContentBuilder;
29+
import org.elasticsearch.common.xcontent.XContentParser;
2530

2631
import java.io.IOException;
27-
import java.util.Arrays;
28-
import java.util.Locale;
2932
import java.util.Objects;
3033

3134
/**
@@ -34,7 +37,7 @@
3437
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
3538
* number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
3639
*/
37-
public final class RetentionLease implements Writeable {
40+
public final class RetentionLease implements ToXContent, Writeable {
3841

3942
private final String id;
4043

@@ -94,10 +97,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
9497
if (id.isEmpty()) {
9598
throw new IllegalArgumentException("retention lease ID can not be empty");
9699
}
97-
if (id.contains(":") || id.contains(";") || id.contains(",")) {
98-
// retention lease IDs can not contain these characters because they are used in encoding retention leases
99-
throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
100-
}
101100
if (retainingSequenceNumber < 0) {
102101
throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
103102
}
@@ -108,10 +107,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
108107
if (source.isEmpty()) {
109108
throw new IllegalArgumentException("retention lease source can not be empty");
110109
}
111-
if (source.contains(":") || source.contains(";") || source.contains(",")) {
112-
// retention lease sources can not contain these characters because they are used in encoding retention leases
113-
throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
114-
}
115110
this.id = id;
116111
this.retainingSequenceNumber = retainingSequenceNumber;
117112
this.timestamp = timestamp;
@@ -145,43 +140,49 @@ public void writeTo(final StreamOutput out) throws IOException {
145140
out.writeString(source);
146141
}
147142

148-
/**
149-
* Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
150-
* encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}</code>.
151-
*
152-
* @param retentionLease the retention lease
153-
* @return the encoding of the retention lease
154-
*/
155-
static String encodeRetentionLease(final RetentionLease retentionLease) {
156-
Objects.requireNonNull(retentionLease);
157-
return String.format(
158-
Locale.ROOT,
159-
"id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
160-
retentionLease.id,
161-
retentionLease.retainingSequenceNumber,
162-
retentionLease.timestamp,
163-
retentionLease.source);
143+
private static final ParseField ID_FIELD = new ParseField("id");
144+
private static final ParseField RETAINING_SEQUENCE_NUMBER_FIELD = new ParseField("retaining_sequence_number");
145+
private static final ParseField TIMESTAMP_FIELD = new ParseField("timestamp");
146+
private static final ParseField SOURCE_FIELD = new ParseField("source");
147+
148+
private static ConstructingObjectParser<RetentionLease, Void> PARSER = new ConstructingObjectParser<>(
149+
"retention_leases",
150+
(a) -> new RetentionLease((String) a[0], (Long) a[1], (Long) a[2], (String) a[3]));
151+
152+
static {
153+
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
154+
PARSER.declareLong(ConstructingObjectParser.constructorArg(), RETAINING_SEQUENCE_NUMBER_FIELD);
155+
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD);
156+
PARSER.declareString(ConstructingObjectParser.constructorArg(), SOURCE_FIELD);
157+
}
158+
159+
@Override
160+
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
161+
builder.startObject();
162+
{
163+
builder.field(ID_FIELD.getPreferredName(), id);
164+
builder.field(RETAINING_SEQUENCE_NUMBER_FIELD.getPreferredName(), retainingSequenceNumber);
165+
builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp);
166+
builder.field(SOURCE_FIELD.getPreferredName(), source);
167+
}
168+
builder.endObject();
169+
return builder;
170+
}
171+
172+
@Override
173+
public boolean isFragment() {
174+
return false;
164175
}
165176

166177
/**
167-
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
178+
* Parses a retention lease from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention lease was
179+
* converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}.
168180
*
169-
* @param encodedRetentionLease an encoded retention lease
170-
* @return the decoded retention lease
181+
* @param parser the parser
182+
* @return a retention lease
171183
*/
172-
static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
173-
Objects.requireNonNull(encodedRetentionLease);
174-
final String[] fields = encodedRetentionLease.split(";");
175-
assert fields.length == 4 : Arrays.toString(fields);
176-
assert fields[0].matches("id:[^:;,]+") : fields[0];
177-
final String id = fields[0].substring("id:".length());
178-
assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
179-
final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
180-
assert fields[2].matches("timestamp:\\d+") : fields[2];
181-
final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
182-
assert fields[3].matches("source:[^:;,]+") : fields[3];
183-
final String source = fields[3].substring("source:".length());
184-
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
184+
public static RetentionLease fromXContent(final XContentParser parser) {
185+
return PARSER.apply(parser, null);
185186
}
186187

187188
@Override

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.common.io.stream.StreamOutput;
3838
import org.elasticsearch.common.settings.Settings;
3939
import org.elasticsearch.common.util.concurrent.ThreadContext;
40+
import org.elasticsearch.gateway.WriteStateException;
4041
import org.elasticsearch.index.shard.IndexShard;
4142
import org.elasticsearch.index.shard.IndexShardClosedException;
4243
import org.elasticsearch.index.shard.ShardId;
@@ -119,19 +120,21 @@ public void backgroundSync(
119120
}
120121

121122
@Override
122-
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
123+
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
124+
final Request request,
125+
final IndexShard primary) throws WriteStateException {
123126
Objects.requireNonNull(request);
124127
Objects.requireNonNull(primary);
125-
primary.afterWriteOperation();
128+
primary.persistRetentionLeases();
126129
return new PrimaryResult<>(request, new ReplicationResponse());
127130
}
128131

129132
@Override
130-
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
133+
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws WriteStateException {
131134
Objects.requireNonNull(request);
132135
Objects.requireNonNull(replica);
133136
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
134-
replica.afterWriteOperation();
137+
replica.persistRetentionLeases();
135138
return new ReplicaResult();
136139
}
137140

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

+9-14
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.lucene.store.AlreadyClosedException;
2626
import org.elasticsearch.ExceptionsHelper;
2727
import org.elasticsearch.action.ActionListener;
28-
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2928
import org.elasticsearch.action.support.ActionFilters;
3029
import org.elasticsearch.action.support.WriteResponse;
3130
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
@@ -39,6 +38,7 @@
3938
import org.elasticsearch.common.io.stream.StreamOutput;
4039
import org.elasticsearch.common.settings.Settings;
4140
import org.elasticsearch.common.util.concurrent.ThreadContext;
41+
import org.elasticsearch.gateway.WriteStateException;
4242
import org.elasticsearch.index.shard.IndexShard;
4343
import org.elasticsearch.index.shard.IndexShardClosedException;
4444
import org.elasticsearch.index.shard.ShardId;
@@ -121,31 +121,26 @@ public void sync(
121121
}
122122

123123
@Override
124-
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) {
124+
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
125+
final Request request,
126+
final IndexShard primary) throws WriteStateException {
125127
Objects.requireNonNull(request);
126128
Objects.requireNonNull(primary);
127-
// we flush to ensure that retention leases are committed
128-
flush(primary);
129+
primary.persistRetentionLeases();
129130
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
130131
}
131132

132133
@Override
133-
protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) {
134+
protected WriteReplicaResult<Request> shardOperationOnReplica(
135+
final Request request,
136+
final IndexShard replica) throws WriteStateException {
134137
Objects.requireNonNull(request);
135138
Objects.requireNonNull(replica);
136139
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
137-
// we flush to ensure that retention leases are committed
138-
flush(replica);
140+
replica.persistRetentionLeases();
139141
return new WriteReplicaResult<>(request, null, null, replica, logger);
140142
}
141143

142-
private void flush(final IndexShard indexShard) {
143-
final FlushRequest flushRequest = new FlushRequest();
144-
flushRequest.force(true);
145-
flushRequest.waitIfOngoing(true);
146-
indexShard.flush(flushRequest);
147-
}
148-
149144
public static final class Request extends ReplicatedWriteRequest<Request> {
150145

151146
private RetentionLeases retentionLeases;

0 commit comments

Comments
 (0)