Skip to content

Commit 5d262c6

Browse files
committed
Relax maxSeqNoOfUpdates assertion in FollowingEngine (#47188)
We disable MSU optimization if the local checkpoint is smaller than max_seq_no_of_updates. Hence, we need to relax the MSU assertion in FollowingEngine for that scenario. Suppose the leader has three operations: index-0, delete-1, and index-2 for the same doc Id. MSU on the leader is 1 as index-2 is an append. If the follower applies index-0 then index-2, then the assertion is violated. Closes #47137
1 parent ba84d99 commit 5d262c6

File tree

3 files changed

+24
-9
lines changed

3 files changed

+24
-9
lines changed

test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2424
import org.elasticsearch.common.xcontent.XContentHelper;
2525
import org.elasticsearch.index.IndexSettings;
26+
import org.elasticsearch.index.VersionType;
2627
import org.elasticsearch.index.analysis.AnalysisRegistry;
2728
import org.elasticsearch.index.analysis.AnalyzerScope;
2829
import org.elasticsearch.index.analysis.IndexAnalyzers;
@@ -117,21 +118,24 @@ public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
117118
return opsRecovered;
118119
}
119120

120-
private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
121+
public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
122+
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
123+
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
121124
switch (operation.opType()) {
122125
case INDEX:
123126
final Translog.Index index = (Translog.Index) operation;
124127
final String indexName = mapperService.index().getName();
125128
final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
126-
mapperService.getIndexSettings().getIndexVersionCreated(),
127-
new SourceToParse(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source()),
128-
index.routing()), index.seqNo(), index.primaryTerm(),
129-
index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
129+
mapperService.getIndexSettings().getIndexVersionCreated(),
130+
new SourceToParse(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source()),
131+
index.routing()), index.seqNo(), index.primaryTerm(), index.version(), versionType, origin,
132+
index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
130133
return engineIndex;
131134
case DELETE:
132135
final Translog.Delete delete = (Translog.Delete) operation;
133136
final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
134-
delete.primaryTerm(), delete.version(), null, origin, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
137+
delete.primaryTerm(), delete.version(), versionType, origin, System.nanoTime(),
138+
SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
135139
return engineDelete;
136140
case NO_OP:
137141
final Translog.NoOp noOp = (Translog.NoOp) operation;

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.lucene.search.IndexSearcher;
1717
import org.apache.lucene.search.Query;
1818
import org.apache.lucene.search.TopDocs;
19+
import org.elasticsearch.Assertions;
1920
import org.elasticsearch.ElasticsearchStatusException;
2021
import org.elasticsearch.common.lucene.Lucene;
2122
import org.elasticsearch.common.metrics.CounterMetric;
@@ -133,7 +134,12 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) {
133134

134135
@Override
135136
protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) {
136-
assert getMaxSeqNoOfUpdatesOrDeletes() >= seqNo : seqNo + " < " + getMaxSeqNoOfUpdatesOrDeletes();
137+
if (Assertions.ENABLED) {
138+
final long localCheckpoint = getProcessedLocalCheckpoint();
139+
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
140+
assert localCheckpoint < maxSeqNoOfUpdates || maxSeqNoOfUpdates >= seqNo :
141+
"maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo;
142+
}
137143
super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code
138144
}
139145

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ private void runFollowTest(CheckedBiConsumer<InternalEngine, FollowingEngine, Ex
478478
for (Thread thread : threads) {
479479
thread.join();
480480
}
481-
assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
481+
assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
482482
assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
483483
}
484484
};
@@ -527,7 +527,12 @@ private void fetchOperations(AtomicBoolean stopped, AtomicLong lastFetchedSeqNo,
527527
try (Translog.Snapshot snapshot =
528528
shuffleSnapshot(leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true))) {
529529
follower.advanceMaxSeqNoOfUpdatesOrDeletes(leader.getMaxSeqNoOfUpdatesOrDeletes());
530-
translogHandler.run(follower, snapshot);
530+
Translog.Operation op;
531+
while ((op = snapshot.next()) != null) {
532+
EngineTestCase.applyOperation(follower,
533+
translogHandler.convertToEngineOp(op, randomFrom(Engine.Operation.Origin.values())));
534+
}
535+
follower.syncTranslog();
531536
}
532537
}
533538
}

0 commit comments

Comments
 (0)