Skip to content

Commit d3d210a

Browse files
Allow to trim all ops above a certain seq# with a term lower than X
Relates to #10708
1 parent 0a6312a commit d3d210a

17 files changed

+314
-41
lines changed

server/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
2323
import org.elasticsearch.common.io.stream.StreamInput;
2424
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.index.seqno.SequenceNumbers;
2526
import org.elasticsearch.index.shard.ShardId;
2627
import org.elasticsearch.index.translog.Translog;
2728

@@ -33,25 +34,38 @@
3334
*/
3435
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
3536

37+
private long belowTermId;
38+
private long trimmedAboveSeqNo;
3639
private Translog.Operation[] operations;
3740

3841
ResyncReplicationRequest() {
3942
super();
4043
}
4144

42-
/**
 * Creates a resync request for the given shard.
 *
 * @param shardId           the shard this request targets
 * @param belowTermId       the term exposed through {@link #getBelowTermId()}
 * @param trimmedAboveSeqNo the sequence number exposed through {@link #getTrimmedAboveSeqNo()}
 * @param operations        the translog operations carried by this request
 */
public ResyncReplicationRequest(
        final ShardId shardId,
        final long belowTermId,
        final long trimmedAboveSeqNo,
        final Translog.Operation[] operations) {
    super(shardId);
    this.operations = operations;
    this.trimmedAboveSeqNo = trimmedAboveSeqNo;
    this.belowTermId = belowTermId;
}
4652

53+
public long getBelowTermId() {
54+
return belowTermId;
55+
}
56+
57+
public long getTrimmedAboveSeqNo() {
58+
return trimmedAboveSeqNo;
59+
}
60+
4761
public Translog.Operation[] getOperations() {
4862
return operations;
4963
}
5064

