Commit fa54be2

CCR: Do not minimize the requested range on the leader (#30980)
Today, before reading operations on the leading shard, we minimize the requested range against the global checkpoint, that is, we cap its upper bound at the global checkpoint. However, this can make the request invalid if the following shard generates the range from the global checkpoint of a primary shard and sends that request to a replica whose global checkpoint lags behind. Another issue is that the minimization mutates the request: if the request becomes invalid on a replica, we reroute the mutated request to the primary instead of the original one. This commit removes the minimization and replaces it with a range check against the local checkpoint.
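The failure mode described above can be illustrated with a small standalone sketch. It is not the Elasticsearch code: `ClampingSketch`, `RangeRequest`, `isValid`, and `clampToGlobalCheckpoint` are hypothetical names, the checkpoint values are made up, and the validity rule (`minSeqNo <= maxSeqNo`) is an assumption used only for illustration.

```java
public class ClampingSketch {

    /** Hypothetical stand-in for the CCR request; only the two sequence-number bounds are modeled. */
    static final class RangeRequest {
        final long minSeqNo;
        long maxSeqNo; // mutable, to mirror the in-place minimization that this commit removes

        RangeRequest(long minSeqNo, long maxSeqNo) {
            this.minSeqNo = minSeqNo;
            this.maxSeqNo = maxSeqNo;
        }

        /** Assumed validity rule for this sketch: the range must not be inverted. */
        boolean isValid() {
            return minSeqNo <= maxSeqNo;
        }
    }

    /** Old behavior: cap the upper bound at the shard copy's global checkpoint, mutating the request. */
    static void clampToGlobalCheckpoint(RangeRequest request, long globalCheckpoint) {
        request.maxSeqNo = Math.min(request.maxSeqNo, globalCheckpoint);
    }

    public static void main(String[] args) {
        // The follower builds the range from the primary's global checkpoint (assumed to be 20 here).
        RangeRequest request = new RangeRequest(15, 20);

        // The request lands on a replica whose global checkpoint lags behind (assumed to be 10 here).
        clampToGlobalCheckpoint(request, 10);

        // The range is now inverted, and this mutated request is what would be rerouted to the primary.
        System.out.println("valid=" + request.isValid()
            + " range=[" + request.minSeqNo + "," + request.maxSeqNo + "]");
        // prints: valid=false range=[15,10]
    }
}
```

Because the request object itself is modified, the original upper bound of 20 is lost by the time the request is retried elsewhere, which is the second problem the commit message names.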
1 parent 7e8cf76 commit fa54be2

1 file changed (+7, -2 lines)

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 7 additions & 2 deletions
@@ -235,9 +235,14 @@ public TransportAction(Settings settings,
         protected Response shardOperation(Request request, ShardId shardId) throws IOException {
             IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
             IndexShard indexShard = indexService.getShard(request.getShard().id());
-
             final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
-            request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint());
+            // The following shard generates the request based on the global checkpoint which may not be synced to all leading copies.
+            // However, this guarantees that the requesting range always be below the local-checkpoint of any leading copies.
+            final long localCheckpoint = indexShard.getLocalCheckpoint();
+            if (localCheckpoint < request.minSeqNo || localCheckpoint < request.maxSeqNo) {
+                throw new IllegalStateException("invalid request from_seqno=[" + request.minSeqNo + "], " +
+                    "to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + localCheckpoint + "], shardId=[" + shardId + "]");
+            }
             final Translog.Operation[] operations =
                 getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
             return new Response(indexMetaDataVersion, operations);
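To make the new guard easier to reason about in isolation, here is a minimal sketch that lifts the same condition into a pure function. `RangeCheckSketch` and `ensureWithinLocalCheckpoint` are hypothetical names that do not appear in the commit, the `shardId` part of the message is omitted, and the sample values are made up.

```java
public class RangeCheckSketch {

    /**
     * Same condition as the added check in the diff: reject the request when either bound
     * lies above the local checkpoint. The request itself is left untouched.
     */
    static void ensureWithinLocalCheckpoint(long minSeqNo, long maxSeqNo, long localCheckpoint) {
        if (localCheckpoint < minSeqNo || localCheckpoint < maxSeqNo) {
            throw new IllegalStateException("invalid request from_seqno=[" + minSeqNo + "], "
                + "to_seqno=[" + maxSeqNo + "], local_checkpoint=[" + localCheckpoint + "]");
        }
    }

    public static void main(String[] args) {
        // Both bounds at or below the local checkpoint: accepted without modifying anything.
        ensureWithinLocalCheckpoint(15, 20, 20);

        // Upper bound above the local checkpoint: rejected with an explicit error.
        try {
            ensureWithinLocalCheckpoint(15, 20, 18);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that a bound equal to the local checkpoint passes, since the comparison is strict. Unlike the removed `Math.min` line, the check only reads the request, so a failed request that is retried on another copy still carries its original range.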
