Skip to content

Commit 4a6b8c5

Browse files
authored
Relax history check in ShardFollowTaskReplicationTests (#39162)
The follower won't always have the same history as the leader for its soft-deletes retention can be different. However, if some operation exists on the history of the follower, then the same operation must exist on the leader. This change relaxes the history check in ShardFollowTaskReplicationTests. Closes #39093
1 parent 8d9b391 commit 4a6b8c5

File tree

2 files changed

+8
-7
lines changed

2 files changed

+8
-7
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResp
7070
request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
7171
}
7272

73-
static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
73+
public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
7474
final Translog.Operation operationWithPrimaryTerm;
7575
switch (operation.opType()) {
7676
case INDEX:

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@
6060
import java.util.ArrayList;
6161
import java.util.Arrays;
6262
import java.util.Collections;
63-
import java.util.HashSet;
63+
import java.util.HashMap;
6464
import java.util.List;
65+
import java.util.Map;
6566
import java.util.Set;
6667
import java.util.concurrent.Future;
6768
import java.util.concurrent.atomic.AtomicBoolean;
@@ -559,11 +560,11 @@ private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup le
559560
boolean assertMaxSeqNoOfUpdatesOrDeletes) throws Exception {
560561
final List<Tuple<String, Long>> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream()
561562
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
562-
final Set<Tuple<Long, Translog.Operation.Type>> operationsOnLeader = new HashSet<>();
563+
final Map<Long, Translog.Operation> operationsOnLeader = new HashMap<>();
563564
try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
564565
Translog.Operation op;
565566
while ((op = snapshot.next()) != null) {
566-
operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType()));
567+
operationsOnLeader.put(op.seqNo(), op);
567568
}
568569
}
569570
for (IndexShard followingShard : follower) {
@@ -573,14 +574,14 @@ private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup le
573574
List<Tuple<String, Long>> docAndSeqNosOnFollower = getDocIdAndSeqNos(followingShard).stream()
574575
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
575576
assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader));
576-
final Set<Tuple<Long, Translog.Operation.Type>> operationsOnFollower = new HashSet<>();
577577
try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
578578
Translog.Operation op;
579579
while ((op = snapshot.next()) != null) {
580-
operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType()));
580+
Translog.Operation leaderOp = operationsOnLeader.get(op.seqNo());
581+
assertThat(TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm(op, leaderOp.primaryTerm()),
582+
equalTo(leaderOp));
581583
}
582584
}
583-
assertThat(followingShard.routingEntry().toString(), operationsOnFollower, equalTo(operationsOnLeader));
584585
}
585586
}
586587

0 commit comments

Comments
 (0)