Skip to content

Commit ac654cb

Browse files
authored
Follow engine should not fill gaps upon promotion and recovery (#31751)
Closes #31318
1 parent 05b4517 commit ac654cb

File tree

3 files changed

+152
-44
lines changed

3 files changed

+152
-44
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java

+6
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del
5858
return planDeletionAsNonPrimary(delete);
5959
}
6060

61+
@Override
62+
public int fillSeqNoGaps(long primaryTerm) throws IOException {
63+
// a noop implementation, because follow shard does not own the history but the leader shard does.
64+
return 0;
65+
}
66+
6167
@Override
6268
protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
6369
// sequence number should be set when operation origin is primary
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr.index.engine;
7+
8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.index.IndexRequest;
10+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
11+
import org.elasticsearch.cluster.routing.ShardRouting;
12+
import org.elasticsearch.cluster.routing.ShardRoutingState;
13+
import org.elasticsearch.common.bytes.BytesArray;
14+
import org.elasticsearch.common.lease.Releasable;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.common.xcontent.XContentType;
17+
import org.elasticsearch.index.VersionType;
18+
import org.elasticsearch.index.mapper.SourceToParse;
19+
import org.elasticsearch.index.shard.IndexShard;
20+
import org.elasticsearch.index.shard.IndexShardTestCase;
21+
import org.elasticsearch.threadpool.ThreadPool;
22+
import org.elasticsearch.xpack.ccr.CcrSettings;
23+
24+
import java.util.Collections;
25+
import java.util.concurrent.CountDownLatch;
26+
27+
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
28+
import static org.hamcrest.Matchers.equalTo;
29+
30+
public class FollowEngineIndexShardTests extends IndexShardTestCase {
31+
32+
public void testDoNotFillGaps() throws Exception {
33+
Settings settings = Settings.builder()
34+
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
35+
.build();
36+
final IndexShard indexShard = newStartedShard(false, settings, new FollowingEngineFactory());
37+
38+
long seqNo = -1;
39+
for (int i = 0; i < 8; i++) {
40+
final String id = Long.toString(i);
41+
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", id,
42+
new BytesArray("{}"), XContentType.JSON);
43+
indexShard.applyIndexOperationOnReplica(++seqNo,
44+
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
45+
}
46+
long seqNoBeforeGap = seqNo;
47+
seqNo += 8;
48+
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "_doc", "9",
49+
new BytesArray("{}"), XContentType.JSON);
50+
indexShard.applyIndexOperationOnReplica(seqNo,
51+
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);
52+
53+
// promote the replica to primary:
54+
final ShardRouting replicaRouting = indexShard.routingEntry();
55+
final ShardRouting primaryRouting =
56+
newShardRouting(
57+
replicaRouting.shardId(),
58+
replicaRouting.currentNodeId(),
59+
null,
60+
true,
61+
ShardRoutingState.STARTED,
62+
replicaRouting.allocationId());
63+
indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {},
64+
0L, Collections.singleton(primaryRouting.allocationId().getId()),
65+
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
66+
67+
final CountDownLatch latch = new CountDownLatch(1);
68+
ActionListener<Releasable> actionListener = ActionListener.wrap(releasable -> {
69+
releasable.close();
70+
latch.countDown();
71+
}, e -> {assert false : "expected no exception, but got [" + e.getMessage() + "]";});
72+
indexShard.acquirePrimaryOperationPermit(actionListener, ThreadPool.Names.GENERIC, "");
73+
latch.await();
74+
assertThat(indexShard.getLocalCheckpoint(), equalTo(seqNoBeforeGap));
75+
indexShard.refresh("test");
76+
assertThat(indexShard.docStats().getCount(), equalTo(9L));
77+
closeShards(indexShard);
78+
}
79+
80+
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

