Skip to content

Commit c709708

Browse files
committed
Track max seq_no of updates or deletes on primary (#33842)
This PR is the first step to use seq_no to optimize indexing operations. The idea is to track the max seq_no of either update or delete ops on a primary, and transfer this information to replicas, and replicas use it to optimize indexing plan for index operations (with assigned seq_no). The max_seq_no_of_updates on primary is initialized once when a primary finishes its local recovery or peer recovery in relocation or being promoted. After that, the max_seq_no_of_updates is only advanced internally inside an engine when processing update or delete operations. Relates #33656
1 parent 9ca6bc1 commit c709708

File tree

14 files changed

+218
-7
lines changed

14 files changed

+218
-7
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import java.util.concurrent.CountDownLatch;
9898
import java.util.concurrent.TimeUnit;
9999
import java.util.concurrent.atomic.AtomicBoolean;
100+
import java.util.concurrent.atomic.AtomicLong;
100101
import java.util.concurrent.locks.Condition;
101102
import java.util.concurrent.locks.Lock;
102103
import java.util.concurrent.locks.ReentrantLock;
@@ -137,6 +138,16 @@ public abstract class Engine implements Closeable {
137138
*/
138139
protected volatile long lastWriteNanos = System.nanoTime();
139140

141+
/*
142+
* This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine.
143+
* An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
144+
* This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized.
145+
* The value of this marker never goes backwards, and is updated/changed differently on primary and replica:
146+
* 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete.
147+
* 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes).
148+
*/
149+
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
150+
140151
protected Engine(EngineConfig engineConfig) {
141152
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
142153

@@ -1788,4 +1799,31 @@ public long getMaxSeenAutoIdTimestamp() {
17881799
public interface TranslogRecoveryRunner {
17891800
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
17901801
}
1802+
1803+
/**
1804+
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
1805+
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
1806+
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
1807+
*
1808+
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
1809+
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
1810+
*/
1811+
public final long getMaxSeqNoOfUpdatesOrDeletes() {
1812+
return maxSeqNoOfUpdatesOrDeletes.get();
1813+
}
1814+
1815+
/**
1816+
* A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the
1817+
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
1818+
*/
1819+
public abstract void initializeMaxSeqNoOfUpdatesOrDeletes();
1820+
1821+
/**
1822+
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
1823+
* to advance this marker to at least the given sequence number.
1824+
*/
1825+
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
1826+
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
1827+
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo;
1828+
}
17911829
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,8 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
389389
flushLock.lock();
390390
try (ReleasableLock lock = readLock.acquire()) {
391391
ensureOpen();
392+
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO ||
393+
engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0) : "max_seq_no_of_updates is uninitialized";
392394
if (pendingTranslogRecovery.get() == false) {
393395
throw new IllegalStateException("Engine has already been recovered");
394396
}
@@ -983,6 +985,7 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw
983985

984986
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
985987
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
988+
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
986989
final IndexingStrategy plan;
987990
// resolve an external operation into an internal one which is safe to replay
988991
if (canOptimizeAddDocument(index)) {
@@ -1017,6 +1020,10 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
10171020
);
10181021
}
10191022
}
1023+
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
1024+
if (toAppend == false) {
1025+
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing);
1026+
}
10201027
return plan;
10211028
}
10221029

@@ -1321,6 +1328,7 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) {
13211328

13221329
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
13231330
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
1331+
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
13241332
// resolve operation from external to internal
13251333
final VersionValue versionValue = resolveDocVersion(delete);
13261334
assert incrementVersionLookup();
@@ -1342,6 +1350,7 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
13421350
currentlyDeleted,
13431351
generateSeqNoForOperation(delete),
13441352
delete.versionType().updateVersion(currentVersion, delete.version()));
1353+
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion);
13451354
}
13461355
return plan;
13471356
}
@@ -2627,4 +2636,12 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
26272636
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
26282637
}
26292638

