Skip to content

Commit 48ca2ea

Browse files
ref(crons): Simplify monitor clock implementation (#54388)
This is a follow up to GH-54204 as suggested by @fpacifici #53661 (comment) > Can we just have the pulse message running in both modes and treat > everything as high volume mode? Instead of having two modes, we can simply always use the same logic for dispatching the monitor tasks on the minute roll-over, using the consumer as a clock. Previously the worry here was that in low-volume check-in situations nothing would drive the clock and we would need to have an external clock, with a different way to dispatch the tasks. But there is no need for a different way to dispatch the tasks, we can have an external clock that pulses messages into the topic and we can simply use the same logic already implemented to use the topic messages as a clock. This change removes the concept of "high volume" / "low volume" and adds the concept of a "clock_pulse" message to the consumer. In a follow up PR we will introduce the celery beat task which produces the clock_pulse messages.
1 parent e443c70 commit 48ca2ea

File tree

2 files changed

+79
-41
lines changed

2 files changed

+79
-41
lines changed

src/sentry/monitors/consumers/monitor_consumer.py

+50-34
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from __future__ import annotations
2+
13
import logging
24
import uuid
35
from datetime import datetime, timedelta
4-
from typing import Dict, Mapping, Optional, TypedDict
6+
from typing import Dict, Literal, Mapping, Optional, TypedDict
57

68
import msgpack
79
import sentry_sdk
@@ -51,19 +53,24 @@
5153
CHECKIN_QUOTA_LIMIT = 5
5254
CHECKIN_QUOTA_WINDOW = 60
5355

54-
# This key is used when SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled and we
55-
# trigger the monitor tasks as a side-effect of check-ins coming in. It is used
56-
# to store the last timestamp that the tasks were triggered.
57-
HIGH_VOLUME_LAST_TRIGGER_TS_KEY = "sentry.monitors.last_tasks_ts"
56+
# This key is used to store the last timestamp that the tasks were triggered.
57+
MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"
5858

5959

6060
class CheckinMessage(TypedDict):
61+
# TODO(epurkhiser): We should make this required and ensure the message
62+
# produced by relay includes this message type
63+
message_type: NotRequired[Literal["check_in"]]
6164
payload: str
62-
start_time: str
65+
start_time: float
6366
project_id: str
6467
sdk: str
6568

6669

70+
class ClockPulseMessage(TypedDict):
71+
message_type: Literal["clock_pulse"]
72+
73+
6774
class CheckinTrace(TypedDict):
6875
trace_id: str
6976

@@ -147,6 +154,23 @@ def _ensure_monitor_with_config(
147154

148155

149156
def _dispatch_tasks(ts: datetime):
157+
"""
158+
Dispatch monitor tasks triggered by the consumer clock. These will run
159+
after the MONITOR_TASK_DELAY (in seconds). This is to give some breathing
160+
room for check-ins to start and not be EXACTLY on the minute
161+
162+
These tasks are triggered via the consumer processing check-ins. This
163+
allows the monitor tasks to be synchronized to any backlog of check-ins
164+
that are being processed.
165+
166+
To ensure these tasks are always triggered there is an additional celery
167+
beat task that produces a clock pulse message into the topic that can be
168+
used to trigger these tasks when there is a low volume of check-ins. It is,
169+
however, preferred to have a high volume of check-ins, so we do not need to
170+
rely on celery beat, which in some cases may fail to trigger (such as in
171+
sentry.io, when we deploy we restart the celery beat worker and it will
172+
skip any tasks it missed)
173+
"""
150174
# For now we're going to have this do nothing. We want to validate that
151175
# we're not going to be skipping any check-ins
152176
return
@@ -155,23 +179,9 @@ def _dispatch_tasks(ts: datetime):
155179
# check_timeout.delay(current_datetime=ts)
156180

157181

158-
def _handle_clock_pulse_task_trigger(ts: datetime):
182+
def _try_monitor_tasks_trigger(ts: datetime):
159183
"""
160-
Handles clock pulse messages. These pulses are generated by the
161-
`sentry.monitors.tasks.clock_pulse` tasks which runs every minute. Clock
162-
pulses will NOT be generated when SENTRY_MONITORS_HIGH_VOLUME_MODE is
163-
enabled.
164-
165-
This function is responsible for dispatching the missed check-in and timed
166-
out check-in detection tasks.
167-
"""
168-
_dispatch_tasks(ts)
169-
170-
171-
def _try_handle_high_volume_task_trigger(ts: datetime):
172-
"""
173-
When SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled we use each check-in
174-
message as a pseudo clock.
184+
Handles triggering the monitor tasks when we've rolled over the minute.
175185
"""
176186
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
177187

@@ -180,7 +190,7 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
180190
reference_datetime = ts.replace(second=0, microsecond=0)
181191
reference_ts = int(reference_datetime.timestamp())
182192

183-
precheck_last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
193+
precheck_last_ts = redis_client.get(MONITOR_TASKS_LAST_TRIGGERED_KEY)
184194
if precheck_last_ts is not None:
185195
precheck_last_ts = int(precheck_last_ts)
186196

@@ -194,7 +204,7 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
194204

195205
# GETSET is atomic. This is critical to avoid another consumer also
196206
# processing the same tick.
197-
last_ts = redis_client.getset(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
207+
last_ts = redis_client.getset(MONITOR_TASKS_LAST_TRIGGERED_KEY, reference_ts)
198208
if last_ts is not None:
199209
last_ts = int(last_ts)
200210

@@ -212,7 +222,7 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
212222
"monitors.consumer.clock_tick",
213223
extra={"reference_datetime": str(reference_datetime)},
214224
)
215-
metrics.gauge("monitors.task.high_volume_clock_delay", total_delay, sample_rate=1.0)
225+
metrics.gauge("monitors.task.clock_delay", total_delay, sample_rate=1.0)
216226

217227
# If more than exactly a minute has passed then we've skipped a
218228
# task run, report that to sentry, it is a problem.
@@ -225,15 +235,21 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
225235
_dispatch_tasks(ts)
226236

227237

228-
def _process_message(ts: datetime, wrapper: CheckinMessage) -> None:
229-
# When running in high volume mode we will not consume clock pulses (The
230-
# clock_pulse task is not enabled). Instead we use each check-in message as
231-
# a means for triggering our tasks.
232-
if settings.SENTRY_MONITORS_HIGH_VOLUME_MODE:
233-
try:
234-
_try_handle_high_volume_task_trigger(ts)
235-
except Exception:
236-
logger.exception("Failed try high-volume task trigger", exc_info=True)
238+
def _process_message(ts: datetime, wrapper: CheckinMessage | ClockPulseMessage) -> None:
239+
# XXX: Relay does not attach a message type, to properly discriminate the
240+
# type we add it by default here. This can be removed once the message_type
241+
# is guaranteed
242+
if "message_type" not in wrapper:
243+
wrapper["message_type"] = "check_in"
244+
245+
try:
246+
_try_monitor_tasks_trigger(ts)
247+
except Exception:
248+
logger.exception("Failed to trigger monitor tasks", exc_info=True)
249+
250+
# Nothing else to do with clock pulses
251+
if wrapper["message_type"] == "clock_pulse":
252+
return
237253

238254
params: CheckinPayload = json.loads(wrapper["payload"])
239255
start_time = to_datetime(float(wrapper["start_time"]))

tests/sentry/monitors/test_monitor_consumer.py

+29-7
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,28 @@ def send_checkin(
8989
)
9090
)
9191

92+
def send_clock_pulse(
93+
self,
94+
ts: Optional[datetime] = None,
95+
) -> None:
96+
if ts is None:
97+
ts = datetime.now()
98+
99+
wrapper = {"message_type": "clock_pulse"}
100+
101+
commit = mock.Mock()
102+
partition = Partition(Topic("test"), 0)
103+
StoreMonitorCheckInStrategyFactory().create_with_partitions(commit, {partition: 0}).submit(
104+
Message(
105+
BrokerValue(
106+
KafkaPayload(b"fake-key", msgpack.packb(wrapper), []),
107+
partition,
108+
1,
109+
ts,
110+
)
111+
)
112+
)
113+
92114
def test_payload(self) -> None:
93115
monitor = self._create_monitor(slug="my-monitor")
94116

@@ -548,10 +570,9 @@ def test_organization_killswitch(self):
548570

549571
assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()
550572

551-
@override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
552573
@mock.patch("sentry.monitors.consumers.monitor_consumer.CHECKIN_QUOTA_LIMIT", 20)
553574
@mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
554-
def test_high_volume_task_trigger(self, dispatch_tasks):
575+
def test_monitor_task_trigger(self, dispatch_tasks):
555576
monitor = self._create_monitor(slug="my-monitor")
556577

557578
assert dispatch_tasks.call_count == 0
@@ -581,17 +602,18 @@ def test_high_volume_task_trigger(self, dispatch_tasks):
581602
assert dispatch_tasks.call_count == 3
582603
capture_message.assert_called_with("Monitor task dispatch minute skipped")
583604

605+
# A clock pulse message also triggers the tasks
606+
self.send_clock_pulse(ts=now + timedelta(minutes=4))
607+
assert dispatch_tasks.call_count == 4
608+
584609
# An exception dispatching the tasks does NOT cause ingestion to fail
585610
with mock.patch("sentry.monitors.consumers.monitor_consumer.logger") as logger:
586611
dispatch_tasks.side_effect = Exception()
587-
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=4))
612+
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=5))
588613
assert MonitorCheckIn.objects.filter(guid=self.guid).exists()
589-
logger.exception.assert_called_with(
590-
"Failed try high-volume task trigger", exc_info=True
591-
)
614+
logger.exception.assert_called_with("Failed to trigger monitor tasks", exc_info=True)
592615
dispatch_tasks.side_effect = None
593616

594-
@override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
595617
@mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
596618
def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
597619
"""

0 commit comments

Comments
 (0)