Skip to content

Commit d225bf9

Browse files
committed
CCR: Optimize indexing ops using seq_no on followers (#34099)
This change introduces the indexing optimization using sequence numbers in the FollowingEngine. This optimization uses the max_seq_no_updates which is tracked on the primary of the leader and replicated to replicas and followers. Relates #33656
1 parent a89b9de commit d225bf9

File tree

8 files changed

+368
-66
lines changed

8 files changed

+368
-66
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,6 +1804,33 @@ public interface TranslogRecoveryRunner {
18041804
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
18051805
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
18061806
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
1807+
* <p>
1808+
* A note on the optimization using max_seq_no_of_updates_or_deletes:
1809+
* For each operation O, the key invariants are:
1810+
* <ol>
1811+
* <li> I1: There is no operation on docID(O) with seqno that is {@literal > MSU(O) and < seqno(O)} </li>
1812+
* <li> I2: If {@literal MSU(O) < seqno(O)} then docID(O) did not exist when O was applied; more precisely, if there is any O'
1813+
* with {@literal seqno(O') < seqno(O) and docID(O') = docID(O)} then the one with the greatest seqno is a delete.</li>
1814+
* </ol>
1815+
* <p>
1816+
* When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU at least MSU(O),
1817+
* and then compares its MSU to its local checkpoint (LCP). If {@literal LCP < MSU} then there's a gap: there may be some operations
1818+
* that act on docID(O) about which we do not yet know, so we cannot perform an add. Note this also covers the case where a future
1819+
* operation O' with {@literal seqNo(O') > seqNo(O) and docId(O') = docID(O)} is processed before O. In that case MSU(O') is at least
1820+
* seqno(O') and this means {@literal MSU >= seqNo(O') > seqNo(O) > LCP} (because O wasn't processed yet).
1821+
* <p>
1822+
* However, if {@literal MSU <= LCP} then there is no gap: we have processed every {@literal operation <= LCP}, and no operation O'
1823+
* with {@literal seqno(O') > LCP and seqno(O') < seqno(O) also has docID(O') = docID(O)}, because such an operation would have
1824+
* {@literal seqno(O') > LCP >= MSU >= MSU(O)} which contradicts the first invariant. Furthermore in this case we immediately know
1825+
* that docID(O) has been deleted (or never existed) without needing to check Lucene for the following reason. If there's no earlier
1826+
* operation on docID(O) then this is clear, so suppose instead that the preceding operation on docID(O) is O':
1827+
* 1. The first invariant above tells us that {@literal seqno(O') <= MSU(O) <= LCP} so we have already applied O' to Lucene.
1828+
* 2. Also {@literal MSU(O) <= MSU <= LCP < seqno(O)} (we discard O if {@literal seqno(O) <= LCP}) so the second invariant applies,
1829+
* meaning that the O' was a delete.
1830+
* <p>
1831+
* Therefore, if {@literal MSU <= LCP < seqno(O)} we know that O can safely be optimized with and added to lucene with addDocument.
1832+
* Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
1833+
* in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
18071834
*
18081835
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
18091836
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
929929
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
930930
*/
931931
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
932-
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
932+
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L);
933933
} else {
934934
if (appendOnlyRequest == false) {
935935
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
@@ -993,7 +993,7 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
993993
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
994994
versionMap.enforceSafeAccess();
995995
} else {
996-
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
996+
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L);
997997
}
998998
} else {
999999
versionMap.enforceSafeAccess();
@@ -1148,8 +1148,8 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda
11481148
Optional.of(earlyResultOnPreFlightError);
11491149
}
11501150

1151-
static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
1152-
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null);
1151+
public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) {
1152+
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null);
11531153
}
11541154

11551155
static IndexingStrategy skipDueToVersionConflict(
@@ -1170,7 +1170,8 @@ static IndexingStrategy overrideExistingAsIfNotThere(
11701170
return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null);
11711171
}
11721172

1173-
static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) {
1173+
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing,
1174+
long versionForIndexing) {
11741175
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null);
11751176
}
11761177

@@ -2411,6 +2412,16 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
24112412
localCheckpointTracker.waitForOpsToComplete(seqNo);
24122413
}
24132414

2415+
/**
2416+
* Checks if the given operation has been processed in this engine or not.
2417+
* @return true if the given operation was processed; otherwise false.
2418+
*/
2419+
protected final boolean hasBeenProcessedBefore(Operation op) {
2420+
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
2421+
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
2422+
return localCheckpointTracker.contains(op.seqNo());
2423+
}
2424+
24142425
@Override
24152426
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
24162427
return localCheckpointTracker.getStats(globalCheckpoint);

server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ Releasable acquireLock(BytesRef uid) {
470470
return keyedLock.acquire(uid);
471471
}
472472

473-
private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
473+
boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
474474
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
475475
return true;
476476
}

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -575,11 +575,11 @@ protected static BytesArray bytesArray(String string) {
575575
return new BytesArray(string.getBytes(Charset.defaultCharset()));
576576
}
577577

578-
protected static Term newUid(String id) {
578+
public static Term newUid(String id) {
579579
return new Term("_id", Uid.encodeId(id));
580580
}
581581

582-
protected Term newUid(ParsedDocument doc) {
582+
public static Term newUid(ParsedDocument doc) {
583583
return newUid(doc.id());
584584
}
585585

@@ -656,7 +656,7 @@ public static List<Engine.Operation> generateSingleDocHistory(
656656
throw new UnsupportedOperationException("unknown version type: " + versionType);
657657
}
658658
if (randomBoolean()) {
659-
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
659+
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), SOURCE, null),
660660
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
661661
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
662662
version,
@@ -747,7 +747,7 @@ public static void assertOpsOnReplica(
747747
}
748748
}
749749

