Skip to content

Commit a3f3974

Browse files
committed
Adjust log and unmute testFailOverOnFollower (#38762)
In one of the failures of `testFailOverOnFollower`, two documents (seq=2 and seq=103) were missing on the follower. I spent several hours on that failure but could not figure out the cause. This commit adjusts the logging and unmutes the test so that we can collect more information the next time it fails. Relates #38633
1 parent 4a5070d commit a3f3974

File tree

4 files changed

+40
-23
lines changed

4 files changed

+40
-23
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,8 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
779779
try {
780780
if (logger.isTraceEnabled()) {
781781
// don't use index.source().utf8ToString() here source might not be valid UTF-8
782-
logger.trace("index [{}][{}] (seq# [{}])", index.type(), index.id(), index.seqNo());
782+
logger.trace("index [{}][{}] seq# [{}] allocation-id {}",
783+
index.type(), index.id(), index.seqNo(), routingEntry().allocationId());
783784
}
784785
result = engine.index(index);
785786
} catch (Exception e) {

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -997,8 +997,9 @@ public static List<DocIdSeqNoAndTerm> getDocIds(Engine engine, boolean refresh)
997997
}
998998
}
999999
}
1000-
docs.sort(Comparator.comparing(DocIdSeqNoAndTerm::getId)
1001-
.thenComparingLong(DocIdSeqNoAndTerm::getSeqNo).thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm));
1000+
docs.sort(Comparator.comparingLong(DocIdSeqNoAndTerm::getSeqNo)
1001+
.thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)
1002+
.thenComparing((DocIdSeqNoAndTerm::getId)));
10021003
return docs;
10031004
}
10041005
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.network.NetworkModule;
3939
import org.elasticsearch.common.settings.Settings;
4040
import org.elasticsearch.common.unit.TimeValue;
41+
import org.elasticsearch.common.util.set.Sets;
4142
import org.elasticsearch.common.xcontent.XContentBuilder;
4243
import org.elasticsearch.core.internal.io.IOUtils;
4344
import org.elasticsearch.env.NodeEnvironment;
@@ -451,8 +452,18 @@ protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String f
451452
logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
452453
assertBusy(() -> {
453454
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
454-
logger.info("--> docs on the follower {}", docsOnFollower);
455-
assertThat(docsOnFollower, equalTo(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex)));
455+
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex);
456+
Map<Integer, Set<DocIdSeqNoAndTerm>> mismatchedDocs = new HashMap<>();
457+
for (Map.Entry<Integer, List<DocIdSeqNoAndTerm>> fe : docsOnFollower.entrySet()) {
458+
Set<DocIdSeqNoAndTerm> d1 = Sets.difference(
459+
Sets.newHashSet(fe.getValue()), Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())));
460+
Set<DocIdSeqNoAndTerm> d2 = Sets.difference(
461+
Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())), Sets.newHashSet(fe.getValue()));
462+
if (d1.isEmpty() == false || d2.isEmpty() == false) {
463+
mismatchedDocs.put(fe.getKey(), Sets.union(d1, d2));
464+
}
465+
}
466+
assertThat("mismatched documents [" + mismatchedDocs + "]", docsOnFollower, equalTo(docsOnLeader));
456467
}, 120, TimeUnit.SECONDS);
457468

