Skip to content

Commit b6b596e

Browse files
authored
[CCR] Add random shard follow task test (#32188)
Added shard follow task unit tests that tests whether the shard follow task is able to process randomly generated shard changes api responses.
1 parent 8ed3624 commit b6b596e

File tree

1 file changed

+281
-0
lines changed

1 file changed

+281
-0
lines changed
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr.action;
7+
8+
import org.elasticsearch.action.UnavailableShardsException;
9+
import org.elasticsearch.common.UUIDs;
10+
import org.elasticsearch.common.unit.TimeValue;
11+
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
12+
import org.elasticsearch.index.shard.ShardId;
13+
import org.elasticsearch.index.translog.Translog;
14+
import org.elasticsearch.test.ESTestCase;
15+
import org.elasticsearch.threadpool.TestThreadPool;
16+
import org.elasticsearch.threadpool.ThreadPool;
17+
18+
import java.nio.charset.StandardCharsets;
19+
import java.util.ArrayList;
20+
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.Comparator;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Objects;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.function.BiConsumer;
29+
import java.util.function.Consumer;
30+
import java.util.function.LongConsumer;
31+
import java.util.stream.Collectors;
32+
33+
import static org.hamcrest.Matchers.equalTo;
34+
35+
public class ShardFollowNodeTaskRandomTests extends ESTestCase {
36+
37+
public void testSingleReaderWriter() throws Exception {
38+
TestRun testRun = createTestRun(randomNonNegativeLong(), randomNonNegativeLong(), randomIntBetween(1, 2048));
39+
ShardFollowNodeTask task = createShardFollowTask(1, testRun);
40+
startAndAssertAndStopTask(task, testRun);
41+
}
42+
43+
public void testMultipleReaderWriter() throws Exception {
44+
int concurrency = randomIntBetween(2, 8);
45+
TestRun testRun = createTestRun(0, 0, 1024);
46+
ShardFollowNodeTask task = createShardFollowTask(concurrency, testRun);
47+
startAndAssertAndStopTask(task, testRun);
48+
}
49+
50+
private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun) throws Exception {
51+
task.start(testRun.startSeqNo - 1, testRun.startSeqNo - 1);
52+
assertBusy(() -> {
53+
ShardFollowNodeTask.Status status = task.getStatus();
54+
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
55+
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
56+
assertThat(status.getIndexMetadataVersion(), equalTo(testRun.finalIndexMetaDataVerion));
57+
});
58+
59+
task.markAsCompleted();
60+
assertBusy(() -> {
61+
ShardFollowNodeTask.Status status = task.getStatus();
62+
assertThat(status.getNumberOfConcurrentReads(), equalTo(0));
63+
assertThat(status.getNumberOfConcurrentWrites(), equalTo(0));
64+
});
65+
}
66+
67+
private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testRun) {
68+
AtomicBoolean stopped = new AtomicBoolean(false);
69+
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
70+
new ShardId("leader_index", "", 0), testRun.maxOperationCount, concurrency,
71+
ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, concurrency, 10240,
72+
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());
73+
74+
ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName());
75+
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> {
76+
assert delay.millis() < 100 : "The delay should be kept to a minimum, so that this test does not take to long to run";
77+
threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
78+
};
79+
List<Translog.Operation> receivedOperations = Collections.synchronizedList(new ArrayList<>());
80+
LocalCheckpointTracker tracker = new LocalCheckpointTracker(testRun.startSeqNo - 1, testRun.startSeqNo - 1);
81+
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
82+
83+
private volatile long indexMetadataVersion = 0L;
84+
private final Map<Long, Integer> fromToSlot = new HashMap<>();
85+
86+
@Override
87+
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
88+
handler.accept(indexMetadataVersion);
89+
}
90+
91+
@Override
92+
protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
93+
Consumer<Exception> errorHandler) {
94+
for(Translog.Operation op : operations) {
95+
tracker.markSeqNoAsCompleted(op.seqNo());
96+
}
97+
receivedOperations.addAll(operations);
98+
99+
// Emulate network thread and avoid SO:
100+
threadPool.generic().execute(() -> handler.accept(tracker.getCheckpoint()));
101+
}
102+
103+
@Override
104+
protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
105+
Consumer<Exception> errorHandler) {
106+
107+
// Emulate network thread and avoid SO:
108+
Runnable task = () -> {
109+
List<TestResponse> items = testRun.responses.get(from);
110+
if (items != null) {
111+
final TestResponse testResponse;
112+
synchronized (fromToSlot) {
113+
int slot;
114+
if (fromToSlot.get(from) == null) {
115+
slot = fromToSlot.getOrDefault(from, 0);
116+
fromToSlot.put(from, slot);
117+
} else {
118+
slot = fromToSlot.get(from);
119+
}
120+
testResponse = items.get(slot);
121+
fromToSlot.put(from, ++slot);
122+
// if too many invocations occur with the same from then AOBE occurs, this ok and then something is wrong.
123+
}
124+
indexMetadataVersion = testResponse.indexMetadataVersion;
125+
if (testResponse.exception != null) {
126+
errorHandler.accept(testResponse.exception);
127+
} else {
128+
handler.accept(testResponse.response);
129+
}
130+
} else {
131+
assert from >= testRun.finalExpectedGlobalCheckpoint;
132+
handler.accept(new ShardChangesAction.Response(0L, tracker.getCheckpoint(), new Translog.Operation[0]));
133+
}
134+
};
135+
threadPool.generic().execute(task);
136+
}
137+
138+
@Override
139+
protected boolean isStopped() {
140+
return stopped.get();
141+
}
142+
143+
@Override
144+
public void markAsCompleted() {
145+
stopped.set(true);
146+
tearDown();
147+
}
148+
149+
@Override
150+
public void markAsFailed(Exception e) {
151+
stopped.set(true);
152+
tearDown();
153+
}
154+
155+
private void tearDown() {
156+
threadPool.shutdown();
157+
List<Translog.Operation> expectedOperations = testRun.responses.values().stream()
158+
.flatMap(List::stream)
159+
.map(testResponse -> testResponse.response)
160+
.filter(Objects::nonNull)
161+
.flatMap(response -> Arrays.stream(response.getOperations()))
162+
.sorted(Comparator.comparingLong(Translog.Operation::seqNo))
163+
.collect(Collectors.toList());
164+
assertThat(receivedOperations.size(), equalTo(expectedOperations.size()));
165+
receivedOperations.sort(Comparator.comparingLong(Translog.Operation::seqNo));
166+
for (int i = 0; i < receivedOperations.size(); i++) {
167+
Translog.Operation actual = receivedOperations.get(i);
168+
Translog.Operation expected = expectedOperations.get(i);
169+
assertThat(actual, equalTo(expected));
170+
}
171+
}
172+
};
173+
}
174+
175+
private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVersion, int maxOperationCount) {
176+
long prevGlobalCheckpoint = startSeqNo;
177+
long indexMetaDataVersion = startIndexMetadataVersion;
178+
int numResponses = randomIntBetween(16, 256);
179+
Map<Long, List<TestResponse>> responses = new HashMap<>(numResponses);
180+
for (int i = 0; i < numResponses; i++) {
181+
long nextGlobalCheckPoint = prevGlobalCheckpoint + maxOperationCount;
182+
if (sometimes()) {
183+
indexMetaDataVersion++;
184+
}
185+
186+
if (sometimes()) {
187+
List<TestResponse> item = new ArrayList<>();
188+
// Sometimes add a random retryable error
189+
if (sometimes()) {
190+
Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), "");
191+
item.add(new TestResponse(error, indexMetaDataVersion, null));
192+
}
193+
List<Translog.Operation> ops = new ArrayList<>();
194+
for (long seqNo = prevGlobalCheckpoint; seqNo <= nextGlobalCheckPoint; seqNo++) {
195+
String id = UUIDs.randomBase64UUID();
196+
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
197+
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
198+
}
199+
item.add(new TestResponse(null, indexMetaDataVersion,
200+
new ShardChangesAction.Response(indexMetaDataVersion, nextGlobalCheckPoint, ops.toArray(EMPTY))));
201+
responses.put(prevGlobalCheckpoint, item);
202+
} else {
203+
// Simulates a leader shard copy not having all the operations the shard follow task thinks it has by
204+
// splitting up a response into multiple responses AND simulates maxBatchSizeInBytes limit being reached:
205+
long toSeqNo;
206+
for (long fromSeqNo = prevGlobalCheckpoint; fromSeqNo <= nextGlobalCheckPoint; fromSeqNo = toSeqNo + 1) {
207+
toSeqNo = randomLongBetween(fromSeqNo, nextGlobalCheckPoint);
208+
List<TestResponse> item = new ArrayList<>();
209+
// Sometimes add a random retryable error
210+
if (sometimes()) {
211+
Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), "");
212+
item.add(new TestResponse(error, indexMetaDataVersion, null));
213+
}
214+
// Sometimes add an empty shard changes response to also simulate a leader shard lagging behind
215+
if (sometimes()) {
216+
ShardChangesAction.Response response =
217+
new ShardChangesAction.Response(indexMetaDataVersion, prevGlobalCheckpoint, EMPTY);
218+
item.add(new TestResponse(null, indexMetaDataVersion, response));
219+
}
220+
List<Translog.Operation> ops = new ArrayList<>();
221+
for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
222+
String id = UUIDs.randomBase64UUID();
223+
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
224+
ops.add(new Translog.Index("doc", id, seqNo, 0, source));
225+
}
226+
// Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind:
227+
long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo;
228+
ShardChangesAction.Response response = new ShardChangesAction.Response(indexMetaDataVersion,
229+
localLeaderGCP, ops.toArray(EMPTY));
230+
item.add(new TestResponse(null, indexMetaDataVersion, response));
231+
responses.put(fromSeqNo, Collections.unmodifiableList(item));
232+
}
233+
}
234+
prevGlobalCheckpoint = nextGlobalCheckPoint + 1;
235+
}
236+
return new TestRun(maxOperationCount, startSeqNo, startIndexMetadataVersion, indexMetaDataVersion,
237+
prevGlobalCheckpoint - 1, responses);
238+
}
239+
240+
// Instead of rarely(), which returns true very rarely especially not running in nightly mode or a multiplier have not been set
241+
private static boolean sometimes() {
242+
return randomIntBetween(0, 10) == 5;
243+
}
244+
245+
private static class TestRun {
246+
247+
final int maxOperationCount;
248+
final long startSeqNo;
249+
final long startIndexMetadataVersion;
250+
251+
final long finalIndexMetaDataVerion;
252+
final long finalExpectedGlobalCheckpoint;
253+
final Map<Long, List<TestResponse>> responses;
254+
255+
private TestRun(int maxOperationCount, long startSeqNo, long startIndexMetadataVersion, long finalIndexMetaDataVerion,
256+
long finalExpectedGlobalCheckpoint, Map<Long, List<TestResponse>> responses) {
257+
this.maxOperationCount = maxOperationCount;
258+
this.startSeqNo = startSeqNo;
259+
this.startIndexMetadataVersion = startIndexMetadataVersion;
260+
this.finalIndexMetaDataVerion = finalIndexMetaDataVerion;
261+
this.finalExpectedGlobalCheckpoint = finalExpectedGlobalCheckpoint;
262+
this.responses = Collections.unmodifiableMap(responses);
263+
}
264+
}
265+
266+
private static class TestResponse {
267+
268+
final Exception exception;
269+
final long indexMetadataVersion;
270+
final ShardChangesAction.Response response;
271+
272+
private TestResponse(Exception exception, long indexMetadataVersion, ShardChangesAction.Response response) {
273+
this.exception = exception;
274+
this.indexMetadataVersion = indexMetadataVersion;
275+
this.response = response;
276+
}
277+
}
278+
279+
private static final Translog.Operation[] EMPTY = new Translog.Operation[0];
280+
281+
}

0 commit comments

Comments
 (0)