Skip to content

Commit aaef32b

Browse files
committed
[CCR] Only normalize -1 seqno in shard changes request. (#30238)
Prior to this change a -1 seqno would be normalized earlier, which caused a leader shard containing a single operation to be ignored. Closes #30227
1 parent b7727c6 commit aaef32b

File tree

2 files changed

+13
-6
lines changed

2 files changed

+13
-6
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public static class Status implements Task.Status {
5858
}
5959

6060
public Status(StreamInput in) throws IOException {
61-
this.processedGlobalCheckpoint = in.readVLong();
61+
this.processedGlobalCheckpoint = in.readZLong();
6262
}
6363

6464
public long getProcessedGlobalCheckpoint() {
@@ -72,7 +72,7 @@ public String getWriteableName() {
7272

7373
@Override
7474
public void writeTo(StreamOutput out) throws IOException {
75-
out.writeVLong(processedGlobalCheckpoint);
75+
out.writeZLong(processedGlobalCheckpoint);
7676
}
7777

7878
@Override

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask param
9393
this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client;
9494
logger.info("Starting shard following [{}]", params);
9595
fetchGlobalCheckpoint(client, params.getFollowShardId(),
96-
followGlobalCheckPoint -> prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed);
96+
followGlobalCheckPoint -> {
97+
shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint);
98+
prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint);
99+
}, task::markAsFailed);
97100
}
98101

99102
void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
@@ -107,10 +110,13 @@ void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask para
107110
fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
108111
// TODO: check if both indices have the same history uuid
109112
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
113+
logger.debug("{} no write operations to fetch", followerShard);
110114
retry(leaderClient, task, params, followGlobalCheckPoint);
111115
} else {
112116
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
113117
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
118+
logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
119+
leaderGlobalCheckPoint, followGlobalCheckPoint);
114120
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
115121
Consumer<Exception> handler = e -> {
116122
if (e == null) {
@@ -151,8 +157,7 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
151157
.findAny();
152158

153159
if (filteredShardStats.isPresent()) {
154-
// Treat -1 as 0. If no indexing has happened in leader shard then global checkpoint is -1.
155-
final long globalCheckPoint = Math.max(0, filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint());
160+
final long globalCheckPoint = filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint();
156161
handler.accept(globalCheckPoint);
157162
} else {
158163
errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
@@ -284,7 +289,9 @@ static class ChunkProcessor {
284289

285290
void start(final long from, final long to, final long maxTranslogsBytes) {
286291
ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard);
287-
request.setMinSeqNo(from);
292+
// Treat -1 as 0, because shard changes api min_seq_no is inclusive and therefore it doesn't allow a negative min_seq_no
293+
// (If no indexing has happened in leader shard then global checkpoint is -1.)
294+
request.setMinSeqNo(Math.max(0, from));
288295
request.setMaxSeqNo(to);
289296
request.setMaxTranslogsBytes(maxTranslogsBytes);
290297
leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener<ShardChangesAction.Response>() {

0 commit comments

Comments
 (0)