458469
logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex);
@@ -481,13 +492,15 @@ private Map<Integer, List<DocIdSeqNoAndTerm>> getDocIdAndSeqNos(InternalTestClus
481492
Randomness.shuffle(shardRoutings);
482493
final Map<Integer, List<DocIdSeqNoAndTerm>> docs = new HashMap<>();
483494
for (ShardRouting shardRouting : shardRoutings) {
484-
if (shardRouting == null || shardRouting.assignedToNode() == false || docs.containsKey(shardRouting.shardId().id())) {
495+
if (shardRouting == null || shardRouting.assignedToNode() == false) {
485496
continue;
486497
}
487498
IndexShard indexShard = cluster.getInstance(IndicesService.class, state.nodes().get(shardRouting.currentNodeId()).getName())
488499
.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());
489500
try {
490-
docs.put(shardRouting.shardId().id(), IndexShardTestCase.getDocIdAndSeqNos(indexShard).stream()
501+
final List<DocIdSeqNoAndTerm> docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard);
502+
logger.info("--> shard {} docs {} seq_no_stats {}", shardRouting, docsOnShard, indexShard.seqNoStats());
503+
docs.put(shardRouting.shardId().id(), docsOnShard.stream()
491504
// normalize primary term as the follower use its own term
492505
.map(d -> new DocIdSeqNoAndTerm(d.getId(), d.getSeqNo(), 1L))
493506
.collect(Collectors.toList()));

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java

+18-16
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,24 @@
4545
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4646
import static org.hamcrest.Matchers.equalTo;
4747

48-
@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.index.shard:DEBUG")
48+
@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.xpack.ccr.action.ShardChangesAction:DEBUG,"
49+
+ "org.elasticsearch.index.shard:TRACE")
4950
public class FollowerFailOverIT extends CcrIntegTestCase {
5051

5152
@Override
5253
protected boolean reuseClusters() {
5354
return false;
5455
}
5556

56-
@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/38633")
5757
public void testFailOverOnFollower() throws Exception {
58+
final String leaderIndex = "leader_test_failover";
59+
final String followerIndex = "follower_test_failover";
5860
int numberOfReplicas = between(1, 2);
5961
getFollowerCluster().startMasterOnlyNode();
6062
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2));
6163
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
6264
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
63-
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
65+
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
6466
AtomicBoolean stopped = new AtomicBoolean();
6567
Thread[] threads = new Thread[between(1, 8)];
6668
AtomicInteger docID = new AtomicInteger();
@@ -77,20 +79,20 @@ public void testFailOverOnFollower() throws Exception {
7779
}
7880
if (frequently()) {
7981
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update
80-
IndexResponse indexResponse = leaderClient().prepareIndex("leader-index", "doc", id)
82+
IndexResponse indexResponse = leaderClient().prepareIndex(leaderIndex, "doc", id)
8183
.setSource("{\"f\":" + id + "}", XContentType.JSON).get();
82-
logger.info("--> index id={} seq_no={}", indexResponse.getId(), indexResponse.getSeqNo());
84+
logger.info("--> index {} id={} seq_no={}", leaderIndex, indexResponse.getId(), indexResponse.getSeqNo());
8385
} else {
8486
String id = Integer.toString(between(0, docID.get()));
85-
DeleteResponse deleteResponse = leaderClient().prepareDelete("leader-index", "doc", id).get();
86-
logger.info("--> delete id={} seq_no={}", deleteResponse.getId(), deleteResponse.getSeqNo());
87+
DeleteResponse deleteResponse = leaderClient().prepareDelete(leaderIndex, "doc", id).get();
88+
logger.info("--> delete {} id={} seq_no={}", leaderIndex, deleteResponse.getId(), deleteResponse.getSeqNo());
8789
}
8890
}
8991
});
9092
threads[i].start();
9193
}
9294
availableDocs.release(between(100, 200));
93-
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
95+
PutFollowAction.Request follow = putFollow(leaderIndex, followerIndex);
9496
follow.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
9597
follow.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
9698
follow.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
@@ -99,27 +101,27 @@ public void testFailOverOnFollower() throws Exception {
99101
follow.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
100102
logger.info("--> follow request {}", Strings.toString(follow));
101103
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
102-
disableDelayedAllocation("follower-index");
103-
ensureFollowerGreen("follower-index");
104-
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(30, 80));
104+
disableDelayedAllocation(followerIndex);
105+
ensureFollowerGreen(followerIndex);
106+
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(30, 80));
105107
final ClusterState clusterState = getFollowerCluster().clusterService().state();
106-
for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) {
108+
for (ShardRouting shardRouting : clusterState.routingTable().allShards(followerIndex)) {
107109
if (shardRouting.primary()) {
108110
DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
109111
getFollowerCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
110112
break;
111113
}
112114
}
113115
availableDocs.release(between(50, 200));
114-
ensureFollowerGreen("follower-index");
116+
ensureFollowerGreen(followerIndex);
115117
availableDocs.release(between(50, 200));
116-
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(100, 150));
118+
awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(100, 150));
117119
stopped.set(true);
118120
for (Thread thread : threads) {
119121
thread.join();
120122
}
121-
assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
122-
pauseFollow("follower-index");
123+
assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex);
124+
pauseFollow(followerIndex);
123125
}
124126

125127
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337")

0 commit comments

Comments
 (0)