Skip to content

Commit 2774c89

Browse files
committed
Propagate max_auto_id_timestamp in peer recovery (elastic#33693)
Today we don't store the auto-generated timestamp of append-only operations in Lucene; and assign -1 to every index operations constructed from LuceneChangesSnapshot. This looks innocent but it generates duplicate documents on a replica if a retry append-only arrives first via peer-recovery; then an original append-only arrives via replication. Since the retry append-only (delivered via recovery) does not have timestamp, the replica will happily optimizes the original request while it should not. This change transmits the max auto-generated timestamp from the primary to replicas before translog phase in peer recovery. This timestamp will prevent replicas from optimizing append-only requests if retry counterparts have been processed. Relates elastic#33656 Relates elastic#33222
1 parent 972eb78 commit 2774c89

16 files changed

+237
-38
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.lucene.util.Accountables;
4646
import org.apache.lucene.util.SetOnce;
4747
import org.elasticsearch.ExceptionsHelper;
48+
import org.elasticsearch.action.index.IndexRequest;
4849
import org.elasticsearch.common.CheckedRunnable;
4950
import org.elasticsearch.common.FieldMemoryStats;
5051
import org.elasticsearch.common.Nullable;
@@ -1767,6 +1768,21 @@ public boolean isRecovering() {
17671768
*/
17681769
public abstract void maybePruneDeletes();
17691770

1771+
/**
1772+
* Returns the maximum auto_id_timestamp of all append-only index requests have been processed by this engine
1773+
* or the auto_id_timestamp received from its primary shard via {@link #updateMaxUnsafeAutoIdTimestamp(long)}.
1774+
* Notes this method returns the auto_id_timestamp of all append-only requests, not max_unsafe_auto_id_timestamp.
1775+
*/
1776+
public long getMaxSeenAutoIdTimestamp() {
1777+
return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
1778+
}
1779+
1780+
/**
1781+
* Forces this engine to advance its max_unsafe_auto_id_timestamp marker to at least the given timestamp.
1782+
* The engine will disable optimization for all append-only whose timestamp at most {@code newTimestamp}.
1783+
*/
1784+
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);
1785+
17701786
@FunctionalInterface
17711787
public interface TranslogRecoveryRunner {
17721788
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public class InternalEngine extends Engine {
144144
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
145145
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
146146
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
147+
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
147148
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
148149
private final CounterMetric numVersionLookups = new CounterMetric();
149150
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@@ -170,7 +171,7 @@ public InternalEngine(EngineConfig engineConfig) {
170171
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
171172
super(engineConfig);
172173
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
173-
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
174+
updateAutoIdTimestamp(Long.MAX_VALUE, true);
174175
}
175176
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
176177
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
@@ -374,7 +375,7 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
374375
if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
375376
assert maxUnsafeAutoIdTimestamp.get() == -1 :
376377
"max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
377-
maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue()));
378+
updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true);
378379
}
379380
if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
380381
assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
@@ -1075,11 +1076,12 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
10751076
final boolean mayHaveBeenIndexBefore;
10761077
if (index.isRetry()) {
10771078
mayHaveBeenIndexBefore = true;
1078-
maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(index.getAutoGeneratedIdTimestamp(), curr));
1079+
updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true);
10791080
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
10801081
} else {
10811082
// in this case we force
10821083
mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
1084+
updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false);
10831085
}
10841086
return mayHaveBeenIndexBefore;
10851087
}
@@ -2367,7 +2369,7 @@ public void onSettingsChanged() {
23672369
// this is an anti-viral settings you can only opt out for the entire index
23682370
// only if a shard starts up again due to relocation or if the index is closed
23692371
// the setting will be re-interpreted if it's set to true
2370-
this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
2372+
updateAutoIdTimestamp(Long.MAX_VALUE, true);
23712373
}
23722374
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
23732375
final IndexSettings indexSettings = engineConfig.getIndexSettings();
@@ -2606,4 +2608,24 @@ void updateRefreshedCheckpoint(long checkpoint) {
26062608
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
26072609
}
26082610
}
2611+
2612+
@Override
2613+
public final long getMaxSeenAutoIdTimestamp() {
2614+
return maxSeenAutoIdTimestamp.get();
2615+
}
2616+
2617+
@Override
2618+
public final void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
2619+
updateAutoIdTimestamp(newTimestamp, true);
2620+
}
2621+
2622+
private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
2623+
assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]";
2624+
maxSeenAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp));
2625+
if (unsafe) {
2626+
maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp));
2627+
}
2628+
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
2629+
}
2630+
26092631
}

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,4 +374,9 @@ public void maybePruneDeletes() {
374374
public DocsStats docStats() {
375375
return docsStats;
376376
}
377+
378+
@Override
379+
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
380+
381+
}
377382
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1257,6 +1257,29 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
12571257
getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo);
12581258
}
12591259