5165
@Override
5266
public void readFrom(final StreamInput in) throws IOException {
5367
assert Version.CURRENT.major <= 7;
54-
if (in.getVersion().equals(Version.V_6_0_0)) {
68+
if (in.getVersion().onOrBefore(Version.V_6_0_0)) {
5569
/*
5670
* Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a
5771
* byte indicating the type of the operation.
@@ -60,12 +74,22 @@ public void readFrom(final StreamInput in) throws IOException {
6074
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
6175
}
6276
super.readFrom(in);
77+
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
78+
belowTermId = in.readVLong();
79+
trimmedAboveSeqNo = in.readVLong();
80+
} else {
81+
trimmedAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
82+
}
6383
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
6484
}
6585

6686
@Override
6787
public void writeTo(final StreamOutput out) throws IOException {
6888
super.writeTo(out);
89+
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
90+
out.writeVLong(belowTermId);
91+
out.writeVLong(trimmedAboveSeqNo);
92+
}
6993
out.writeArray(Translog.Operation::writeOperation, operations);
7094
}
7195

@@ -74,12 +98,14 @@ public boolean equals(final Object o) {
7498
if (this == o) return true;
7599
if (o == null || getClass() != o.getClass()) return false;
76100
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
77-
return Arrays.equals(operations, that.operations);
101+
return belowTermId == that.belowTermId
102+
&& trimmedAboveSeqNo == that.trimmedAboveSeqNo
103+
&& Arrays.equals(operations, that.operations);
78104
}
79105

80106
@Override
81107
public int hashCode() {
82-
return Arrays.hashCode(operations);
108+
return Long.hashCode(belowTermId) + 31 * (Long.hashCode(trimmedAboveSeqNo) + 31 * Arrays.hashCode(operations));
83109
}
84110

85111
@Override
@@ -88,6 +114,8 @@ public String toString() {
88114
"shardId=" + shardId +
89115
", timeout=" + timeout +
90116
", index='" + index + '\'' +
117+
", belowTermId=" + belowTermId +
118+
", trimmedAboveSeqNo=" + trimmedAboveSeqNo +
91119
", ops=" + operations.length +
92120
"}";
93121
}

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques
135135
}
136136
}
137137
}
138+
replica.trimTranslog(request.getBelowTermId(), request.getTrimmedAboveSeqNo());
138139
return location;
139140
}
140141

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,13 @@ boolean isThrottled() {
235235
*/
236236
public abstract boolean isThrottled();
237237

238+
/**
239+
* Trims translog for terms below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>
240+
*/
241+
public void trimTranslog(long belowTerm, long aboveSeqNo) throws IOException {
242+
getTranslog().trim(belowTerm, aboveSeqNo);
243+
}
244+
238245
/** A Lock implementation that always allows the lock to be acquired */
239246
protected static final class NoOpLock implements Lock {
240247

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,10 @@ public void prepareForIndexRecovery() {
11971197
assert currentEngineReference.get() == null;
11981198
}
11991199

1200+
public void trimTranslog(long belowTerm, long aboveSeqNo) throws IOException {
1201+
getEngine().trimTranslog(belowTerm, aboveSeqNo);
1202+
}
1203+
12001204
public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin,
12011205
Consumer<Mapping> onMappingUpdate) throws IOException {
12021206
final Engine.Result result;

server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.unit.ByteSizeValue;
3636
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3737
import org.elasticsearch.common.xcontent.XContentBuilder;
38+
import org.elasticsearch.index.seqno.SeqNoStats;
3839
import org.elasticsearch.index.seqno.SequenceNumbers;
3940
import org.elasticsearch.index.translog.Translog;
4041
import org.elasticsearch.tasks.Task;
@@ -83,6 +84,8 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
8384
ActionListener<ResyncTask> resyncListener = null;
8485
try {
8586
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
87+
final SeqNoStats seqNoStats = indexShard.seqNoStats();
88+
final long maxSeqNo = seqNoStats != null ? seqNoStats.getMaxSeqNo() : SequenceNumbers.UNASSIGNED_SEQ_NO;
8689
Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
8790
resyncListener = new ActionListener<ResyncTask>() {
8891
@Override
@@ -135,7 +138,7 @@ public synchronized Translog.Operation next() throws IOException {
135138
}
136139
};
137140
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
138-
startingSeqNo, resyncListener);
141+
startingSeqNo, maxSeqNo, resyncListener);
139142
} catch (Exception e) {
140143
if (resyncListener != null) {
141144
resyncListener.onFailure(e);
@@ -146,7 +149,7 @@ public synchronized Translog.Operation next() throws IOException {
146149
}
147150

148151
private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
149-
long startingSeqNo, ActionListener<ResyncTask> listener) {
152+
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
150153
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
151154
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
152155
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@@ -166,7 +169,7 @@ public void onFailure(Exception e) {
166169
};
167170
try {
168171
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
169-
startingSeqNo, wrappedListener).run();
172+
startingSeqNo, maxSeqNo, wrappedListener).run();
170173
} catch (Exception e) {
171174
wrappedListener.onFailure(e);
172175
}
@@ -186,14 +189,16 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
186189
private final ShardId shardId;
187190
private final Translog.Snapshot snapshot;
188191
private final long startingSeqNo;
192+
private final long maxSeqNo;
189193
private final int chunkSizeInBytes;
190194
private final ActionListener<Void> listener;
195+
private final AtomicInteger totalPackets = new AtomicInteger();
191196
private final AtomicInteger totalSentOps = new AtomicInteger();
192197
private final AtomicInteger totalSkippedOps = new AtomicInteger();
193198
private AtomicBoolean closed = new AtomicBoolean();
194199

195200
SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
196-
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, ActionListener<Void> listener) {
201+
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
197202
this.logger = logger;
198203
this.syncAction = syncAction;
199204
this.task = task;
@@ -203,6 +208,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
203208
this.snapshot = snapshot;
204209
this.chunkSizeInBytes = chunkSizeInBytes;
205210
this.startingSeqNo = startingSeqNo;
211+
this.maxSeqNo = maxSeqNo;
206212
this.listener = listener;
207213
task.setTotalOperations(snapshot.totalOperations());
208214
}
@@ -248,11 +254,15 @@ protected void doRun() throws Exception {
248254
}
249255
}
250256

