Skip to content

Commit 7944a0c

Browse files
authored
Track max seq_no of updates or deletes on primary (#33842)
This PR is the first step toward using seq_no to optimize indexing operations. The idea is to track the max seq_no of either update or delete ops on a primary and transfer this information to replicas, which then use it to optimize the indexing plan for index operations (with assigned seq_no). The max_seq_no_of_updates on a primary is initialized once, when the primary finishes its local recovery, finishes peer recovery in a relocation, or is promoted. After that, the max_seq_no_of_updates is only advanced internally inside an engine when processing update or delete operations. Relates #33656
1 parent 477391d commit 7944a0c

File tree

14 files changed

+217
-7
lines changed

14 files changed

+217
-7
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import java.util.concurrent.CountDownLatch;
9898
import java.util.concurrent.TimeUnit;
9999
import java.util.concurrent.atomic.AtomicBoolean;
100+
import java.util.concurrent.atomic.AtomicLong;
100101
import java.util.concurrent.locks.Condition;
101102
import java.util.concurrent.locks.Lock;
102103
import java.util.concurrent.locks.ReentrantLock;
@@ -136,6 +137,16 @@ public abstract class Engine implements Closeable {
136137
*/
137138
protected volatile long lastWriteNanos = System.nanoTime();
138139

140+
/*
141+
* This marker tracks the max seq_no of either update operations or delete operations that have been processed in this engine.
142+
* An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
143+
* This marker starts out uninitialized (-2), and the optimization using seq_no is disabled while this marker is uninitialized.
144+
* The value of this marker never goes backwards, and is updated/changed differently on primary and replica:
145+
* 1. A primary initializes this marker once using the max_seq_no from its history, then advances it when processing an update or delete.
146+
* 2. A replica never advances this marker by itself but only inherits it from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes).
147+
*/
148+
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
149+
139150
protected Engine(EngineConfig engineConfig) {
140151
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
141152

@@ -1781,4 +1792,31 @@ public long getMaxSeenAutoIdTimestamp() {
17811792
public interface TranslogRecoveryRunner {
17821793
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
17831794
}
1795+
1796+
/**
1797+
* Returns the maximum sequence number of either update or delete operations that have been processed in this engine
1798+
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
1799+
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
1800+
*
1801+
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
1802+
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
1803+
*/
1804+
public final long getMaxSeqNoOfUpdatesOrDeletes() {
1805+
return maxSeqNoOfUpdatesOrDeletes.get();
1806+
}
1807+
1808+
/**
1809+
* A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the
1810+
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
1811+
*/
1812+
public abstract void initializeMaxSeqNoOfUpdatesOrDeletes();
1813+
1814+
/**
1815+
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
1816+
* to advance this marker to at least the given sequence number.
1817+
*/
1818+
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
1819+
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
1820+
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo;
1821+
}
17841822
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
385385
flushLock.lock();
386386
try (ReleasableLock lock = readLock.acquire()) {
387387
ensureOpen();
388+
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized";
388389
if (pendingTranslogRecovery.get() == false) {
389390
throw new IllegalStateException("Engine has already been recovered");
390391
}
@@ -918,6 +919,7 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw
918919

919920
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
920921
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
922+
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
921923
final IndexingStrategy plan;
922924
// resolve an external operation into an internal one which is safe to replay
923925
if (canOptimizeAddDocument(index)) {
@@ -952,6 +954,10 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
952954
);
953955
}
954956
}
957+
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
958+
if (toAppend == false) {
959+
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing);
960+
}
955961
return plan;
956962
}
957963

@@ -1242,6 +1248,7 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) {
12421248

12431249
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
12441250
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
1251+
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
12451252
// resolve operation from external to internal
12461253
final VersionValue versionValue = resolveDocVersion(delete);
12471254
assert incrementVersionLookup();
@@ -1263,6 +1270,7 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
12631270
currentlyDeleted,
12641271
generateSeqNoForOperation(delete),
12651272
delete.versionType().updateVersion(currentVersion, delete.version()));
1273+
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion);
12661274
}
12671275
return plan;
12681276
}
@@ -2548,4 +2556,12 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
25482556
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
25492557
}
25502558