1260+
/**
1261+
* Returns the maximum auto_id_timestamp of all append-only requests have been processed by this shard or the auto_id_timestamp received
1262+
* from the primary via {@link #updateMaxUnsafeAutoIdTimestamp(long)} at the beginning of a peer-recovery or a primary-replica resync.
1263+
*
1264+
* @see #updateMaxUnsafeAutoIdTimestamp(long)
1265+
*/
1266+
public long getMaxSeenAutoIdTimestamp() {
1267+
return getEngine().getMaxSeenAutoIdTimestamp();
1268+
}
1269+
1270+
/**
1271+
* Since operations stored in soft-deletes do not have max_auto_id_timestamp, the primary has to propagate its max_auto_id_timestamp
1272+
* (via {@link #getMaxSeenAutoIdTimestamp()} of all processed append-only requests to replicas at the beginning of a peer-recovery
1273+
* or a primary-replica resync to force a replica to disable optimization for all append-only requests which are replicated via
1274+
* replication while its retry variants are replicated via recovery without auto_id_timestamp.
1275+
* <p>
1276+
* Without this force-update, a replica can generate duplicate documents (for the same id) if it first receives
1277+
* a retry append-only (without timestamp) via recovery, then an original append-only (with timestamp) via replication.
1278+
*/
1279+
public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimary) {
1280+
getEngine().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampFromPrimary);
1281+
}
1282+
12601283
public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
12611284
final Engine.Result result;
12621285
switch (operation.opType()) {

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,8 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin
451451
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
452452
final RecoveryTarget recoveryTarget = recoveryRef.target();
453453
try {
454-
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps());
454+
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
455+
request.maxSeenAutoIdTimestampOnPrimary());
455456
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
456457
} catch (MapperException exception) {
457458
// in very rare cases a translog replay from primary is processed before a mapping update on this node

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,10 @@ public RecoveryResponse recoverToTarget() throws IOException {
215215
}
216216
final long targetLocalCheckpoint;
217217
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
218-
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
218+
// We have to capture the max auto_id_timestamp after taking a snapshot of operations to guarantee
219+
// that the auto_id_timestamp of every operation in the snapshot is at most this timestamp value.
220+
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
221+
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp);
219222
} catch (Exception e) {
220223
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
221224
}
@@ -447,9 +450,11 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr
447450
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
448451
* @param endingSeqNo the highest sequence number that should be sent
449452
* @param snapshot a snapshot of the translog
453+
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
450454
* @return the local checkpoint on the target
451455
*/
452-
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
456+
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot,
457+
final long maxSeenAutoIdTimestamp)
453458
throws IOException {
454459
if (shard.state() == IndexShardState.CLOSED) {
455460
throw new IndexShardClosedException(request.shardId());
@@ -462,7 +467,8 @@ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingS
462467
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
463468

464469
// send all the snapshot's translog operations to the target
465-
final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
470+
final SendSnapshotResult result = sendSnapshot(
471+
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp);
466472

467473
stopWatch.stop();
468474
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
@@ -530,10 +536,11 @@ static class SendSnapshotResult {
530536
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
531537
* @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
532538
* total number of operations sent
539+
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
533540
* @throws IOException if an I/O exception occurred reading the translog snapshot
534541
*/
535542
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
536-
final Translog.Snapshot snapshot) throws IOException {
543+
final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp) throws IOException {
537544
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
538545
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
539546
assert startingSeqNo <= requiredSeqNoRangeStart :
@@ -551,8 +558,8 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require
551558
logger.trace("no translog operations to send");
552559
}
553560

554-
final CancellableThreads.IOInterruptable sendBatch =
555-
() -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps));
561+
final CancellableThreads.IOInterruptable sendBatch = () ->
562+
targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, maxSeenAutoIdTimestamp));
556563

