Skip to content

Commit 73bfdc4

Browse files
authored
Simplify initialization of max_seq_no of updates (#41161)
Today we choose to initialize max_seq_no_of_updates on primaries only so we can deal with a situation where a primary is on an old node (before 6.5) which does not have MUS while replicas on new nodes (6.5+). However, this strategy is quite complex and can lead to bugs (for example #40249) since we have to assign a correct value (not too low) to MSU in all possible situations (before recovering from translog, restoring history on promotion, and handing off relocation). Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes in the cluster should have MSU. This change simplifies the initialization of MSU by always assigning it a correct value in the constructor of Engine regardless of whether it's a replica or primary. Relates #33842
1 parent 7a407f5 commit 73bfdc4

File tree

11 files changed

+40
-112
lines changed

11 files changed

+40
-112
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@
9898
import java.util.concurrent.CountDownLatch;
9999
import java.util.concurrent.TimeUnit;
100100
import java.util.concurrent.atomic.AtomicBoolean;
101-
import java.util.concurrent.atomic.AtomicLong;
102101
import java.util.concurrent.locks.Condition;
103102
import java.util.concurrent.locks.Lock;
104103
import java.util.concurrent.locks.ReentrantLock;
@@ -142,16 +141,6 @@ public abstract class Engine implements Closeable {
142141
*/
143142
protected volatile long lastWriteNanos = System.nanoTime();
144143

145-
/*
146-
* This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine.
147-
* An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
148-
* This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized.
149-
* The value of this marker never goes backwards, and is updated/changed differently on primary and replica:
150-
* 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete.
151-
* 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes).
152-
*/
153-
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(UNASSIGNED_SEQ_NO);
154-
155144
protected Engine(EngineConfig engineConfig) {
156145
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
157146

@@ -1961,25 +1950,13 @@ public interface TranslogRecoveryRunner {
19611950
* Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
19621951
* in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
19631952
*
1964-
* @see #reinitializeMaxSeqNoOfUpdatesOrDeletes()
19651953
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
19661954
*/
1967-
public final long getMaxSeqNoOfUpdatesOrDeletes() {
1968-
return maxSeqNoOfUpdatesOrDeletes.get();
1969-
}
1970-
1971-
/**
1972-
* A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the
1973-
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
1974-
*/
1975-
public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes();
1955+
public abstract long getMaxSeqNoOfUpdatesOrDeletes();
19761956

19771957
/**
19781958
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
19791959
* to advance this marker to at least the given sequence number.
19801960
*/
1981-
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
1982-
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
1983-
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo;
1984-
}
1961+
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
19851962
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.lucene.util.InfoStream;
4848
import org.elasticsearch.Assertions;
4949
import org.elasticsearch.ExceptionsHelper;
50-
import org.elasticsearch.Version;
5150
import org.elasticsearch.action.index.IndexRequest;
5251
import org.elasticsearch.common.Nullable;
5352
import org.elasticsearch.common.SuppressForbidden;
@@ -146,6 +145,10 @@ public class InternalEngine extends Engine {
146145
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
147146
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
148147
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
148+
// max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
149+
// An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
150+
// The value of this marker never goes backwards, and is tracked/updated differently on primary and replica.
151+
private final AtomicLong maxSeqNoOfUpdatesOrDeletes;
149152
private final CounterMetric numVersionLookups = new CounterMetric();
150153
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
151154
// Lucene operations since this engine was opened - not include operations from existing segments.
@@ -228,6 +231,7 @@ public InternalEngine(EngineConfig engineConfig) {
228231
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
229232
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
230233
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
234+
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
231235
success = true;
232236
} finally {
233237
if (success == false) {
@@ -405,7 +409,6 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
405409
flushLock.lock();
406410
try (ReleasableLock lock = readLock.acquire()) {
407411
ensureOpen();
408-
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized";
409412
if (pendingTranslogRecovery.get() == false) {
410413
throw new IllegalStateException("Engine has already been recovered");
411414
}
@@ -874,7 +877,7 @@ public IndexResult index(Index index) throws IOException {
874877

875878
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
876879
if (toAppend == false) {
877-
advanceMaxSeqNoOfUpdatesOrDeletes(index.seqNo());
880+
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
878881
}
879882
} else {
880883
markSeqNoAsSeen(index.seqNo());
@@ -981,7 +984,6 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw
981984

982985
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
983986
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
984-
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
985987
final IndexingStrategy plan;
986988
// resolve an external operation into an internal one which is safe to replay
987989
if (canOptimizeAddDocument(index)) {
@@ -1322,7 +1324,6 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) {
13221324

13231325
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
13241326
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
1325-
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
13261327
// resolve operation from external to internal
13271328
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
13281329
assert incrementVersionLookup();
@@ -2718,13 +2719,22 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
27182719
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
27192720
}
27202721

2722+
@Override
2723+
public long getMaxSeqNoOfUpdatesOrDeletes() {
2724+
return maxSeqNoOfUpdatesOrDeletes.get();
2725+
}
2726+
2727+
@Override
2728+
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
2729+
if (maxSeqNoOfUpdatesOnPrimary == SequenceNumbers.UNASSIGNED_SEQ_NO) {
2730+
assert false : "max_seq_no_of_updates on primary is unassigned";
2731+
throw new IllegalArgumentException("max_seq_no_of_updates on primary is unassigned");
2732+
}
2733+
this.maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, maxSeqNoOfUpdatesOnPrimary));
2734+
}
2735+
27212736
private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) {
27222737
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
2723-
// If the primary is on an old version which does not replicate msu, we need to relax this assertion for that.
2724-
if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) {
2725-
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_5_0);
2726-
return true;
2727-
}
27282738
// We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument).
27292739
if (allowDeleted) {
27302740
final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes());
@@ -2742,12 +2752,6 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
27422752
return true;
27432753
}
27442754

2745-
@Override
2746-
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
2747-
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
2748-
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
2749-
}
2750-
27512755
private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException {
27522756
final Store store = engineConfig.getStore();
27532757
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -456,11 +456,6 @@ public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
456456

457457
}
458458

459-
@Override
460-
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
461-
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
462-
}
463-
464459
protected void processReaders(IndexReader reader, IndexReader previousReader) {
465460
searcherFactory.processReaders(reader, previousReader);
466461
}
@@ -487,4 +482,15 @@ public Translog.Operation next() {
487482
}
488483
};
489484
}
485+
486+
@Override
487+
public long getMaxSeqNoOfUpdatesOrDeletes() {
488+
return seqNoStats.getMaxSeqNo();
489+
}
490+
491+
@Override
492+
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
493+
assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() :
494+
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
495+
}
490496
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -532,14 +532,6 @@ public void updateShardState(final ShardRouting newRouting,
532532
* the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
533533
*/
534534
final Engine engine = getEngine();
535-
if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) {
536-
// If the old primary was on an old version that did not replicate the msu,
537-
// we need to bootstrap it manually from its local history.
538-
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
539-
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
540-
}
541-
// in case we previously reset engine, we need to forward MSU before replaying translog.
542-
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
543535
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
544536
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
545537
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
@@ -1411,9 +1403,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
14111403
translogRecoveryStats::incrementRecoveredOperations);
14121404
};
14131405
innerOpenEngineAndTranslog();
1414-
final Engine engine = getEngine();
1415-
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
1416-
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
1406+
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
14171407
}
14181408

14191409
/**
@@ -2206,12 +2196,6 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
22062196
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
22072197
synchronized (mutex) {
22082198
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
2209-
if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) {
2210-
// If the old primary was on an old version that did not replicate the msu,
2211-
// we need to bootstrap it manually from its local history.
2212-
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
2213-
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
2214-
}
22152199
}
22162200
}
22172201

@@ -3138,7 +3122,6 @@ public void close() throws IOException {
31383122
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig()));
31393123
onNewEngine(newEngineReference.get());
31403124
}
3141-
newEngineReference.get().advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
31423125
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
31433126
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
31443127
// TODO: add a dedicate recovery stats for the reset translog
@@ -3185,11 +3168,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
31853168
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener)
31863169
*/
31873170
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
3188-
assert seqNo != UNASSIGNED_SEQ_NO
3189-
|| getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO :
3190-
"replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not";
31913171
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
3192-
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
31933172
}
31943173

31953174
/**

0 commit comments

Comments
 (0)