251-
if (!operations.isEmpty()) {
257+
final long trimmedAboveSeqNo = totalPackets.get() == 0 ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
258+
// a sync request has to be sent even if there are no operations to sync, so that at least trimmedAboveSeqNo gets synced
259+
if (!operations.isEmpty() || totalPackets.get() == 0) {
252260
task.setPhase("sending_ops");
253-
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
261+
ResyncReplicationRequest request =
262+
new ResyncReplicationRequest(shardId, primaryTerm, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
254263
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
255264
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
265+
totalPackets.incrementAndGet();
256266
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
257267
} else if (closed.compareAndSet(false, true)) {
258268
logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());

server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,29 @@ final class Checkpoint {
4545
final long maxSeqNo;
4646
final long globalCheckpoint;
4747
final long minTranslogGeneration;
48+
final long trimmedAboveSeqNo;
4849

4950
private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
50-
private static final int CURRENT_VERSION = 2; // introduction of global checkpoints
51+
private static final int VERSION_6_0_0 = 2; // introduction of global checkpoints
52+
private static final int CURRENT_VERSION = 3; // introduction of trimmed above seq#
5153

5254
private static final String CHECKPOINT_CODEC = "ckp";
5355

54-
// size of 6.0.0 checkpoint
56+
// size of 6.4.0 checkpoint
57+
5558
static final int FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
59+
+ Integer.BYTES // ops
60+
+ Long.BYTES // offset
61+
+ Long.BYTES // generation
62+
+ Long.BYTES // minimum sequence number, introduced in 6.0.0
63+
+ Long.BYTES // maximum sequence number, introduced in 6.0.0
64+
+ Long.BYTES // global checkpoint, introduced in 6.0.0
65+
+ Long.BYTES // minimum translog generation in the translog - introduced in 6.0.0
66+
+ Long.BYTES // maximum reachable (trimmed) sequence number, introduced in 6.4.0
67+
+ CodecUtil.footerLength();
68+
69+
// size of 6.0.0 checkpoint
70+
static final int V2_FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
5671
+ Integer.BYTES // ops
5772
+ Long.BYTES // offset
5873
+ Long.BYTES // generation
@@ -79,9 +94,12 @@ final class Checkpoint {
7994
* @param maxSeqNo the current maximum sequence number of all operations in the translog
8095
* @param globalCheckpoint the last-known global checkpoint
8196
* @param minTranslogGeneration the minimum generation referenced by the translog at this moment.
97+
* @param trimmedAboveSeqNo the current maximum reachable (trimmed) sequence number of all operations in the translog
8298
*/
83-
Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint, long minTranslogGeneration) {
99+
Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint,
100+
long minTranslogGeneration, long trimmedAboveSeqNo) {
84101
assert minSeqNo <= maxSeqNo : "minSeqNo [" + minSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
102+
assert trimmedAboveSeqNo <= maxSeqNo : "trimmedAboveSeqNo [" + trimmedAboveSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
85103
assert minTranslogGeneration <= generation :
86104
"minTranslogGen [" + minTranslogGeneration + "] is higher than generation [" + generation + "]";
87105
this.offset = offset;
@@ -91,6 +109,7 @@ final class Checkpoint {
91109
this.maxSeqNo = maxSeqNo;
92110
this.globalCheckpoint = globalCheckpoint;
93111
this.minTranslogGeneration = minTranslogGeneration;
112+
this.trimmedAboveSeqNo = trimmedAboveSeqNo;
94113
}
95114

96115
private void write(DataOutput out) throws IOException {
@@ -101,26 +120,49 @@ private void write(DataOutput out) throws IOException {
101120
out.writeLong(maxSeqNo);
102121
out.writeLong(globalCheckpoint);
103122
out.writeLong(minTranslogGeneration);
123+
out.writeLong(trimmedAboveSeqNo);
104124
}
105125

106126
static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
107127
long minTranslogGeneration) {
108128
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
109129
final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
110-
return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
130+
return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, maxSeqNo);
131+
}
132+
133+
static Checkpoint readCheckpointV6_4_0(final DataInput in) throws IOException {
134+
final long offset = in.readLong();
135+
final int numOps = in.readInt();
136+
final long generation = in.readLong();
137+
final long minSeqNo = in.readLong();
138+
final long maxSeqNo = in.readLong();
139+
final long globalCheckpoint = in.readLong();
140+
final long minTranslogGeneration = in.readLong();
141+
final long trimmedAboveSeqNo = in.readLong();
142+
return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, trimmedAboveSeqNo);
111143
}
112144

113145
static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
114-
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
146+
final long offset = in.readLong();
147+
final int numOps = in.readInt();
148+
final long generation = in.readLong();
149+
final long minSeqNo = in.readLong();
150+
final long maxSeqNo = in.readLong();
151+
final long globalCheckpoint = in.readLong();
152+
final long minTranslogGeneration = in.readLong();
153+
return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, maxSeqNo);
115154
}
116155

