|
23 | 23 | import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
24 | 24 | import org.elasticsearch.common.xcontent.XContentHelper;
|
25 | 25 | import org.elasticsearch.index.IndexSettings;
|
| 26 | +import org.elasticsearch.index.VersionType; |
26 | 27 | import org.elasticsearch.index.analysis.AnalysisRegistry;
|
27 | 28 | import org.elasticsearch.index.analysis.AnalyzerScope;
|
28 | 29 | import org.elasticsearch.index.analysis.IndexAnalyzers;
|
@@ -117,20 +118,23 @@ public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
|
117 | 118 | return opsRecovered;
|
118 | 119 | }
|
119 | 120 |
|
120 |
| - private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { |
| 121 | + public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { |
| 122 | + // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type. |
| 123 | + final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null; |
121 | 124 | switch (operation.opType()) {
|
122 | 125 | case INDEX:
|
123 | 126 | final Translog.Index index = (Translog.Index) operation;
|
124 | 127 | final String indexName = mapperService.index().getName();
|
125 | 128 | final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
|
126 | 129 | new SourceToParse(indexName, index.type(), index.id(), index.source(), XContentHelper.xContentType(index.source()),
|
127 |
| - index.routing()), index.seqNo(), index.primaryTerm(), |
128 |
| - index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0); |
| 130 | + index.routing()), index.seqNo(), index.primaryTerm(), index.version(), versionType, origin, |
| 131 | + index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); |
129 | 132 | return engineIndex;
|
130 | 133 | case DELETE:
|
131 | 134 | final Translog.Delete delete = (Translog.Delete) operation;
|
132 | 135 | final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
|
133 |
| - delete.primaryTerm(), delete.version(), null, origin, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0); |
| 136 | + delete.primaryTerm(), delete.version(), versionType, origin, System.nanoTime(), |
| 137 | + SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); |
134 | 138 | return engineDelete;
|
135 | 139 | case NO_OP:
|
136 | 140 | final Translog.NoOp noOp = (Translog.NoOp) operation;
|
|
0 commit comments