+66-44
Original file line numberDiff line numberDiff line change
@@ -150,50 +150,7 @@ public void runIndexTest(
150150
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
151151
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
152152
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
153-
final String id = "id";
154-
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
155-
final String type = "type";
156-
final Field versionField = new NumericDocValuesField("_version", 0);
157-
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
158-
final ParseContext.Document document = new ParseContext.Document();
159-
document.add(uidField);
160-
document.add(versionField);
161-
document.add(seqID.seqNo);
162-
document.add(seqID.seqNoDocValue);
163-
document.add(seqID.primaryTerm);
164-
final BytesReference source = new BytesArray(new byte[]{1});
165-
final ParsedDocument parsedDocument = new ParsedDocument(
166-
versionField,
167-
seqID,
168-
id,
169-
type,
170-
"routing",
171-
Collections.singletonList(document),
172-
source,
173-
XContentType.JSON,
174-
null);
175-
176-
final long version;
177-
final long autoGeneratedIdTimestamp;
178-
if (randomBoolean()) {
179-
version = 1;
180-
autoGeneratedIdTimestamp = System.currentTimeMillis();
181-
} else {
182-
version = randomNonNegativeLong();
183-
autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
184-
}
185-
final Engine.Index index = new Engine.Index(
186-
new Term("_id", parsedDocument.id()),
187-
parsedDocument,
188-
seqNo,
189-
primaryTerm.get(),
190-
version,
191-
VersionType.EXTERNAL,
192-
origin,
193-
System.currentTimeMillis(),
194-
autoGeneratedIdTimestamp,
195-
randomBoolean());
196-
153+
final Engine.Index index = createIndexOp("id", seqNo, origin);
197154
consumer.accept(followingEngine, index);
198155
}
199156
}
@@ -243,6 +200,26 @@ public void runDeleteTest(
243200
}
244201
}
245202

203+
public void testDoNotFillSeqNoGaps() throws Exception {
204+
final Settings settings =
205+
Settings.builder()
206+
.put("index.number_of_shards", 1)
207+
.put("index.number_of_replicas", 0)
208+
.put("index.version.created", Version.CURRENT)
209+
.put("index.xpack.ccr.following_index", true)
210+
.build();
211+
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
212+
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
213+
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
214+
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
215+
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
216+
followingEngine.index(createIndexOp("id", 128, Engine.Operation.Origin.PRIMARY));
217+
int addedNoops = followingEngine.fillSeqNoGaps(primaryTerm.get());
218+
assertThat(addedNoops, equalTo(0));
219+
}
220+
}
221+
}
222+
246223
private EngineConfig engineConfig(
247224
final ShardId shardId,
248225
final IndexSettings indexSettings,
@@ -307,4 +284,49 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO
307284
return followingEngine;
308285
}
309286

287+
private Engine.Index createIndexOp(String id, long seqNo, Engine.Operation.Origin origin) {
288+
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
289+
final String type = "type";
290+
final Field versionField = new NumericDocValuesField("_version", 0);
291+
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
292+
final ParseContext.Document document = new ParseContext.Document();
293+
document.add(uidField);
294+
document.add(versionField);
295+
document.add(seqID.seqNo);
296+
document.add(seqID.seqNoDocValue);
297+
document.add(seqID.primaryTerm);
298+
final BytesReference source = new BytesArray(new byte[]{1});
299+
final ParsedDocument parsedDocument = new ParsedDocument(
300+
versionField,
301+
seqID,
302+
id,
303+
type,
304+
"routing",
305+
Collections.singletonList(document),
306+
source,
307+
XContentType.JSON,
308+
null);
309+
310+
final long version;
311+
final long autoGeneratedIdTimestamp;
312+
if (randomBoolean()) {
313+
version = 1;
314+
autoGeneratedIdTimestamp = System.currentTimeMillis();
315+
} else {
316+
version = randomNonNegativeLong();
317+
autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
318+
}
319+
return new Engine.Index(
320+
new Term("_id", parsedDocument.id()),
321+
parsedDocument,
322+
seqNo,
323+
primaryTerm.get(),
324+
version,
325+
VersionType.EXTERNAL,
326+
origin,
327+
System.currentTimeMillis(),
328+
autoGeneratedIdTimestamp,
329+
randomBoolean());
330+
}
331+
310332
}

0 commit comments

Comments
 (0)