Skip to content

Commit 8cfbb64

Browse files
authored
ShardFollowNodeTask should fetch each operation only once (#32455)
Today ShardFollowNodeTask might fetch some operations more than once. This happens because, for the left-over request, we ask the leader for up to max_batch_count operations instead of only the left-over size. The leader is then free to respond with up to max_batch_count operations; if one of the previous requests completes in the meantime, we might issue another read request whose range overlaps with the response to the left-over request. Closes #32453
1 parent 1fdc3f0 commit 8cfbb64

File tree

5 files changed

+77
-36
lines changed

5 files changed

+77
-36
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,21 @@ synchronized void coordinateReads() {
103103
params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint);
104104
final int maxBatchOperationCount = params.getMaxBatchOperationCount();
105105
while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) {
106+
final long from = lastRequestedSeqno + 1;
107+
final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxBatchOperationCount - 1);
108+
final int requestBatchCount;
109+
if (numConcurrentReads == 0) {
110+
// This is the only request, we can optimistically fetch more documents if possible but not enforce max_required_seqno.
111+
requestBatchCount = maxBatchOperationCount;
112+
} else {
113+
requestBatchCount = Math.toIntExact(maxRequiredSeqNo - from + 1);
114+
}
115+
assert 0 < requestBatchCount && requestBatchCount <= maxBatchOperationCount : "request_batch_count=" + requestBatchCount;
116+
LOGGER.trace("{}[{} ongoing reads] read from_seqno={} max_required_seqno={} batch_count={}",
117+
params.getFollowShardId(), numConcurrentReads, from, maxRequiredSeqNo, requestBatchCount);
106118
numConcurrentReads++;
107-
long from = lastRequestedSeqno + 1;
108-
// -1 is needed, because maxRequiredSeqno is inclusive
109-
long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, (from + maxBatchOperationCount) - 1);
110-
LOGGER.trace("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, maxRequiredSeqno, maxBatchOperationCount);
111-
sendShardChangesRequest(from, maxBatchOperationCount, maxRequiredSeqno);
112-
lastRequestedSeqno = maxRequiredSeqno;
119+
sendShardChangesRequest(from, requestBatchCount, maxRequiredSeqNo);
120+
lastRequestedSeqno = maxRequiredSeqNo;
113121
}
114122

115123
if (numConcurrentReads == 0 && hasReadBudget()) {
@@ -186,7 +194,13 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res
186194
maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
187195
}
188196

