ref(crons): Guard clock ticks against desynced-partitions #54489
Conversation
@@ -180,19 +180,33 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
    reference_datetime = ts.replace(second=0, microsecond=0)
    reference_ts = int(reference_datetime.timestamp())

    existing_last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
I don't particularly LOVE this variable name, any thoughts?
`precheck_last_ts`? Idk if that's much better
Feels better 👍
This fixes an issue with the original implementation of GH-54204 when processing messages in a non-monotonic order.

Typically kafka messages will be in order, like so:

    12:59:58
    12:59:59
    01:00:00
    01:00:01
    01:00:01
    01:00:02

However, because of how messages are sharded into the kafka partitions, we may end up with a scenario that looks like this:

    partition #1    partition #2    partition #3
    12:59:58        01:00:00        01:00:01
    12:59:59        01:00:01        01:00:02

With one consumer reading from each partition sequentially, we would read these out as:

    12:59:58
    01:00:00
    01:00:01
    12:59:59   <-- problematic skip backwards in time
    01:00:01
    01:00:02

Prior to this change, when we processed the task_trigger clock tick for the timestamp `12:59:59` after `01:00:01`, our `GETSET` would update the key with an OLDER timestamp. When the next tick happened at `01:00:01`, we would incorrectly tick for the `01:00:00` minute boundary again.

This change corrects that by first looking at the existing last-timestamp value stored in redis: if the reference timestamp we're about to tick for is older than the stored value, do nothing and do not store the older reference timestamp.
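The guard described above can be sketched as follows. This is a minimal illustration, not the PR's actual code: `FakeRedis` is an in-memory stand-in for the real redis client, the key name is assumed, and `try_trigger` condenses `_try_handle_high_volume_task_trigger` down to just the monotonic check.

```python
from datetime import datetime, timezone

# Minimal in-memory stand-in for the redis client used by the consumer
# (hypothetical; the real implementation talks to an actual redis server).
class FakeRedis:
    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

# Key name is assumed for illustration.
HIGH_VOLUME_LAST_TRIGGER_TS_KEY = "sentry.monitors.last_tasks_ts"

redis_client = FakeRedis()
ticks = []  # minute boundaries we actually ticked for

def try_trigger(ts: datetime):
    """Condensed sketch of the guarded clock-tick trigger."""
    reference_ts = int(ts.replace(second=0, microsecond=0).timestamp())

    # The guard: never let an older reference timestamp overwrite a newer
    # one, so a desynced partition can't replay a minute boundary.
    existing_last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
    if existing_last_ts is not None and int(existing_last_ts) >= reference_ts:
        return

    redis_client.set(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
    ticks.append(reference_ts)

# Replay the interleaved read order from the partitions above
# (12:59:58, 01:00:00, 01:00:01, 12:59:59, 01:00:01, 01:00:02).
for h, m, s in [(12, 59, 58), (13, 0, 0), (13, 0, 1),
                (12, 59, 59), (13, 0, 1), (13, 0, 2)]:
    try_trigger(datetime(2023, 8, 9, h, m, s, tzinfo=timezone.utc))
```

With the guard in place, only two ticks fire (the 12:59 and 01:00 minute boundaries) and the backwards jump to `12:59:59` neither overwrites the stored timestamp nor re-triggers the `01:00:00` boundary. Note this sketch ignores the atomicity of the read-then-write pair, which a real multi-consumer deployment would still need to consider.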
Force-pushed from `0601c4e` to `bd9187d`