ref(crons): Guard clock ticks against desynced-partitions #54489

Merged
24 changes: 19 additions & 5 deletions src/sentry/monitors/consumers/monitor_consumer.py
@@ -180,19 +180,33 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
     reference_datetime = ts.replace(second=0, microsecond=0)
     reference_ts = int(reference_datetime.timestamp())

-    # Since GETSET is atomic this acts as a guard against another consumer
-    # picking up the minute rollover
+    precheck_last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
+    if precheck_last_ts is not None:
+        precheck_last_ts = int(precheck_last_ts)
+
+    # If we have the same or an older reference timestamp from the most recent
+    # tick there is nothing to do, we've already handled this tick.
+    #
+    # The scenario where the reference_ts is older is likely due to a partition
+    # being slightly behind another partition that we've already read from
+    if precheck_last_ts is not None and precheck_last_ts >= reference_ts:
+        return
+
+    # GETSET is atomic. This is critical to avoid another consumer also
+    # processing the same tick.
     last_ts = redis_client.getset(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
     if last_ts is not None:
         last_ts = int(last_ts)

-    # Do nothing until the message we process moves across the minute boundary
-    if last_ts == reference_ts:
+    # Another consumer already handled the tick if the first LAST_TRIGGERED
+    # timestamp we got is different from the one we just got from the GETSET.
+    # Nothing needs to be done
+    if precheck_last_ts != last_ts:
         return

     # Track the delay from the true time, ideally this should be pretty
     # close, but in the case of a backlog, this will be much higher
-    total_delay = reference_ts - datetime.now().timestamp()
+    total_delay = datetime.now().timestamp() - reference_ts

     logger.info(f"Monitor consumer clock tick: {reference_datetime}")
     metrics.gauge("monitors.task.high_volume_clock_delay", total_delay, sample_rate=1.0)
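
To make the guard easier to follow outside the consumer, here is a minimal, self-contained sketch of the precheck-then-GETSET pattern the hunk above introduces. The key name, the try_tick helper, and the dispatch callback are illustrative stand-ins for HIGH_VOLUME_LAST_TRIGGER_TS_KEY and _dispatch_tasks, and any redis-py compatible client is assumed; this is a sketch of the idea, not the consumer implementation.

from datetime import datetime

LAST_TICK_KEY = "monitors.last_tick_ts"  # hypothetical stand-in for HIGH_VOLUME_LAST_TRIGGER_TS_KEY


def try_tick(redis_client, ts: datetime, dispatch) -> bool:
    """Dispatch at most once per wall-clock minute, even when timestamps
    arrive out of order across partitions or multiple consumers race."""
    reference_ts = int(ts.replace(second=0, microsecond=0).timestamp())

    # Cheap precheck: skip minutes we've already handled, including older
    # reference timestamps coming from a partition that lags behind.
    precheck = redis_client.get(LAST_TICK_KEY)
    precheck = int(precheck) if precheck is not None else None
    if precheck is not None and precheck >= reference_ts:
        return False

    # Atomic GETSET: if another consumer won the race between our GET and
    # this call, the value it wrote will differ from our precheck.
    last = redis_client.getset(LAST_TICK_KEY, reference_ts)
    last = int(last) if last is not None else None
    if precheck != last:
        return False

    dispatch(reference_ts)
    return True

Two consumers that cross the same minute boundary can both pass the precheck, but only one of them sees a GETSET result equal to its precheck, so dispatch still runs once per minute.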
33 changes: 33 additions & 0 deletions tests/sentry/monitors/test_monitor_consumer.py
@@ -590,3 +590,36 @@ def test_high_volume_task_trigger(self, dispatch_tasks):
"Failed try high-volume task trigger", exc_info=True
)
dispatch_tasks.side_effect = None

+    @override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
+    @mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
+    def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
+        """
+        When consumer partitions are not completely synchronized we may read
+        timestamps in a non-monotonic order. In this scenario we want to make
+        sure we still only trigger once.
+        """
+        monitor = self._create_monitor(slug="my-monitor")
+
+        assert dispatch_tasks.call_count == 0
+
+        now = datetime.now().replace(second=0, microsecond=0)
+
+        # First message with a timestamp just after the minute boundary
+        # triggers the task
+        self.send_checkin(monitor.slug, ts=now + timedelta(seconds=1))
+        assert dispatch_tasks.call_count == 1
+
+        # Second message has a timestamp just before the minute boundary,
+        # should not trigger anything since we've already ticked ahead of this
+        self.send_checkin(monitor.slug, ts=now - timedelta(seconds=1))
+        assert dispatch_tasks.call_count == 1
+
+        # Third message again just after the minute boundary does NOT trigger
+        # the task, we've already ticked at that time.
+        self.send_checkin(monitor.slug, ts=now + timedelta(seconds=1))
+        assert dispatch_tasks.call_count == 1
+
+        # Fourth message moves past a new minute boundary, tick
+        self.send_checkin(monitor.slug, ts=now + timedelta(minutes=1, seconds=1))
+        assert dispatch_tasks.call_count == 2
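
As a worked trace of why the call count above goes 1, 1, 1, 2 (illustrative arithmetic, not output from the test run): with now floored to a minute whose epoch timestamp is M, the four check-ins map to reference timestamps M, M - 60, M, and M + 60, and only the first and last get past the precheck_last_ts >= reference_ts guard.

from datetime import datetime, timedelta

now = datetime(2023, 8, 9, 12, 30)  # any minute-aligned time; M = its epoch timestamp
M = int(now.timestamp())

checkins = [
    now + timedelta(seconds=1),             # reference_ts = M       -> first tick
    now - timedelta(seconds=1),             # reference_ts = M - 60  -> skipped (M >= M - 60)
    now + timedelta(seconds=1),             # reference_ts = M       -> skipped (M >= M)
    now + timedelta(minutes=1, seconds=1),  # reference_ts = M + 60  -> second tick
]
print([int(ts.replace(second=0, microsecond=0).timestamp()) - M for ts in checkins])
# [0, -60, 0, 60]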