
Commit bd9187d

ref(crons): Guard clock ticks against desynced partitions
This fixes an issue with the original implementation of GH-54204 when processing messages in a non-monotonic order. Typically kafka messages will be ordered like so:

    12:59:58
    12:59:59
    01:00:00
    01:00:01
    01:00:01
    01:00:02

However, because of how messages are sharded into the kafka partitions, we may end up with a scenario that looks like this:

    partition #1   partition #2   partition #3
    12:59:58       01:00:00       01:00:01
    12:59:59       01:00:01       01:00:02

With one consumer reading from each partition sequentially, we would read these out as:

    12:59:58
    01:00:00
    01:00:01
    12:59:59   <-- problematic skip backwards in time
    01:00:01
    01:00:02

Prior to this change, when we processed the task_trigger clock tick for the timestamp `12:59:59` after `01:00:01`, our `GETSET` would update the key with an OLDER timestamp. When the next tick happened at `01:00:01`, we would incorrectly tick for the `01:00:00` minute boundary again.

This change corrects this by first looking at the existing last-timestamp value stored in redis: if that value is the same as or newer than the reference timestamp we're about to tick for, do nothing and do not store the older reference timestamp.
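To make the failure mode concrete, here is a minimal, self-contained sketch (not from this commit) contrasting the old bare-GETSET guard with the new precheck, run against the out-of-order stream from the example above. FakeRedis and both tick functions are illustrative stand-ins for the real redis client and consumer logic:

class FakeRedis:
    """In-memory stand-in for the redis client used by the consumer."""

    def __init__(self):
        self.store = {}

    def get(self, key):
        return self.store.get(key)

    def getset(self, key, value):
        # Atomic in real redis; sequential here, which is fine for this demo
        old = self.store.get(key)
        self.store[key] = value
        return old


def tick_old(redis, reference_ts, ticked):
    # Pre-fix behavior: GETSET unconditionally, tick whenever the value changed
    last_ts = redis.getset("last_tick", reference_ts)
    if last_ts == reference_ts:
        return
    ticked.append(reference_ts)


def tick_new(redis, reference_ts, ticked):
    # Post-fix behavior: never move the stored timestamp backwards.
    # (The concurrent-consumer GETSET arbitration is omitted here.)
    last_ts = redis.get("last_tick")
    if last_ts is not None and last_ts >= reference_ts:
        return
    redis.getset("last_tick", reference_ts)
    ticked.append(reference_ts)


# Minute boundaries as read from the desynced partitions above:
# 12:59, 01:00, 01:00, 12:59 (lagging partition), 01:00, 01:00
stream = [779, 780, 780, 779, 780, 780]

for tick in (tick_old, tick_new):
    redis, ticked = FakeRedis(), []
    for ts in stream:
        tick(redis, ts, ticked)
    print(tick.__name__, ticked)
# tick_old [779, 780, 779, 780]  <- 12:59 and 01:00 each ticked twice
# tick_new [779, 780]            <- each minute boundary ticked exactly once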
1 parent 82888a3 commit bd9187d

File tree

2 files changed: +52 -5 lines


src/sentry/monitors/consumers/monitor_consumer.py

+19-5
@@ -180,19 +180,33 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
     reference_datetime = ts.replace(second=0, microsecond=0)
     reference_ts = int(reference_datetime.timestamp())
 
-    # Since GETSET is atomic this acts as a guard against another consumer
-    # picking up the minute rollover
+    precheck_last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
+    if precheck_last_ts is not None:
+        precheck_last_ts = int(precheck_last_ts)
+
+    # If we have the same or an older reference timestamp from the most recent
+    # tick there is nothing to do, we've already handled this tick.
+    #
+    # The scenario where the reference_ts is older is likely due to a partition
+    # being slightly behind another partition that we've already read from
+    if precheck_last_ts is not None and precheck_last_ts >= reference_ts:
+        return
+
+    # GETSET is atomic. This is critical to avoid another consumer also
+    # processing the same tick.
     last_ts = redis_client.getset(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
     if last_ts is not None:
         last_ts = int(last_ts)
 
-    # Do nothing until the message we process moves across the minute boundary
-    if last_ts == reference_ts:
+    # Another consumer already handled the tick if the first LAST_TRIGGERED
+    # timestamp we got is different from the one we just got from the GETSET.
+    # Nothing needs to be done
+    if precheck_last_ts != last_ts:
         return
 
     # Track the delay from the true time, ideally this should be pretty
     # close, but in the case of a backlog, this will be much higher
-    total_delay = reference_ts - datetime.now().timestamp()
+    total_delay = datetime.now().timestamp() - reference_ts
 
     logger.info(f"Monitor consumer clock tick: {reference_datetime}")
     metrics.gauge("monitors.task.high_volume_clock_delay", total_delay, sample_rate=1.0)
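One subtlety worth spelling out: the post-GETSET check compares against precheck_last_ts rather than reference_ts. If two consumers cross the same minute boundary concurrently, both can pass the precheck, and the atomic GETSET then decides the winner. A hypothetical interleaving (consumer names and T are illustrative):

# Two consumers, A and B, both computing reference_ts = T;
# the redis key initially holds T-60 (the previous minute).
#
#   A: GET            -> T-60   precheck passes (T-60 < T)
#   B: GET            -> T-60   precheck passes as well
#   A: GETSET(key, T) -> T-60   matches A's precheck        -> A ticks
#   B: GETSET(key, T) -> T      differs from B's precheck   -> B returns
#
# Only the consumer whose GETSET return value equals its precheck proceeds;
# the other observes the winner's write and backs off, so each minute
# boundary is ticked exactly once.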

tests/sentry/monitors/test_monitor_consumer.py

+33
@@ -590,3 +590,36 @@ def test_high_volume_task_trigger(self, dispatch_tasks):
             "Failed try high-volume task trigger", exc_info=True
         )
         dispatch_tasks.side_effect = None
+
+    @override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
+    @mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
+    def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
+        """
+        When consumer partitions are not completely synchronized we may read
+        timestamps in a non-monotonic order. In this scenario we want to make
+        sure we still only trigger once
+        """
+        monitor = self._create_monitor(slug="my-monitor")
+
+        assert dispatch_tasks.call_count == 0
+
+        now = datetime.now().replace(second=0, microsecond=0)
+
+        # First message with timestamp just after the minute boundary
+        # triggers the task
+        self.send_checkin(monitor.slug, ts=now + timedelta(seconds=1))
+        assert dispatch_tasks.call_count == 1
+
+        # Second message has a timestamp just before the minute boundary,
+        # should not trigger anything since we've already ticked ahead of this
+        self.send_checkin(monitor.slug, ts=now - timedelta(seconds=1))
+        assert dispatch_tasks.call_count == 1
+
+        # Third message again just after the minute boundary does NOT trigger
+        # the task, we've already ticked at that time.
+        self.send_checkin(monitor.slug, ts=now + timedelta(seconds=1))
+        assert dispatch_tasks.call_count == 1
+
+        # Fourth message moves past a new minute boundary, tick
+        self.send_checkin(monitor.slug, ts=now + timedelta(minutes=1, seconds=1))
+        assert dispatch_tasks.call_count == 2
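For clarity, here is how the four check-ins line up with minute boundaries, assuming a hypothetical now of 01:00:00 (the test truncates to the minute, so only the minute boundary matters; the date below is an illustrative stand-in):

from datetime import datetime, timedelta

now = datetime(2023, 8, 1, 1, 0, 0)  # stand-in for the test's minute-truncated "now"

print(now + timedelta(seconds=1))             # 01:00:01 -> crosses 01:00, first tick
print(now - timedelta(seconds=1))             # 12:59:59 -> behind the stored tick, guarded
print(now + timedelta(seconds=1))             # 01:00:01 -> 01:00 already ticked, no-op
print(now + timedelta(minutes=1, seconds=1))  # 01:01:01 -> crosses 01:01, second tick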
