Skip to content

Commit ad61398

Browse files
authored
CCR: Optimize indexing ops using seq_no on followers (#34099)
This change introduces the indexing optimization using sequence numbers in the FollowingEngine. This optimization uses the max_seq_no_updates which is tracked on the primary of the leader and replicated to replicas and followers. Relates #33656
1 parent 47cbae9 commit ad61398

File tree

8 files changed

+367
-66
lines changed

8 files changed

+367
-66
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,6 +1834,33 @@ public interface TranslogRecoveryRunner {
18341834
* Returns the maximum sequence number of either update or delete operations that have been processed in this engine
18351835
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
18361836
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
1837+
* <p>
1838+
* A note on the optimization using max_seq_no_of_updates_or_deletes:
1839+
* For each operation O, the key invariants are:
1840+
* <ol>
1841+
* <li> I1: There is no operation on docID(O) with seqno that is {@literal > MSU(O) and < seqno(O)} </li>
1842+
* <li> I2: If {@literal MSU(O) < seqno(O)} then docID(O) did not exist when O was applied; more precisely, if there is any O'
1843+
* with {@literal seqno(O') < seqno(O) and docID(O') = docID(O)} then the one with the greatest seqno is a delete.</li>
1844+
* </ol>
1845+
* <p>
1846+
* When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU is at least MSU(O),
1847+
* and then compare its MSU to its local checkpoint (LCP). If {@literal LCP < MSU} then there's a gap: there may be some operations
1848+
* that act on docID(O) about which we do not yet know, so we cannot perform an add. Note this also covers the case where a future
1849+
* operation O' with {@literal seqNo(O') > seqNo(O) and docId(O') = docID(O)} is processed before O. In that case MSU(O') is at least
1850+
* seqno(O') and this means {@literal MSU >= seqNo(O') > seqNo(O) > LCP} (because O wasn't processed yet).
1851+
* <p>
1852+
* However, if {@literal MSU <= LCP} then there is no gap: we have processed every {@literal operation <= LCP}, and no operation O'
1853+
* with {@literal seqno(O') > LCP and seqno(O') < seqno(O) also has docID(O') = docID(O)}, because such an operation would have
1854+
* {@literal seqno(O') > LCP >= MSU >= MSU(O)} which contradicts the first invariant. Furthermore in this case we immediately know
1855+
* that docID(O) has been deleted (or never existed) without needing to check Lucene for the following reason. If there's no earlier
1856+
* operation on docID(O) then this is clear, so suppose instead that the preceding operation on docID(O) is O':
1857+
* 1. The first invariant above tells us that {@literal seqno(O') <= MSU(O) <= LCP} so we have already applied O' to Lucene.
1858+
* 2. Also {@literal MSU(O) <= MSU <= LCP < seqno(O)} (we discard O if {@literal seqno(O) <= LCP}) so the second invariant applies,
1859+
* meaning that O' was a delete.
1860+
* <p>
1861+
* Therefore, if {@literal MSU <= LCP < seqno(O)} we know that O can safely be optimized and added to Lucene with addDocument.
1862+
* Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
1863+
* in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
18371864
*
18381865
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
18391866
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
875875
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
876876
*/
877877
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
878-
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
878+
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L);
879879
} else {
880880
if (appendOnlyRequest == false) {
881881
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
@@ -927,7 +927,7 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
927927
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
928928
versionMap.enforceSafeAccess();
929929
} else {
930-
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
930+
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L);
931931
}
932932
} else {
933933
versionMap.enforceSafeAccess();
@@ -1082,8 +1082,8 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda
10821082
Optional.of(earlyResultOnPreFlightError);
10831083
}
10841084

1085-
static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
1086-
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null);
1085+
public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) {
1086+
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null);
10871087
}
10881088

10891089
static IndexingStrategy skipDueToVersionConflict(
@@ -1104,7 +1104,8 @@ static IndexingStrategy overrideExistingAsIfNotThere(
11041104
return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null);
11051105
}
11061106

1107-
static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) {
1107+
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing,
1108+
long versionForIndexing) {
11081109
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null);
11091110
}
11101111