557564
// send operations in batches
558565
Translog.Operation operation;

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,13 +389,21 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
389389
}
390390

391391
@Override
392-
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
392+
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
393+
long maxSeenAutoIdTimestampOnPrimary) throws IOException {
393394
final RecoveryState.Translog translog = state().getTranslog();
394395
translog.totalOperations(totalTranslogOps);
395396
assert indexShard().recoveryState() == state();
396397
if (indexShard().state() != IndexShardState.RECOVERING) {
397398
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
398399
}
400+
/*
401+
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
402+
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
403+
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
404+
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
405+
*/
406+
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
399407
for (Translog.Operation operation : operations) {
400408
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
401409
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,13 @@ public interface RecoveryTargetHandler {
6060

6161
/**
6262
* Index a set of translog operations on the target
63-
* @param operations operations to index
64-
* @param totalTranslogOps current number of total operations expected to be indexed
65-
*
63+
* @param operations operations to index
64+
* @param totalTranslogOps current number of total operations expected to be indexed
65+
* @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard
6666
* @return the local checkpoint on the target shard
6767
*/
68-
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException;
68+
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
69+
long maxSeenAutoIdTimestampOnPrimary) throws IOException;
6970

7071
/**
7172
* Notifies the target of the files it is going to receive

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.indices.recovery;
2121

22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.index.IndexRequest;
2224
import org.elasticsearch.common.io.stream.StreamInput;
2325
import org.elasticsearch.common.io.stream.StreamOutput;
2426
import org.elasticsearch.index.shard.ShardId;
@@ -34,15 +36,18 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
3436
private ShardId shardId;
3537
private List<Translog.Operation> operations;
3638
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
39+
private long maxSeenAutoIdTimestampOnPrimary;
3740

3841
public RecoveryTranslogOperationsRequest() {
3942
}
4043

41-
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps) {
44+
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations,
45+
int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) {
4246
this.recoveryId = recoveryId;
4347
this.shardId = shardId;
4448
this.operations = operations;
4549
this.totalTranslogOps = totalTranslogOps;
50+
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
4651
}
4752

4853
public long recoveryId() {
@@ -61,13 +66,22 @@ public int totalTranslogOps() {
6166
return totalTranslogOps;
6267
}
6368

69+
public long maxSeenAutoIdTimestampOnPrimary() {
70+
return maxSeenAutoIdTimestampOnPrimary;
71+
}
72+
6473
@Override
6574
public void readFrom(StreamInput in) throws IOException {
6675
super.readFrom(in);
6776
recoveryId = in.readLong();
6877
shardId = ShardId.readShardId(in);
6978
operations = Translog.readOperations(in, "recovery");
7079
totalTranslogOps = in.readVInt();
80+
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
81+
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
82+
} else {
83+
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
84+
}
7185
}
7286

7387
@Override
@@ -77,5 +91,8 @@ public void writeTo(StreamOutput out) throws IOException {
7791
shardId.writeTo(out);
7892
Translog.writeOperations(out, operations);
7993
out.writeVInt(totalTranslogOps);
94+
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
95+
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
96+
}
8097
}
8198
}

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,9 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
110110
}
111111

112112
@Override
113-
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
113+
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) {
114114
final RecoveryTranslogOperationsRequest translogOperationsRequest =
115-
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps);
115+
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary);
116116
final TransportFuture<RecoveryTranslogOperationsResponse> future = transportService.submitRequest(
117117
targetNode,
118118
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3556,6 +3556,8 @@ public void run() {
35563556
}
35573557
assertEquals(0, engine.getNumVersionLookups());
35583558
assertEquals(0, engine.getNumIndexVersionsLookups());
3559+
assertThat(engine.getMaxSeenAutoIdTimestamp(),
3560+
equalTo(docs.stream().mapToLong(Engine.Index::getAutoGeneratedIdTimestamp).max().getAsLong()));
35593561
assertLuceneOperations(engine, numDocs, 0, 0);
35603562
}
35613563

0 commit comments

Comments
 (0)