Skip to content

Commit 9fb864e

Browse files
feat(crons): Backfill missed clock-tick dispatches (#70338)
When we detect a clock tick, it is possible that we have skipped a tick. This happens when a monitor ingest partition is very slow and does not contain a message for an entire minute (while other partitions have already moved multiple minutes forward). Previously we would log this and avoid producing a clock tick for the skipped minute(s), because we were using celery to dispatch the check_missing and check_timeout tasks. Since the celery tasks would be produced back-to-back, it wasn't unlikely that they would be processed out of order. Since the completion of GH-58410 we are now guaranteed that clock tick tasks are processed in order, so the skipped ticks can safely be backfilled.
1 parent f6b60c2 commit 9fb864e

File tree

2 files changed

+107
-32
lines changed

2 files changed

+107
-32
lines changed

src/sentry/monitors/clock_dispatch.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import logging
4-
from datetime import datetime, timezone
4+
from datetime import datetime, timedelta, timezone
55

66
import sentry_sdk
77
from arroyo import Topic as ArroyoTopic
@@ -147,11 +147,23 @@ def try_monitor_clock_tick(ts: datetime, partition: int):
147147
metrics.gauge("monitors.task.clock_delay", total_delay, sample_rate=1.0)
148148

149149
# If more than exactly a minute has passed then we've skipped a
150-
# task run, report that to sentry, it is a problem.
150+
# task run, backfill those ticks. This can happen when one partition has
151+
# slowed down too much and is missing a minute's worth of check-ins
151152
if last_ts is not None and slowest_part_ts > last_ts + 60:
152-
with sentry_sdk.push_scope() as scope:
153-
scope.set_extra("last_ts", last_ts)
154-
scope.set_extra("slowest_part_ts", slowest_part_ts)
155-
sentry_sdk.capture_message("Monitor task dispatch minute skipped")
153+
if options.get("crons.use_clock_pulse_consumer"):
154+
# We only want to do backfills when we're using the clock tick
155+
# consumer, otherwise the celery tasks may process out of order
156+
backfill_tick = datetime.fromtimestamp(last_ts + 60, tz=timezone.utc)
157+
while backfill_tick < tick:
158+
extra = {"reference_datetime": str(backfill_tick)}
159+
logger.info("monitors.consumer.clock_tick_backfill", extra=extra)
160+
161+
_dispatch_tick(backfill_tick)
162+
backfill_tick = backfill_tick + timedelta(minutes=1)
163+
else:
164+
with sentry_sdk.push_scope() as scope:
165+
scope.set_extra("last_ts", last_ts)
166+
scope.set_extra("slowest_part_ts", slowest_part_ts)
167+
sentry_sdk.capture_message("Monitor task dispatch minute skipped")
156168

157169
_dispatch_tick(tick)

tests/sentry/monitors/test_clock_dispatch.py

Lines changed: 89 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,26 @@
1212

1313

1414
@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
15-
def test_monitor_task_trigger(dispatch_tick):
15+
@override_options({"crons.use_clock_pulse_consumer": False})
16+
def test_monitor_task_trigger_legacy(dispatch_tick):
1617
now = timezone.now().replace(second=0, microsecond=0)
1718

1819
# Assumes a single partition for simplicity. Multi-partition cases are
1920
# covered in further test cases.
2021

21-
# First checkin triggers tasks
22+
# First checkin triggers dispatch
2223
try_monitor_clock_tick(ts=now, partition=0)
2324
assert dispatch_tick.call_count == 1
2425

25-
# 5 seconds later does NOT trigger the task
26+
# 5 seconds later does NOT trigger the dispatch
2627
try_monitor_clock_tick(ts=now + timedelta(seconds=5), partition=0)
2728
assert dispatch_tick.call_count == 1
2829

29-
# a minute later DOES trigger the task
30+
# a minute later DOES trigger the dispatch
3031
try_monitor_clock_tick(ts=now + timedelta(minutes=1), partition=0)
3132
assert dispatch_tick.call_count == 2
3233

33-
# Same time does NOT trigger the task
34+
# Same time does NOT trigger the dispatch
3435
try_monitor_clock_tick(ts=now + timedelta(minutes=1), partition=0)
3536
assert dispatch_tick.call_count == 2
3637

@@ -42,6 +43,79 @@ def test_monitor_task_trigger(dispatch_tick):
4243
capture_message.assert_called_with("Monitor task dispatch minute skipped")
4344

4445

46+
@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
47+
@override_options({"crons.use_clock_pulse_consumer": False})
48+
def test_monitor_task_trigger_partition_tick_skip_legacy(dispatch_tick):
49+
now = timezone.now().replace(second=0, microsecond=0)
50+
51+
# Tick for 4 partitions
52+
try_monitor_clock_tick(ts=now, partition=0)
53+
try_monitor_clock_tick(ts=now, partition=1)
54+
try_monitor_clock_tick(ts=now, partition=2)
55+
try_monitor_clock_tick(ts=now, partition=3)
56+
assert dispatch_tick.call_count == 1
57+
assert dispatch_tick.mock_calls[0] == mock.call(now)
58+
59+
# Tick forward twice for 3 partitions
60+
try_monitor_clock_tick(ts=now + timedelta(minutes=1), partition=0)
61+
try_monitor_clock_tick(ts=now + timedelta(minutes=1), partition=1)
62+
try_monitor_clock_tick(ts=now + timedelta(minutes=1), partition=2)
63+
64+
try_monitor_clock_tick(ts=now + timedelta(minutes=2), partition=0)
65+
try_monitor_clock_tick(ts=now + timedelta(minutes=3), partition=1)
66+
try_monitor_clock_tick(ts=now + timedelta(minutes=3), partition=2)
67+
assert dispatch_tick.call_count == 1
68+
69+
# Slowest partition catches up, but has a timestamp gap, capture the fact
70+
# that we skipped a minute
71+
with mock.patch("sentry_sdk.capture_message") as capture_message:
72+
assert capture_message.call_count == 0
73+
try_monitor_clock_tick(ts=now + timedelta(minutes=2), partition=3)
74+
capture_message.assert_called_with("Monitor task dispatch minute skipped")
75+
76+
# XXX(epurkhiser): Another approach we could take here is to detect the
77+
# skipped minute and generate a tick for that minute, since we know
78+
# we have processed past that minute.
79+
#
80+
# This still could be a problem though since it may mean we will not
81+
# produce missed check-ins since the monitor may have already
82+
# checked-in after and moved the `next_checkin_latest` forward.
83+
#
84+
# In practice this should almost never happen since we have a high volume of
85+
86+
assert dispatch_tick.call_count == 2
87+
assert dispatch_tick.mock_calls[1] == mock.call(now + timedelta(minutes=2))
88+
89+
90+
@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
91+
@override_options({"crons.use_clock_pulse_consumer": True})
92+
def test_monitor_task_trigger(dispatch_tick):
93+
now = timezone.now().replace(second=0, microsecond=0)
94+
95+
# Assumes a single partition for simplicity. Multi-partition cases are
96+
# covered in further test cases.
97+
98+
# First checkin triggers dispatch
99+
try_monitor_clock_tick(ts=now, partition=0)
100+
assert dispatch_tick.call_count == 1
101+
102+
# 5 seconds later does NOT trigger the dispatch
103+
try_monitor_clock_tick(ts=now + timedelta(seconds=5), partition=0)
104+
assert dispatch_tick.call_count == 1
105+
106+
# a minute later DOES trigger the dispatch
107+
try_monitor_clock_tick(ts=now + timedelta(minutes=1), partition=0)
108+
assert dispatch_tick.call_count == 2
109+
110+
# Same time does NOT trigger the dispatch
111+
try_monitor_clock_tick(ts=now + timedelta(minutes=1), partition=0)
112+
assert dispatch_tick.call_count == 2
113+
114+
# A skipped minute triggers the dispatch multiple times
115+
try_monitor_clock_tick(ts=now + timedelta(minutes=3, seconds=5), partition=0)
116+
assert dispatch_tick.call_count == 4
117+
118+
45119
@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
46120
def test_monitor_task_trigger_partition_desync(dispatch_tick):
47121
"""
@@ -52,7 +126,7 @@ def test_monitor_task_trigger_partition_desync(dispatch_tick):
52126
now = timezone.now().replace(second=0, microsecond=0)
53127

54128
# First message in partition 0 with timestamp just after the minute
55-
# boundary triggers the task
129+
# boundary triggers the dispatch
56130
try_monitor_clock_tick(ts=now + timedelta(seconds=1), partition=0)
57131
assert dispatch_tick.call_count == 1
58132

@@ -63,7 +137,7 @@ def test_monitor_task_trigger_partition_desync(dispatch_tick):
63137
assert dispatch_tick.call_count == 1
64138

65139
# Third message in partition 1 again just after the minute boundary does
66-
# NOT trigger the task, we've already ticked at that time.
140+
# NOT trigger the dispatch, we've already ticked at that time.
67141
try_monitor_clock_tick(ts=now + timedelta(seconds=1), partition=1)
68142
assert dispatch_tick.call_count == 1
69143

@@ -102,10 +176,12 @@ def test_monitor_task_trigger_partition_sync(dispatch_tick):
102176

103177

104178
@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
179+
@override_options({"crons.use_clock_pulse_consumer": True})
105180
def test_monitor_task_trigger_partition_tick_skip(dispatch_tick):
106181
"""
107182
In a scenario where all partitions move multiple ticks past the slowest
108-
partition we may end up skipping a tick.
183+
partition we may end up skipping a tick. In this scenario we will backfill
184+
those ticks
109185
"""
110186
now = timezone.now().replace(second=0, microsecond=0)
111187

@@ -127,25 +203,12 @@ def test_monitor_task_trigger_partition_tick_skip(dispatch_tick):
127203
try_monitor_clock_tick(ts=now + timedelta(minutes=3), partition=2)
128204
assert dispatch_tick.call_count == 1
129205

130-
# Slowest partition catches up, but has a timestamp gap, capture the fact
131-
# that we skipped a minute
132-
with mock.patch("sentry_sdk.capture_message") as capture_message:
133-
assert capture_message.call_count == 0
134-
try_monitor_clock_tick(ts=now + timedelta(minutes=2), partition=3)
135-
capture_message.assert_called_with("Monitor task dispatch minute skipped")
206+
# Slowest partition catches up, but has a timestamp gap
207+
try_monitor_clock_tick(ts=now + timedelta(minutes=2), partition=3)
136208

137-
# XXX(epurkhiser): Another approach we could take here is to detect the
138-
# skipped minute and generate a tick for that minute, since we know
139-
# processed past that minute.
140-
#
141-
# This still could be a problem though since it may mean we will not
142-
# produce missed check-ins since the monitor already may have already
143-
# checked-in after and moved the `next_checkin_latest` forward.
144-
#
145-
# In practice this should almost never happen since we have a high volume of
146-
147-
assert dispatch_tick.call_count == 2
148-
assert dispatch_tick.mock_calls[1] == mock.call(now + timedelta(minutes=2))
209+
assert dispatch_tick.call_count == 3
210+
assert dispatch_tick.mock_calls[1] == mock.call(now + timedelta(minutes=1))
211+
assert dispatch_tick.mock_calls[2] == mock.call(now + timedelta(minutes=2))
149212

150213

151214
@override_settings(KAFKA_TOPIC_OVERRIDES={"monitors-clock-tick": "clock-tick-test-topic"})

0 commit comments

Comments
 (0)