@@ -2331,6 +2332,16 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
23312332
localCheckpointTracker.waitForOpsToComplete(seqNo);
23322333
}
23332334

2335+
/**
2336+
* Checks if the given operation has been processed in this engine or not.
2337+
* @return true if the given operation was processed; otherwise false.
2338+
*/
2339+
protected final boolean hasBeenProcessedBefore(Operation op) {
2340+
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
2341+
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
2342+
return localCheckpointTracker.contains(op.seqNo());
2343+
}
2344+
23342345
@Override
23352346
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
23362347
return localCheckpointTracker.getStats(globalCheckpoint);

server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ Releasable acquireLock(BytesRef uid) {
470470
return keyedLock.acquire(uid);
471471
}
472472

473-
private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
473+
boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
474474
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
475475
return true;
476476
}

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -575,11 +575,11 @@ protected static BytesArray bytesArray(String string) {
575575
return new BytesArray(string.getBytes(Charset.defaultCharset()));
576576
}
577577

578-
protected static Term newUid(String id) {
578+
public static Term newUid(String id) {
579579
return new Term("_id", Uid.encodeId(id));
580580
}
581581

582-
protected Term newUid(ParsedDocument doc) {
582+
public static Term newUid(ParsedDocument doc) {
583583
return newUid(doc.id());
584584
}
585585

@@ -643,7 +643,7 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
643643
throw new UnsupportedOperationException("unknown version type: " + versionType);
644644
}
645645
if (randomBoolean()) {
646-
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
646+
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), SOURCE, null),
647647
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
648648
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
649649
version,
@@ -734,7 +734,7 @@ public static void assertOpsOnReplica(
734734
}
735735
}
736736

737-
protected void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
737+
public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
738738
Thread[] thread = new Thread[randomIntBetween(3, 5)];
739739
CountDownLatch startGun = new CountDownLatch(thread.length);
740740
AtomicInteger offset = new AtomicInteger(-1);
@@ -877,7 +877,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
877877
}
878878
}
879879

880-
protected MapperService createMapperService(String type) throws IOException {
880+
public static MapperService createMapperService(String type) throws IOException {
881881
IndexMetaData indexMetaData = IndexMetaData.builder("test")
882882
.settings(Settings.builder()
883883
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.ccr.index.engine;
77

8+
import org.elasticsearch.common.metrics.CounterMetric;
89
import org.elasticsearch.index.VersionType;
910
import org.elasticsearch.index.engine.EngineConfig;
1011
import org.elasticsearch.index.engine.InternalEngine;
@@ -18,6 +19,8 @@
1819
*/
1920
public final class FollowingEngine extends InternalEngine {
2021

22+
private final CounterMetric numOfOptimizedIndexing = new CounterMetric();
23+
2124
/**
2225
* Construct a new following engine with the specified engine configuration.
2326
*
@@ -51,7 +54,20 @@ private void preFlight(final Operation operation) {
5154
@Override
5255
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
5356
preFlight(index);
54-
return planIndexingAsNonPrimary(index);
57+
// NOTE: refer to Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
58+
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
59+
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
60+
if (hasBeenProcessedBefore(index)) {
61+
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
62+
63+
} else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) {
64+
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]";
65+
numOfOptimizedIndexing.inc();
66+
return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version());
67+
68+
} else {
69+
return planIndexingAsNonPrimary(index);
70+
}
5571
}
5672

5773
@Override
@@ -85,4 +101,11 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
85101
return true;
86102
}
87103

104+
/**
105+
* Returns the number of indexing operations that have been optimized (bypassing the version lookup) using sequence numbers in this engine.
106+
* This metric is not persisted, and starts from 0 when the engine is opened.
107+
*/
108+
public long getNumberOfOptimizedIndexing() {
109+
return numOfOptimizedIndexing.count();
110+
}
88111
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.index.engine.Engine;
4141
import org.elasticsearch.index.seqno.SequenceNumbers;
4242
import org.elasticsearch.index.shard.IndexShard;
43+
import org.elasticsearch.index.shard.IndexShardTestCase;
4344
import org.elasticsearch.index.shard.ShardId;
4445
import org.elasticsearch.index.translog.Translog;
4546
import org.elasticsearch.indices.IndicesService;
@@ -52,6 +53,7 @@
5253
import org.elasticsearch.test.discovery.TestZenDiscovery;
5354
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
5455
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
56+
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
5557
import org.elasticsearch.xpack.core.XPackSettings;
5658
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
5759
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
@@ -210,7 +212,7 @@ public void testFollowIndex() throws Exception {
210212
for (int i = 0; i < firstBatchNumDocs; i++) {
211213
assertBusy(assertExpectedDocumentRunnable(i));
212214
}
213-
215+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
214216
unfollowIndex("index2");
215217
client().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
216218
final int secondBatchNumDocs = randomIntBetween(2, 64);
@@ -234,6 +236,7 @@ public void testFollowIndex() throws Exception {
234236
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
235237
assertBusy(assertExpectedDocumentRunnable(i));
236238
}
239+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs + secondBatchNumDocs);
237240
unfollowIndex("index2");
238241
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards);
239242
}
@@ -347,6 +350,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
347350
assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true));
348351

