Skip to content

Commit 9620d7b

Browse files
committed
added test that verifies that we stop and fail the shard follow task
in case of history uuid changed.
1 parent 43fbbee commit 9620d7b

File tree

3 files changed

+58
-3
lines changed

3 files changed

+58
-3
lines changed

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,10 @@ public IndexShard getPrimary() {
443443
return primary;
444444
}
445445

446+
public synchronized void reinitPrimaryShard() throws IOException {
447+
primary = reinitShard(primary);
448+
}
449+
446450
public void syncGlobalCheckpoint() {
447451
PlainActionFuture<ReplicationResponse> listener = new PlainActionFuture<>();
448452
try {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
319319
}
320320
final String historyUUID = indexShard.getHistoryUUID();
321321
if (historyUUID.equals(expectedHistoryUUID) == false) {
322-
throw new IllegalStateException("unexpected history uuid, expected [" + historyUUID + "], actual [" +
323-
expectedHistoryUUID + "]");
322+
throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" +
323+
historyUUID + "]");
324324
}
325325
if (fromSeqNo > globalCheckpoint) {
326326
return EMPTY_OPERATIONS_ARRAY;

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
import org.elasticsearch.action.delete.DeleteRequest;
1515
import org.elasticsearch.action.support.replication.TransportWriteAction;
1616
import org.elasticsearch.cluster.metadata.IndexMetaData;
17+
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.routing.ShardRouting;
19+
import org.elasticsearch.cluster.routing.ShardRoutingState;
20+
import org.elasticsearch.cluster.routing.TestShardRouting;
1821
import org.elasticsearch.common.settings.Settings;
1922
import org.elasticsearch.common.unit.TimeValue;
2023
import org.elasticsearch.index.IndexSettings;
@@ -25,6 +28,7 @@
2528
import org.elasticsearch.index.shard.IndexShard;
2629
import org.elasticsearch.index.shard.ShardId;
2730
import org.elasticsearch.index.translog.Translog;
31+
import org.elasticsearch.indices.recovery.RecoveryState;
2832
import org.elasticsearch.threadpool.ThreadPool;
2933
import org.elasticsearch.xpack.ccr.CcrSettings;
3034
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -38,11 +42,15 @@
3842
import java.util.List;
3943
import java.util.Set;
4044
import java.util.concurrent.atomic.AtomicBoolean;
45+
import java.util.concurrent.atomic.AtomicReference;
4146
import java.util.function.BiConsumer;
4247
import java.util.function.Consumer;
4348
import java.util.function.LongConsumer;
4449

50+
import static java.util.Collections.emptyMap;
51+
import static java.util.Collections.emptySet;
4552
import static org.hamcrest.Matchers.equalTo;
53+
import static org.hamcrest.Matchers.is;
4654
import static org.hamcrest.Matchers.nullValue;
4755

4856
public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
@@ -129,6 +137,43 @@ public void testFailLeaderReplicaShard() throws Exception {
129137
}
130138
}
131139

140+
public void testChangeHistoryUUID() throws Exception {
141+
try (ReplicationGroup leaderGroup = createGroup(0);
142+
ReplicationGroup followerGroup = createFollowGroup(0)) {
143+
leaderGroup.startAll();
144+
int docCount = leaderGroup.appendDocs(randomInt(64));
145+
leaderGroup.assertAllEqual(docCount);
146+
followerGroup.startAll();
147+
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
148+
final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
149+
final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
150+
shardFollowTask.start(
151+
leaderSeqNoStats.getGlobalCheckpoint(),
152+
leaderSeqNoStats.getMaxSeqNo(),
153+
followerSeqNoStats.getGlobalCheckpoint(),
154+
followerSeqNoStats.getMaxSeqNo());
155+
leaderGroup.syncGlobalCheckpoint();
156+
leaderGroup.assertAllEqual(docCount);
157+
Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
158+
assertBusy(() -> {
159+
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
160+
followerGroup.assertAllEqual(indexedDocIds.size());
161+
});
162+
163+
String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
164+
leaderGroup.reinitPrimaryShard();
165+
leaderGroup.getPrimary().store().bootstrapNewHistory();
166+
recoverShardFromStore(leaderGroup.getPrimary());
167+
String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
168+
169+
assertBusy(() -> {
170+
assertThat(shardFollowTask.isStopped(), is(true));
171+
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
172+
"], actual [" + newHistoryUUID + "]"));
173+
});
174+
}
175+
}
176+
132177
@Override
133178
protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
134179
Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
@@ -167,14 +212,15 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup,
167212
between(1, 8),
168213
Long.MAX_VALUE,
169214
between(1, 4), 10240,
170-
TimeValue.timeValueMillis(10),
215+
TimeValue.timeValueMillis(100),
171216
TimeValue.timeValueMillis(10),
172217
leaderGroup.getPrimary().getHistoryUUID(),
173218
Collections.emptyMap()
174219
);
175220

176221
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
177222
AtomicBoolean stopped = new AtomicBoolean(false);
223+
AtomicReference<Exception> failureHolder = new AtomicReference<>();
178224
LongSet fetchOperations = new LongHashSet();
179225
return new ShardFollowNodeTask(
180226
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
@@ -252,9 +298,14 @@ public void markAsCompleted() {
252298

253299
@Override
254300
public void markAsFailed(Exception e) {
301+
failureHolder.set(e);
255302
stopped.set(true);
256303
}
257304

305+
@Override
306+
public Exception getFailure() {
307+
return failureHolder.get();
308+
}
258309
};
259310
}
260311

0 commit comments

Comments
 (0)