2639+
2640+
@Override
2641+
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
2642+
assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
2643+
"max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]";
2644+
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
2645+
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
2646+
}
26302647
}

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,4 +402,9 @@ public DocsStats docStats() {
402402
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
403403

404404
}
405+
406+
@Override
407+
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
408+
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
409+
}
405410
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,12 @@ public void updateShardState(final ShardRouting newRouting,
531531
*/
532532
engine.rollTranslogGeneration();
533533
engine.fillSeqNoGaps(newPrimaryTerm);
534+
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
535+
// TODO: Enable this assertion after we replicate max_seq_no_updates during replication
536+
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) :
537+
// indexSettings.getIndexVersionCreated();
538+
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
539+
}
534540
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
535541
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
536542
@Override
@@ -1356,7 +1362,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
13561362
translogRecoveryStats::incrementRecoveredOperations);
13571363
};
13581364
innerOpenEngineAndTranslog();
1359-
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
1365+
final Engine engine = getEngine();
1366+
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
1367+
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
13601368
}
13611369

13621370
/**
@@ -1986,6 +1994,13 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
19861994
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
19871995
synchronized (mutex) {
19881996
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
1997+
// If the old primary was on an old version, this primary (was replica before)
1998+
// does not have max_of_updates yet. Thus we need to bootstrap it manually.
1999+
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
2000+
// TODO: Enable this assertion after we replicate max_seq_no_updates during replication
2001+
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated();
2002+
getEngine().initializeMaxSeqNoOfUpdatesOrDeletes();
2003+
}
19892004
}
19902005
}
19912006

@@ -2692,6 +2707,41 @@ void resetEngineToGlobalCheckpoint() throws IOException {
26922707
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
26932708
// TODO: add a dedicate recovery stats for the reset translog
26942709
});
2710+
// TODO: do not use init method here but use advance with the max_seq_no received from the primary
2711+
newEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
26952712
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
26962713
}
2714+
2715+
/**
2716+
* Returns the maximum sequence number of either update or delete operations have been processed in this shard
2717+
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
2718+
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
2719+
* <p>
2720+
* The primary captures this value after executes a replication request, then transfers it to a replica before
2721+
* executing that replication request on a replica.
2722+
*/
2723+
public long getMaxSeqNoOfUpdatesOrDeletes() {
2724+
return getEngine().getMaxSeqNoOfUpdatesOrDeletes();
2725+
}
2726+
2727+
/**
2728+
* A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates
2729+
* value (piggybacked in a replication request) that it receives from its primary before executing that replication request.
2730+
* The receiving value is at least as high as the max_seq_no_of_updates on the primary was when any of the operations of that
2731+
* replication request were processed on it.
2732+
* <p>
2733+
* A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker with the value that it received from
2734+
* the primary in peer-recovery, before it replays remote translog operations from the primary. The receiving value is at least
2735+
* as high as the max_seq_no_of_updates on the primary was when any of these operations were processed on it.
2736+
* <p>
2737+
* These transfers guarantee that every index/delete operation when executing on a replica engine will observe this marker a value
2738+
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
2739+
*
2740+
* @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object)
2741+
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long)
2742+
*/
2743+
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
2744+
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
2745+
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
2746+
}
26972747
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.List;
6363
import java.util.Objects;
6464
import java.util.Optional;
65+
import java.util.OptionalLong;
6566
import java.util.concurrent.atomic.AtomicBoolean;
6667
import java.util.concurrent.locks.ReadWriteLock;
6768
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1877,6 +1878,19 @@ public String getTranslogUUID() {
18771878
return translogUUID;
18781879
}
18791880

1881+
/**
1882+
* Returns the max seq_no of translog operations found in this translog. Since this value is calculated based on the current
1883+
* existing readers, this value is not necessary to be the max seq_no of all operations have been stored in this translog.
1884+
*/
1885+
public long getMaxSeqNo() {
1886+
try (ReleasableLock ignored = readLock.acquire()) {
1887+
ensureOpen();
1888+
final OptionalLong maxSeqNo = Stream.concat(readers.stream(), Stream.of(current))
1889+
.mapToLong(reader -> reader.getCheckpoint().maxSeqNo).max();
1890+
assert maxSeqNo.isPresent() : "must have at least one translog generation";
1891+
return maxSeqNo.getAsLong();
1892+
}
1893+
}
18801894

18811895
TranslogWriter getCurrent() {
18821896
return current;

0 commit comments

Comments
 (0)