349352
assertSameDocCount("index1", "index2");
353+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfShards,
354+
client().prepareSearch("index2").get().getHits().totalHits);
350355
unfollowIndex("index2");
351356
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards);
352357
}
@@ -436,6 +441,7 @@ public void testFollowIndexWithNestedField() throws Exception {
436441
}
437442
unfollowIndex("index2");
438443
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
444+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
439445
}
440446

441447
public void testUnfollowNonExistingIndex() {
@@ -473,7 +479,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
473479
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
474480
ensureYellow("index1");
475481

476-
final int numDocs = 1024;
482+
final int numDocs = between(10, 1024);
477483
logger.info("Indexing [{}] docs", numDocs);
478484
for (int i = 0; i < numDocs; i++) {
479485
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
@@ -499,6 +505,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
499505
}
500506
unfollowIndex("index2");
501507
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
508+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
502509
}
503510

504511
public void testDontFollowTheWrongIndex() throws Exception {
@@ -871,6 +878,27 @@ private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index follo
871878
});
872879
}
873880

881+
private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
882+
assertBusy(() -> {
883+
long[] numOfOptimizedOps = new long[numberOfShards];
884+
for (int shardId = 0; shardId < numberOfShards; shardId++) {
885+
for (String node : internalCluster().nodesInclude(followerIndex.getName())) {
886+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
887+
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
888+
if (shard != null) {
889+
try {
890+
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
891+
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
892+
} catch (AlreadyClosedException e) {
893+
throw new AssertionError(e); // causes assertBusy to retry
894+
}
895+
}
896+
}
897+
}
898+
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
899+
});
900+
}
901+
874902
public static PutFollowAction.Request follow(String leaderIndex, String followerIndex) {
875903
return new PutFollowAction.Request(resumeFollow(leaderIndex, followerIndex));
876904
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
3131
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
3232
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
33+
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
3334
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
3435

3536
import java.io.IOException;
@@ -72,6 +73,9 @@ public void testSimpleCcrReplication() throws Exception {
7273
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
7374
followerGroup.assertAllEqual(indexedDocIds.size());
7475
});
76+
for (IndexShard shard : followerGroup) {
77+
assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
78+
}
7579
// Deletes should be replicated to the follower
7680
List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
7781
for (String deleteId : deleteDocIds) {

0 commit comments

Comments
 (0)