CCR: replicates max seq_no of updates to follower #34051
This commit replicates the max_seq_no_of_updates on the leading index to the primaries of the following index via ShardFollowNodeTask. The max_seq_no_of_updates is then transmitted to the replicas of the follower via replication requests (that is, BulkShardOperationsRequest). Relates elastic#33656
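For readers who want the flow above in code form, here is a minimal sketch. It is a toy model, not the Elasticsearch implementation: the classes Shard and BulkRequest, the replicate helper, and the -2 placeholder for the unassigned value are all assumptions made for illustration. It assumes the marker only ever moves forward and that the replication layer advances the replica before the replica-side check runs.

```java
/** Illustrative model only; none of these classes are the real Elasticsearch ones. */
final class MsuReplicationSketch {
    // Placeholder standing in for SequenceNumbers.UNASSIGNED_SEQ_NO (assumed value).
    static final long UNASSIGNED_SEQ_NO = -2L;

    /** Hypothetical shard copy that tracks max_seq_no_of_updates (MSU). */
    static final class Shard {
        private long maxSeqNoOfUpdatesOrDeletes = UNASSIGNED_SEQ_NO;

        // The marker is only ever advanced, never lowered.
        void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
            maxSeqNoOfUpdatesOrDeletes = Math.max(maxSeqNoOfUpdatesOrDeletes, seqNo);
        }

        long getMaxSeqNoOfUpdatesOrDeletes() {
            return maxSeqNoOfUpdatesOrDeletes;
        }
    }

    /** Hypothetical stand-in for a bulk request carrying the leader's MSU. */
    static final class BulkRequest {
        final long maxSeqNoOfUpdatesOrDeletes;

        BulkRequest(long maxSeqNoOfUpdatesOrDeletes) {
            this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
        }
    }

    /** Follower primary: take over the leader's MSU carried by the request. */
    static void onFollowerPrimary(BulkRequest request, Shard primary) {
        primary.advanceMaxSeqNoOfUpdatesOrDeletes(request.maxSeqNoOfUpdatesOrDeletes);
    }

    /** Follower replica: by now its MSU must cover the one in the request. */
    static void onFollowerReplica(BulkRequest request, Shard replica) {
        if (replica.getMaxSeqNoOfUpdatesOrDeletes() < request.maxSeqNoOfUpdatesOrDeletes) {
            throw new IllegalStateException("replica MSU is behind the request MSU");
        }
    }

    /** One round trip: leader MSU -> follower primary -> follower replica. */
    static void replicate(long leaderMsu, Shard primary, Shard replica) {
        BulkRequest request = new BulkRequest(leaderMsu);
        onFollowerPrimary(request, primary);
        // Assumed replication-layer step: the primary's MSU travels with the replica request.
        replica.advanceMaxSeqNoOfUpdatesOrDeletes(primary.getMaxSeqNoOfUpdatesOrDeletes());
        onFollowerReplica(request, replica);
    }
}
```

In this model, calling replicate with a lower value than a previous call leaves both markers unchanged, which is the property the replica-side check relies on.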
Pinging @elastic/es-distributed
I took a look and it looks good. Left a question.
@@ -56,6 +57,7 @@

    private long leaderGlobalCheckpoint;
    private long leaderMaxSeqNo;
    private volatile long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
If we provide leaderMaxSeqNoOfUpdatesOrDeletes at line 298, then this field does not have to be volatile? In the case of retries we would then not read an updated version, but I think that is ok?
You're correct. In the POC I did it that way, but Boaz preferred to capture it here. I am fine with either option.
I'm not sure exactly what @martijnvg meant, but I think this has to be part of the state, as we don't have a direct connection between the read operations and the write operations. The read operations just populate the write buffer, and the write operations read operations from the write buffer. Since the write buffer is class-level state, I think it's easier to see correctness when you reason about an MSU field that relates to all ops in the write buffer. Note that I say easier because it may be true without it, based on the fact that we always read the lowest sequence numbers from the buffer, but IMO even if that ends up working out it's still not worth the complexity.
@dnhatn does this need to be volatile? When do we read it outside of the lock?
@martijnvg is concerned about the volatile. The current approach requires MSU to be volatile because we call sendBulkShardOperationsRequest without synchronization when handling write failures. We can drop volatile from the MSU field by capturing it once after we populate the write buffer (with synchronization). I tend to prefer the approach without volatile because we won't change MSU when handling write failures. @bleskes WDYT?
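A minimal sketch of the capture-once approach, for illustration only. Everything except the names leaderMaxSeqNoOfUpdatesOrDeletes and sendBulkShardOperationsRequest is hypothetical (FollowTaskSketch, SentRequest, handleReadResponse, coordinateWrites, retry), the -2 initial value is a placeholder for SequenceNumbers.UNASSIGNED_SEQ_NO, and the real ShardFollowNodeTask may be structured differently. The idea is that the field is only touched under the lock, and the retry path reuses the value captured with the original request.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative stand-in for a follow task; not the actual ShardFollowNodeTask. */
final class FollowTaskSketch {
    /** Hypothetical record of one bulk request: the ops and the MSU sent with them. */
    static final class SentRequest {
        final List<Long> seqNos;
        final long maxSeqNoOfUpdatesOrDeletes;

        SentRequest(List<Long> seqNos, long maxSeqNoOfUpdatesOrDeletes) {
            this.seqNos = seqNos;
            this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
        }
    }

    private final Queue<Long> writeBuffer = new ArrayDeque<>();
    // Not volatile: only read and written while holding this object's monitor.
    private long leaderMaxSeqNoOfUpdatesOrDeletes = -2L; // placeholder for UNASSIGNED_SEQ_NO
    // Log of sent requests; this demo is single-threaded, so a plain list is enough.
    final List<SentRequest> sent = new ArrayList<>();

    /** Read path: buffer the ops and remember the highest MSU seen from the leader. */
    synchronized void handleReadResponse(List<Long> seqNos, long msuFromLeader) {
        writeBuffer.addAll(seqNos);
        leaderMaxSeqNoOfUpdatesOrDeletes = Math.max(leaderMaxSeqNoOfUpdatesOrDeletes, msuFromLeader);
    }

    /** Write path: drain the buffer and capture the MSU under the same lock. */
    void coordinateWrites() {
        final List<Long> ops = new ArrayList<>();
        final long capturedMsu;
        synchronized (this) {
            Long op;
            while ((op = writeBuffer.poll()) != null) {
                ops.add(op);
            }
            capturedMsu = leaderMaxSeqNoOfUpdatesOrDeletes;
        }
        if (!ops.isEmpty()) {
            sendBulkShardOperationsRequest(ops, capturedMsu);
        }
    }

    /** Retry path: runs without synchronization and never re-reads the field. */
    void retry(SentRequest failed) {
        sendBulkShardOperationsRequest(failed.seqNos, failed.maxSeqNoOfUpdatesOrDeletes);
    }

    void sendBulkShardOperationsRequest(List<Long> ops, long maxSeqNoOfUpdatesOrDeletes) {
        sent.add(new SentRequest(ops, maxSeqNoOfUpdatesOrDeletes));
    }
}
```

The trade-off matches the one raised earlier in this thread: a retried request carries the MSU it was first sent with, even if a later read has advanced the field in the meantime.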
I pushed adc5ae9
thanks @dnhatn!
LGTM (with some comments for consideration).
        return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
    }

    @Override
    protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
            final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
        assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() :
💯
@@ -529,6 +543,53 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception {
            "this setting is managed via a dedicated API"));
    }

    public void testTransferMaxSeqNoOfUpdates() throws Exception {
@dnhatn What's the added value of this test compared to asserting in all other tests that the MSU on the follower is the same as on the primary after operations?
Yep, I'll remove this test and reintroduce it when we have the optimization in the following engine.
@martijnvg I've addressed your question. Could you please have another look? Thank you.
LGTM
Still LGTM
Thanks @martijnvg and @bleskes.