2559+
2560+
@Override
2561+
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
2562+
assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
2563+
"max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]";
2564+
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
2565+
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
2566+
}
25512567
}

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,4 +379,9 @@ public DocsStats docStats() {
379379
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
380380

381381
}
382+
383+
@Override
384+
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
385+
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
386+
}
382387
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,12 @@ public void updateShardState(final ShardRouting newRouting,
511511
*/
512512
engine.rollTranslogGeneration();
513513
engine.fillSeqNoGaps(newPrimaryTerm);
514+
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
515+
// TODO: Enable this assertion after we replicate max_seq_no_of_updates during replication
516+
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) :
517+
// indexSettings.getIndexVersionCreated();
518+
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
519+
}
514520
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
515521
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
516522
@Override
@@ -1321,7 +1327,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
13211327
translogRecoveryStats::incrementRecoveredOperations);
13221328
};
13231329
innerOpenEngineAndTranslog();
1324-
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
1330+
final Engine engine = getEngine();
1331+
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
1332+
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
13251333
}
13261334

13271335
/**
@@ -1947,6 +1955,13 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
19471955
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
19481956
synchronized (mutex) {
19491957
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
1958+
// If the old primary was on an old version, this primary (was replica before)
1959+
// does not have max_seq_no_of_updates yet. Thus we need to bootstrap it manually.
1960+
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
1961+
// TODO: Enable this assertion after we replicate max_seq_no_of_updates during replication
1962+
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated();
1963+
getEngine().initializeMaxSeqNoOfUpdatesOrDeletes();
1964+
}
19501965
}
19511966
}
19521967

@@ -2718,6 +2733,41 @@ void resetEngineToGlobalCheckpoint() throws IOException {
27182733
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
27192734
// TODO: add a dedicate recovery stats for the reset translog
27202735
});
2736+
// TODO: do not use init method here but use advance with the max_seq_no received from the primary
2737+
newEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
27212738
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
27222739
}
2740+
2741+
/**
2742+
* Returns the maximum sequence number of either update or delete operations that have been processed in this shard
2743+
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
2744+
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
2745+
* <p>
2746+
* The primary captures this value after it executes a replication request, then transfers it to a replica before
2747+
* executing that replication request on a replica.
2748+
*/
2749+
public long getMaxSeqNoOfUpdatesOrDeletes() {
2750+
return getEngine().getMaxSeqNoOfUpdatesOrDeletes();
2751+
}
2752+
2753+
/**
2754+
* A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates
2755+
* value (piggybacked in a replication request) that it receives from its primary before executing that replication request.
2756+
* The received value is at least as high as the max_seq_no_of_updates on the primary was when any of the operations of that
2757+
* replication request were processed on it.
2758+
* <p>
2759+
* A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker with the value that it received from
2760+
* the primary in peer-recovery, before it replays remote translog operations from the primary. The received value is at least
2761+
* as high as the max_seq_no_of_updates on the primary was when any of these operations were processed on it.
2762+
* <p>
2763+
* These transfers guarantee that every index/delete operation, when executing on a replica engine, will observe a marker value
2764+
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
2765+
*
2766+
* @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object)
2767+
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long)
2768+
*/
2769+
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
2770+
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
2771+
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
2772+
}
27232773
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.List;
6161
import java.util.Objects;
6262
import java.util.Optional;
63+
import java.util.OptionalLong;
6364
import java.util.concurrent.atomic.AtomicBoolean;
6465
import java.util.concurrent.locks.ReadWriteLock;
6566
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1825,6 +1826,19 @@ public String getTranslogUUID() {
18251826
return translogUUID;
18261827
}
18271828

1829+
/**
1830+
* Returns the max seq_no of translog operations found in this translog. Since this value is calculated based on the current
1831+
* existing readers, this value is not necessarily the max seq_no of all operations that have been stored in this translog.
1832+
*/
1833+
public long getMaxSeqNo() {
1834+
try (ReleasableLock ignored = readLock.acquire()) {
1835+
ensureOpen();
1836+
final OptionalLong maxSeqNo = Stream.concat(readers.stream(), Stream.of(current))
1837+
.mapToLong(reader -> reader.getCheckpoint().maxSeqNo).max();
1838+
assert maxSeqNo.isPresent() : "must have at least one translog generation";
1839+
return maxSeqNo.getAsLong();
1840+
}
1841+
}
18281842

18291843
TranslogWriter getCurrent() {
18301844
return current;

0 commit comments

Comments
 (0)