117156
// reads a checksummed checkpoint introduced in ES 5.0.0
118157
static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException {
158+
final long offset = in.readLong();
159+
final int numOps = in.readInt();
160+
final long generation = in.readLong();
119161
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
120162
final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
121163
final long globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
122-
final long minTranslogGeneration = -1L;
123-
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
164+
final long minTranslogGeneration = SequenceNumbers.UNASSIGNED_SEQ_NO;
165+
return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration, maxSeqNo);
124166
}
125167

126168
@Override
@@ -133,6 +175,7 @@ public String toString() {
133175
", maxSeqNo=" + maxSeqNo +
134176
", globalCheckpoint=" + globalCheckpoint +
135177
", minTranslogGeneration=" + minTranslogGeneration +
178+
", trimmedAboveSeqNo=" + trimmedAboveSeqNo +
136179
'}';
137180
}
138181

@@ -145,10 +188,13 @@ public static Checkpoint read(Path path) throws IOException {
145188
if (fileVersion == INITIAL_VERSION) {
146189
assert indexInput.length() == V1_FILE_SIZE : indexInput.length();
147190
return Checkpoint.readCheckpointV5_0_0(indexInput);
191+
} else if (fileVersion == VERSION_6_0_0) {
192+
assert indexInput.length() == V2_FILE_SIZE : indexInput.length();
193+
return Checkpoint.readCheckpointV6_0_0(indexInput);
148194
} else {
149195
assert fileVersion == CURRENT_VERSION : fileVersion;
150196
assert indexInput.length() == FILE_SIZE : indexInput.length();
151-
return Checkpoint.readCheckpointV6_0_0(indexInput);
197+
return Checkpoint.readCheckpointV6_4_0(indexInput);
152198
}
153199
}
154200
}
@@ -164,7 +210,7 @@ public synchronized byte[] toByteArray() {
164210
};
165211
final String resourceDesc = "checkpoint(path=\"" + checkpointFile + "\", gen=" + checkpoint + ")";
166212
try (OutputStreamIndexOutput indexOutput =
167-
new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, FILE_SIZE)) {
213+
new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, V2_FILE_SIZE)) {
168214
CodecUtil.writeHeader(indexOutput, CHECKPOINT_CODEC, CURRENT_VERSION);
169215
checkpoint.write(indexOutput);
170216
CodecUtil.writeFooter(indexOutput);
@@ -196,7 +242,8 @@ public boolean equals(Object o) {
196242
if (generation != that.generation) return false;
197243
if (minSeqNo != that.minSeqNo) return false;
198244
if (maxSeqNo != that.maxSeqNo) return false;
199-
return globalCheckpoint == that.globalCheckpoint;
245+
if (globalCheckpoint != that.globalCheckpoint) return false;
246+
return trimmedAboveSeqNo == that.trimmedAboveSeqNo;
200247
}
201248

202249
@Override
@@ -207,6 +254,7 @@ public int hashCode() {
207254
result = 31 * result + Long.hashCode(minSeqNo);
208255
result = 31 * result + Long.hashCode(maxSeqNo);
209256
result = 31 * result + Long.hashCode(globalCheckpoint);
257+
result = 31 * result + Long.hashCode(trimmedAboveSeqNo);
210258
return result;
211259
}
212260

0 commit comments

Comments
 (0)