750-
protected void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
750+
public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
751751
Thread[] thread = new Thread[randomIntBetween(3, 5)];
752752
CountDownLatch startGun = new CountDownLatch(thread.length);
753753
AtomicInteger offset = new AtomicInteger(-1);
@@ -890,7 +890,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
890890
}
891891
}
892892

893-
protected MapperService createMapperService(String type) throws IOException {
893+
public static MapperService createMapperService(String type) throws IOException {
894894
IndexMetaData indexMetaData = IndexMetaData.builder("test")
895895
.settings(Settings.builder()
896896
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.ccr.index.engine;
77

8+
import org.elasticsearch.common.metrics.CounterMetric;
89
import org.elasticsearch.index.VersionType;
910
import org.elasticsearch.index.engine.EngineConfig;
1011
import org.elasticsearch.index.engine.InternalEngine;
@@ -18,6 +19,8 @@
1819
*/
1920
public final class FollowingEngine extends InternalEngine {
2021

22+
private final CounterMetric numOfOptimizedIndexing = new CounterMetric();
23+
2124
/**
2225
* Construct a new following engine with the specified engine configuration.
2326
*
@@ -49,7 +52,20 @@ private void preFlight(final Operation operation) {
4952
@Override
5053
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
5154
preFlight(index);
52-
return planIndexingAsNonPrimary(index);
55+
// NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
56+
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
57+
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
58+
if (hasBeenProcessedBefore(index)) {
59+
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
60+
61+
} else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) {
62+
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]";
63+
numOfOptimizedIndexing.inc();
64+
return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version());
65+
66+
} else {
67+
return planIndexingAsNonPrimary(index);
68+
}
5369
}
5470

5571
@Override
@@ -83,4 +99,11 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
8399
return true;
84100
}
85101

102+
/**
103+
* Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine.
104+
* This metric is not persisted, and started from 0 when the engine is opened.
105+
*/
106+
public long getNumberOfOptimizedIndexing() {
107+
return numOfOptimizedIndexing.count();
108+
}
86109
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.index.engine.Engine;
4242
import org.elasticsearch.index.seqno.SequenceNumbers;
4343
import org.elasticsearch.index.shard.IndexShard;
44+
import org.elasticsearch.index.shard.IndexShardTestCase;
4445
import org.elasticsearch.index.shard.ShardId;
4546
import org.elasticsearch.index.translog.Translog;
4647
import org.elasticsearch.indices.IndicesService;
@@ -52,6 +53,7 @@
5253
import org.elasticsearch.test.discovery.TestZenDiscovery;
5354
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
5455
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
56+
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
5557
import org.elasticsearch.xpack.core.XPackSettings;
5658
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
5759
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
@@ -210,7 +212,7 @@ public void testFollowIndex() throws Exception {
210212
for (int i = 0; i < firstBatchNumDocs; i++) {
211213
assertBusy(assertExpectedDocumentRunnable(i));
212214
}
213-
215+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
214216
unfollowIndex("index2");
215217
client().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
216218
final int secondBatchNumDocs = randomIntBetween(2, 64);
@@ -234,6 +236,7 @@ public void testFollowIndex() throws Exception {
234236
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
235237
assertBusy(assertExpectedDocumentRunnable(i));
236238
}
239+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs + secondBatchNumDocs);
237240
unfollowIndex("index2");
238241
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards);
239242
}
@@ -347,6 +350,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
347350
assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true));
348351

349352
assertSameDocCount("index1", "index2");
353+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfShards,
354+
client().prepareSearch("index2").get().getHits().totalHits);
350355
unfollowIndex("index2");
351356
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards);
352357
}
@@ -436,6 +441,7 @@ public void testFollowIndexWithNestedField() throws Exception {
436441
}
437442
unfollowIndex("index2");
438443
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
444+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
439445
}
440446

441447
public void testUnfollowNonExistingIndex() {
@@ -473,7 +479,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
473479
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
474480
ensureYellow("index1");
475481

476-
final int numDocs = 1024;
482+
final int numDocs = between(10, 1024);
477483
logger.info("Indexing [{}] docs", numDocs);
478484
for (int i = 0; i < numDocs; i++) {
479485
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
@@ -499,6 +505,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
499505
}
500506
unfollowIndex("index2");
501507
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
508+
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
502509
}
503510

504511
public void testDontFollowTheWrongIndex() throws Exception {
@@ -871,6 +878,27 @@ private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index follo
871878
});
872879
}
873880

881+
private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
882+
assertBusy(() -> {
883+
long[] numOfOptimizedOps = new long[numberOfShards];
884+
for (int shardId = 0; shardId < numberOfShards; shardId++) {
885+
for (String node : internalCluster().nodesInclude(followerIndex.getName())) {
886+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
887+
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
888+
if (shard != null) {
889+
try {
890+
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
891+
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
892+
} catch (AlreadyClosedException e) {
893+
throw new AssertionError(e); // causes assertBusy to retry
894+
}
895+
}
896+
}
897+
}
898+
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
899+
});
900+
}
901+
874902
public static PutFollowAction.Request follow(String leaderIndex, String followerIndex) {
875903
return new PutFollowAction.Request(resumeFollow(leaderIndex, followerIndex));
876904
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
3131
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
3232
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
33+
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
3334
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
3435

3536
import java.io.IOException;
@@ -72,6 +73,9 @@ public void testSimpleCcrReplication() throws Exception {
7273
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
7374
followerGroup.assertAllEqual(indexedDocIds.size());
7475
});
76+
for (IndexShard shard : followerGroup) {
77+
assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
78+
}
7579
// Deletes should be replicated to the follower
7680
List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
7781
for (String deleteId : deleteDocIds) {

0 commit comments

Comments
 (0)