Skip to content

Commit 8819029

Browse files
authored
CCR: Fix incorrect read request completion condition (#32266)
Today we consider a read request is exhausted if from_seqno is equal to or greater than the max_required_seqno. However, if we stop when from_seqno equals to the max_required_seqno, we will miss an operation whose seqno is max_required_seqno because we have not seen that operation yet.
1 parent 8e66a93 commit 8819029

File tree

3 files changed

+56
-52
lines changed

3 files changed

+56
-52
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -188,29 +188,28 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res
188188

189189
synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
190190
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
191-
final long newMinRequiredSeqNo;
191+
final long newFromSeqNo;
192192
if (response.getOperations().length == 0) {
193-
newMinRequiredSeqNo = from;
193+
newFromSeqNo = from;
194194
} else {
195195
assert response.getOperations()[0].seqNo() == from :
196196
"first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0];
197197
buffer.addAll(Arrays.asList(response.getOperations()));
198198
final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo();
199199
assert maxSeqNo ==
200200
Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong();
201-
newMinRequiredSeqNo = maxSeqNo + 1;
201+
newFromSeqNo = maxSeqNo + 1;
202202
// update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
203203
lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo);
204204
assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno +
205205
"] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]";
206206
coordinateWrites();
207207
}
208-
209-
if (newMinRequiredSeqNo < maxRequiredSeqNo && isStopped() == false) {
210-
int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1;
208+
if (newFromSeqNo <= maxRequiredSeqNo && isStopped() == false) {
209+
int newSize = Math.toIntExact(maxRequiredSeqNo - newFromSeqNo + 1);
211210
LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...",
212-
params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo);
213-
sendShardChangesRequest(newMinRequiredSeqNo, newSize, maxRequiredSeqNo);
211+
params.getFollowShardId(), response.getOperations().length, newFromSeqNo, maxRequiredSeqNo);
212+
sendShardChangesRequest(newFromSeqNo, newSize, maxRequiredSeqNo);
214213
} else {
215214
// read is completed, decrement
216215
numConcurrentReads--;

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

+43-43
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void testCoordinateReads() {
6262
public void testWriteBuffer() {
6363
// Need to set concurrentWrites to 0, other the write buffer gets flushed immediately:
6464
ShardFollowNodeTask task = createShardFollowTask(64, 1, 0, 32, Long.MAX_VALUE);
65-
startTask(task, 64, -1);
65+
startTask(task, 63, -1);
6666

6767
task.coordinateReads();
6868
assertThat(shardChangesRequests.size(), equalTo(1));
@@ -234,11 +234,11 @@ public void testReceiveNonRetryableError() {
234234

235235
public void testHandleReadResponse() {
236236
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
237-
startTask(task, 64, -1);
237+
startTask(task, 63, -1);
238238

239239
task.coordinateReads();
240-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
241-
task.innerHandleReadResponse(0L, 64L, response);
240+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
241+
task.innerHandleReadResponse(0L, 63L, response);
242242

243243
assertThat(bulkShardOperationRequests.size(), equalTo(1));
244244
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
@@ -248,8 +248,8 @@ public void testHandleReadResponse() {
248248
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
249249
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
250250
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
251-
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
252-
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
251+
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
252+
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
253253
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
254254
}
255255

@@ -263,7 +263,7 @@ public void testReceiveLessThanRequested() {
263263
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
264264

265265
shardChangesRequests.clear();
266-
ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L);
266+
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
267267
task.innerHandleReadResponse(0L, 64L, response);
268268

269269
assertThat(shardChangesRequests.size(), equalTo(1));
@@ -288,7 +288,7 @@ public void testCancelAndReceiveLessThanRequested() {
288288

289289
shardChangesRequests.clear();
290290
task.markAsCompleted();
291-
ShardChangesAction.Response response = generateShardChangesResponse(0, 32, 0L, 31L);
291+
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
292292
task.innerHandleReadResponse(0L, 64L, response);
293293

294294
assertThat(shardChangesRequests.size(), equalTo(0));
@@ -310,8 +310,8 @@ public void testReceiveNothingExpectedSomething() {
310310
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
311311

312312
shardChangesRequests.clear();
313-
ShardChangesAction.Response response = generateShardChangesResponse(0, 0, 0L, 0L);
314-
task.innerHandleReadResponse(0L, 64L, response);
313+
task.innerHandleReadResponse(0L, 64L,
314+
new ShardChangesAction.Response(0, 0, new Translog.Operation[0]));
315315

316316
assertThat(shardChangesRequests.size(), equalTo(1));
317317
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
@@ -331,30 +331,30 @@ public void testDelayCoordinatesRead() {
331331
task.run();
332332
};
333333
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
334-
startTask(task, 64, -1);
334+
startTask(task, 63, -1);
335335

336336
task.coordinateReads();
337337
assertThat(shardChangesRequests.size(), equalTo(1));
338338
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
339339
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
340340

341341
shardChangesRequests.clear();
342-
ShardChangesAction.Response response = generateShardChangesResponse(0, 65, 0L, 64L);
342+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
343343
// Also invokes coordinateReads()
344-
task.innerHandleReadResponse(0L, 64L, response);
345-
response = generateShardChangesResponse(0, 0, 0L, 64L);
346-
task.innerHandleReadResponse(65L, 64L, response);
344+
task.innerHandleReadResponse(0L, 63L, response);
345+
task.innerHandleReadResponse(64L, 63L,
346+
new ShardChangesAction.Response(0, 63L, new Translog.Operation[0]));
347347
assertThat(counter[0], equalTo(1));
348348
}
349349

350350
public void testMappingUpdate() {
351351
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
352-
startTask(task, 64, -1);
352+
startTask(task, 63, -1);
353353

354354
imdVersions.add(1L);
355355
task.coordinateReads();
356-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
357-
task.handleReadResponse(0L, 64L, response);
356+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
357+
task.handleReadResponse(0L, 63L, response);
358358

359359
assertThat(bulkShardOperationRequests.size(), equalTo(1));
360360
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
@@ -363,23 +363,23 @@ public void testMappingUpdate() {
363363
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
364364
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
365365
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
366-
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
367-
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
366+
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
367+
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
368368
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
369369
}
370370

371371
public void testMappingUpdateRetryableError() {
372372
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
373-
startTask(task, 64, -1);
373+
startTask(task, 63, -1);
374374

375375
int max = randomIntBetween(1, 10);
376376
for (int i = 0; i < max; i++) {
377377
mappingUpdateFailures.add(new ConnectException());
378378
}
379379
imdVersions.add(1L);
380380
task.coordinateReads();
381-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
382-
task.handleReadResponse(0L, 64L, response);
381+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
382+
task.handleReadResponse(0L, 63L, response);
383383

384384
assertThat(mappingUpdateFailures.size(), equalTo(0));
385385
assertThat(bulkShardOperationRequests.size(), equalTo(1));
@@ -388,8 +388,8 @@ public void testMappingUpdateRetryableError() {
388388
assertThat(status.getIndexMetadataVersion(), equalTo(1L));
389389
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
390390
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
391-
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
392-
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
391+
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
392+
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
393393

394394
}
395395

@@ -439,25 +439,25 @@ public void testMappingUpdateNonRetryableError() {
439439

440440
public void testCoordinateWrites() {
441441
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
442-
startTask(task, 64, -1);
442+
startTask(task, 63, -1);
443443

444444
task.coordinateReads();
445445
assertThat(shardChangesRequests.size(), equalTo(1));
446446
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
447447
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
448448

449-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
449+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
450450
// Also invokes coordinatesWrites()
451-
task.innerHandleReadResponse(0L, 64L, response);
451+
task.innerHandleReadResponse(0L, 63L, response);
452452

453453
assertThat(bulkShardOperationRequests.size(), equalTo(1));
454454
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
455455

456456
ShardFollowNodeTask.Status status = task.getStatus();
457457
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
458458
assertThat(status.getNumberOfConcurrentWrites(), equalTo(1));
459-
assertThat(status.getLastRequestedSeqno(), equalTo(64L));
460-
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(64L));
459+
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
460+
assertThat(status.getLeaderGlobalCheckpoint(), equalTo(63L));
461461
assertThat(status.getFollowerGlobalCheckpoint(), equalTo(-1L));
462462
}
463463

@@ -507,7 +507,7 @@ public void testMaxBatchOperationCount() {
507507

508508
public void testRetryableError() {
509509
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
510-
startTask(task, 64, -1);
510+
startTask(task, 63, -1);
511511

512512
task.coordinateReads();
513513
assertThat(shardChangesRequests.size(), equalTo(1));
@@ -518,9 +518,9 @@ public void testRetryableError() {
518518
for (int i = 0; i < max; i++) {
519519
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
520520
}
521-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
521+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
522522
// Also invokes coordinatesWrites()
523-
task.innerHandleReadResponse(0L, 64L, response);
523+
task.innerHandleReadResponse(0L, 63L, response);
524524

525525
// Number of requests is equal to initial request + retried attempts:
526526
assertThat(bulkShardOperationRequests.size(), equalTo(max + 1));
@@ -535,7 +535,7 @@ public void testRetryableError() {
535535

536536
public void testRetryableErrorRetriedTooManyTimes() {
537537
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
538-
startTask(task, 64, -1);
538+
startTask(task, 63, -1);
539539

540540
task.coordinateReads();
541541
assertThat(shardChangesRequests.size(), equalTo(1));
@@ -546,9 +546,9 @@ public void testRetryableErrorRetriedTooManyTimes() {
546546
for (int i = 0; i < max; i++) {
547547
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
548548
}
549-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
549+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 643);
550550
// Also invokes coordinatesWrites()
551-
task.innerHandleReadResponse(0L, 64L, response);
551+
task.innerHandleReadResponse(0L, 63L, response);
552552

553553
// Number of requests is equal to initial request + retried attempts:
554554
assertThat(bulkShardOperationRequests.size(), equalTo(11));
@@ -563,17 +563,17 @@ public void testRetryableErrorRetriedTooManyTimes() {
563563

564564
public void testNonRetryableError() {
565565
ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
566-
startTask(task, 64, -1);
566+
startTask(task, 63, -1);
567567

568568
task.coordinateReads();
569569
assertThat(shardChangesRequests.size(), equalTo(1));
570570
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
571571
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
572572

573573
writeFailures.add(new RuntimeException());
574-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
574+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
575575
// Also invokes coordinatesWrites()
576-
task.innerHandleReadResponse(0L, 64L, response);
576+
task.innerHandleReadResponse(0L, 63L, response);
577577

578578
assertThat(bulkShardOperationRequests.size(), equalTo(1));
579579
assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
@@ -592,7 +592,7 @@ public void testMaxBatchBytesLimit() {
592592
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
593593
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
594594

595-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 64L);
595+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 64L);
596596
// Also invokes coordinatesWrites()
597597
task.innerHandleReadResponse(0L, 64L, response);
598598

@@ -610,7 +610,7 @@ public void testHandleWriteResponse() {
610610

611611
shardChangesRequests.clear();
612612
followerGlobalCheckpoints.add(63L);
613-
ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 63L);
613+
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
614614
// Also invokes coordinatesWrites()
615615
task.innerHandleReadResponse(0L, 63L, response);
616616

@@ -702,10 +702,10 @@ public void markAsFailed(Exception e) {
702702
};
703703
}
704704

705-
private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, int size, long imdVersion,
705+
private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long imdVersion,
706706
long leaderGlobalCheckPoint) {
707707
List<Translog.Operation> ops = new ArrayList<>();
708-
for (long seqNo = fromSeqNo; seqNo < size; seqNo++) {
708+
for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
709709
String id = UUIDs.randomBase64UUID();
710710
byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
711711
ops.add(new Translog.Index("doc", id, seqNo, 0, source));

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import java.util.function.Consumer;
3636
import java.util.function.LongConsumer;
3737

38+
import static org.hamcrest.Matchers.equalTo;
39+
3840
public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
3941

4042
public void testSimpleCcrReplication() throws Exception {
@@ -51,7 +53,10 @@ public void testSimpleCcrReplication() throws Exception {
5153

5254
leaderGroup.assertAllEqual(docCount);
5355
int expectedCount = docCount;
54-
assertBusy(() -> followerGroup.assertAllEqual(expectedCount));
56+
assertBusy(() -> {
57+
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
58+
followerGroup.assertAllEqual(expectedCount);
59+
});
5560
shardFollowTask.markAsCompleted();
5661
}
5762
}

0 commit comments

Comments
 (0)