197+
/** Called when some operations are fetched from the leading */
198+
protected void onOperationsFetched(Translog.Operation[] operations) {
199+
200+
}
201+
189202
synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
203+
onOperationsFetched(response.getOperations());
190204
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
191205
final long newFromSeqNo;
192206
if (response.getOperations().length == 0) {

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
160160

161161
public void testFollowIndex() throws Exception {
162162
final int numberOfPrimaryShards = randomIntBetween(1, 3);
163-
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
163+
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
164164
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
165165
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
166166
ensureYellow("index1");
@@ -218,7 +218,7 @@ public void testFollowIndex() throws Exception {
218218
}
219219

220220
public void testSyncMappings() throws Exception {
221-
final String leaderIndexSettings = getIndexSettings(2,
221+
final String leaderIndexSettings = getIndexSettings(2, between(0, 1),
222222
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
223223
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
224224
ensureYellow("index1");
@@ -255,7 +255,8 @@ public void testSyncMappings() throws Exception {
255255
}
256256

257257
public void testFollowIndex_backlog() throws Exception {
258-
String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
258+
String leaderIndexSettings = getIndexSettings(between(1, 5), between(0, 1),
259+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
259260
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
260261
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
261262
@Override
@@ -306,10 +307,10 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
306307

307308
public void testFollowIndexAndCloseNode() throws Exception {
308309
internalCluster().ensureAtLeastNumDataNodes(3);
309-
String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
310+
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
310311
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
311312

312-
String followerIndexSettings = getIndexSettings(3, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
313+
String followerIndexSettings = getIndexSettings(3, 1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
313314
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
314315
ensureGreen("index1", "index2");
315316

@@ -366,13 +367,14 @@ public void testFollowIndexAndCloseNode() throws Exception {
366367

367368
public void testFollowIndexWithNestedField() throws Exception {
368369
final String leaderIndexSettings =
369-
getIndexSettingsWithNestedMapping(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
370+
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
370371
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
371372

372373
final String followerIndexSettings =
373-
getIndexSettingsWithNestedMapping(1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
374+
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
374375
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
375376

377+
internalCluster().ensureAtLeastNumDataNodes(2);
376378
ensureGreen("index1", "index2");
377379

378380
final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
@@ -455,7 +457,8 @@ public void testValidateFollowingIndexSettings() throws Exception {
455457
}
456458

457459
public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
458-
final String leaderIndexSettings = getIndexSettings(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
460+
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
461+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
459462
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
460463
ensureYellow("index1");
461464

@@ -554,15 +557,16 @@ private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int valu
554557
};
555558
}
556559

557-
private String getIndexSettings(final int numberOfPrimaryShards, final Map<String, String> additionalIndexSettings) throws IOException {
560+
private String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
561+
final Map<String, String> additionalIndexSettings) throws IOException {
558562
final String settings;
559563
try (XContentBuilder builder = jsonBuilder()) {
560564
builder.startObject();
561565
{
562566
builder.startObject("settings");
563567
{
564-
builder.field("index.number_of_shards", numberOfPrimaryShards);
565-
builder.field("index.number_of_replicas", 1);
568+
builder.field("index.number_of_shards", numberOfShards);
569+
builder.field("index.number_of_replicas", numberOfReplicas);
566570
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
567571
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
568572
}
@@ -592,15 +596,16 @@ private String getIndexSettings(final int numberOfPrimaryShards, final Map<Strin
592596
return settings;
593597
}
594598

595-
private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards,
599+
private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas,
596600
final Map<String, String> additionalIndexSettings) throws IOException {
597601
final String settings;
598602
try (XContentBuilder builder = jsonBuilder()) {
599603
builder.startObject();
600604
{
601605
builder.startObject("settings");
602606
{
603-
builder.field("index.number_of_shards", numberOfPrimaryShards);
607+
builder.field("index.number_of_shards", numberOfShards);
608+
builder.field("index.number_of_replicas", numberOfReplicas);
604609
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
605610
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
606611
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void testSingleReaderWriter() throws Exception {
4242

4343
public void testMultipleReaderWriter() throws Exception {
4444
int concurrency = randomIntBetween(2, 8);
45-
TestRun testRun = createTestRun(0, 0, 1024);
45+
TestRun testRun = createTestRun(0, 0, between(1, 1024));
4646
ShardFollowNodeTask task = createShardFollowTask(concurrency, testRun);
4747
startAndAssertAndStopTask(task, testRun);
4848
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,18 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
4545
private Queue<Long> followerGlobalCheckpoints;
4646

4747
public void testCoordinateReads() {
48-
ShardFollowNodeTask task = createShardFollowTask(8, 8, 8, Integer.MAX_VALUE, Long.MAX_VALUE);
49-
startTask(task, 64, -1);
50-
48+
ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE);
49+
startTask(task, 3, -1);
5150
task.coordinateReads();
52-
assertThat(shardChangesRequests.size(), equalTo(8));
51+
assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request
52+
shardChangesRequests.clear();
53+
task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 60L));
5354
assertThat(shardChangesRequests, contains(new long[][]{
54-
{0L, 8L}, {8L, 8L}, {16L, 8L}, {24L, 8L}, {32L, 8L}, {40L, 8L}, {48L, 8L}, {56L, 8L}}
55+
{6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}}
5556
));
56-
5757
ShardFollowNodeTask.Status status = task.getStatus();
58-
assertThat(status.getNumberOfConcurrentReads(), equalTo(8));
59-
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
58+
assertThat(status.getNumberOfConcurrentReads(), equalTo(7));
59+
assertThat(status.getLastRequestedSeqno(), equalTo(60L));
6060
}
6161

6262
public void testWriteBuffer() {
@@ -263,12 +263,12 @@ public void testReceiveLessThanRequested() {
263263
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
264264

265265
shardChangesRequests.clear();
266-
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
267-
task.innerHandleReadResponse(0L, 64L, response);
266+
ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 31L);
267+
task.innerHandleReadResponse(0L, 63L, response);
268268

269269
assertThat(shardChangesRequests.size(), equalTo(1));
270-
assertThat(shardChangesRequests.get(0)[0], equalTo(32L));
271-
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
270+
assertThat(shardChangesRequests.get(0)[0], equalTo(21L));
271+
assertThat(shardChangesRequests.get(0)[1], equalTo(43L));
272272

273273
ShardFollowNodeTask.Status status = task.getStatus();
274274
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
@@ -310,7 +310,7 @@ public void testReceiveNothingExpectedSomething() {
310310
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
311311

312312
shardChangesRequests.clear();
313-
task.innerHandleReadResponse(0L, 64L,
313+
task.innerHandleReadResponse(0L, 63L,
314314
new ShardChangesAction.Response(0, 0, new Translog.Operation[0]));
315315

316316
assertThat(shardChangesRequests.size(), equalTo(1));
@@ -675,9 +675,9 @@ protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> oper
675675
}
676676

677677
@Override
678-
protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
678+
protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer<ShardChangesAction.Response> handler,
679679
Consumer<Exception> errorHandler) {
680-
shardChangesRequests.add(new long[]{from, maxBatchOperationCount});
680+
shardChangesRequests.add(new long[]{from, requestBatchSize});
681681
Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll();
682682
if (readFailure != null) {
683683
errorHandler.accept(readFailure);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package org.elasticsearch.xpack.ccr.action;
77

8+
import com.carrotsearch.hppc.LongHashSet;
9+
import com.carrotsearch.hppc.LongSet;
810
import org.elasticsearch.Version;
911
import org.elasticsearch.action.ActionListener;
1012
import org.elasticsearch.action.DocWriteResponse;
@@ -72,6 +74,7 @@ public void testSimpleCcrReplication() throws Exception {
7274
followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
7375
});
7476
shardFollowTask.markAsCompleted();
77+
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
7578
}
7679
}
7780

@@ -107,6 +110,7 @@ public void testFailLeaderReplicaShard() throws Exception {
107110
leaderGroup.assertAllEqual(docCount);
108111
assertBusy(() -> followerGroup.assertAllEqual(docCount));
109112
shardFollowTask.markAsCompleted();
113+
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
110114
}
111115
}
112116

@@ -141,12 +145,23 @@ private ReplicationGroup createFollowGroup(int replicas) throws IOException {
141145

142146
private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
143147
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
144-
new ShardId("leader_index", "", 0), 1024, 1, Long.MAX_VALUE, 1, 10240,
148+
new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240,
145149
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());
146150

147151
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
148152
AtomicBoolean stopped = new AtomicBoolean(false);
153+
LongSet fetchOperations = new LongHashSet();
149154
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
155+
@Override
156+
protected synchronized void onOperationsFetched(Translog.Operation[] operations) {
157+
super.onOperationsFetched(operations);
158+
for (Translog.Operation operation : operations) {
159+
if (fetchOperations.add(operation.seqNo()) == false) {
160+
throw new AssertionError("Operation [" + operation + " ] was fetched already");
161+
}
162+
}
163+
}
164+
150165
@Override
151166
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
152167
// noop, as mapping updates are not tested
@@ -210,6 +225,13 @@ public void markAsFailed(Exception e) {
210225
};
211226
}
212227

228+
private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup leader, ReplicationGroup follower) throws IOException {
229+
int totalOps = leader.getPrimary().estimateNumberOfHistoryOperations("test", 0);
230+
for (IndexShard followingShard : follower) {
231+
assertThat(followingShard.estimateNumberOfHistoryOperations("test", 0), equalTo(totalOps));
232+
}
233+
}
234+
213235
class CCRAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
214236

215237
CCRAction(BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener, ReplicationGroup group) {

0 commit comments

Comments
 (0)