Skip to content

Commit 9fd307b

Browse files
committed
ref(crons): Simplify monitor clock implementation
This is a follow up to GH-54204 as suggested by @fpacifici #53661 (comment) > Can we just have the pulse message running in both modes and treat > everything as high volume mode? Instead of having two modes, we can simply always use the same logic for dispatching the monitor tasks on the minute roll-over, using the consumer as a clock. Previously the worry here was that in low-volume check-in situations nothing would drive the clock and we would need to have an external clock, with a different way to dispatch the tasks. But there is no need for a different way to dispatch the tasks, we can have an external clock that pulses messages into the topic and we can simply use the same logic already implemented to use the topic messages as a clock. This change removes the concept of "high volume" / "low volume" and adds the concept of a "clock_pulse" message to the consumer. In a follow up PR we will introduce the celery beat task which produces the clock_pulse messages.
1 parent a412a62 commit 9fd307b

File tree

2 files changed

+79
-40
lines changed

2 files changed

+79
-40
lines changed

src/sentry/monitors/consumers/monitor_consumer.py

+50-34
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from __future__ import annotations
2+
13
import logging
24
import uuid
35
from datetime import datetime, timedelta
4-
from typing import Dict, Mapping, Optional, TypedDict
6+
from typing import Dict, Literal, Mapping, Optional, TypedDict
57

68
import msgpack
79
import sentry_sdk
@@ -51,19 +53,24 @@
5153
CHECKIN_QUOTA_LIMIT = 5
5254
CHECKIN_QUOTA_WINDOW = 60
5355

54-
# This key is used when SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled and we
55-
# trigger the monitor tasks as a side-effect of check-ins coming in. It is used
56-
# to store he last timestamp that the tasks were triggered.
57-
HIGH_VOLUME_LAST_TRIGGER_TS_KEY = "sentry.monitors.last_tasks_ts"
56+
# This key is used to store the last timestamp that the tasks were triggered.
57+
MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"
5858

5959

6060
class CheckinMessage(TypedDict):
61+
# TODO(epurkhiser): We should make this required and ensure the message
62+
# produced by relay includes this message type
63+
message_type: NotRequired[Literal["check_in"]]
6164
payload: str
62-
start_time: str
65+
start_time: float
6366
project_id: str
6467
sdk: str
6568

6669

70+
class ClockPulseMessage(TypedDict):
71+
message_type: Literal["clock_pulse"]
72+
73+
6774
class CheckinTrace(TypedDict):
6875
trace_id: str
6976

@@ -147,6 +154,23 @@ def _ensure_monitor_with_config(
147154

148155

149156
def _dispatch_tasks(ts: datetime):
157+
"""
158+
Dispatch monitor tasks triggered by the consumer clock. These will run
159+
after the MONITOR_TASK_DELAY (in seconds). This is to give some breathing
160+
room for check-ins to start and not be EXACTLY on the minute.
161+
162+
These tasks are triggered via the consumer processing check-ins. This
163+
allows the monitor tasks to be synchronized to any backlog of check-ins
164+
that are being processed.
165+
166+
To ensure these tasks are always triggered there is an additional celery
167+
beat task that produces a clock pulse message into the topic that can be
168+
used to trigger these tasks when there is a low volume of check-ins. It is,
169+
however, preferred to have a high volume of check-ins, so we do not need to
170+
rely on celery beat, which in some cases may fail to trigger (such as in
171+
sentry.io, when we deploy we restart the celery beat worker and it will
172+
skip any tasks it missed)
173+
"""
150174
# For now we're going to have this do nothing. We want to validate that
151175
# we're not going to be skipping any check-ins
152176
return
@@ -155,23 +179,9 @@ def _dispatch_tasks(ts: datetime):
155179
# check_timeout.delay(current_datetime=ts)
156180

157181

158-
def _handle_clock_pulse_task_trigger(ts: datetime):
182+
def _try_monitor_tasks_trigger(ts: datetime):
159183
"""
160-
Handles clock pulse messages. These pulses are generated by the
161-
`sentry.monitors.tasks.clock_pulse` tasks which runs every minute. Clock
162-
pulses will NOT be generated when SENTRY_MONITORS_HIGH_VOLUME_MODE is
163-
enabled.
164-
165-
This function is responsible for dispatching the missed check-in and timed
166-
out check-in detection tasks.
167-
"""
168-
_dispatch_tasks(ts)
169-
170-
171-
def _try_handle_high_volume_task_trigger(ts: datetime):
172-
"""
173-
When SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled we use each check-in
174-
message as a pseudo clock.
184+
Handles triggering the monitor tasks when we've rolled over the minute.
175185
"""
176186
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
177187

@@ -182,7 +192,7 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
182192

183193
# Since GETSET is atomic this acts as a guard against another consumer
184194
# picking up the minute rollover
185-
last_ts = redis_client.getset(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
195+
last_ts = redis_client.getset(MONITOR_TASKS_LAST_TRIGGERED_KEY, reference_ts)
186196
if last_ts is not None:
187197
last_ts = int(last_ts)
188198

@@ -194,8 +204,8 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
194204
# close, but in the case of a backlog, this will be much higher
195205
total_delay = reference_ts - datetime.now().timestamp()
196206

197-
metrics.incr("monitors.task.triggered_via_high_volume_clock")
198-
metrics.gauge("monitors.task.high_volume_clock_delay", total_delay)
207+
metrics.incr("monitors.task.triggered")
208+
metrics.gauge("monitors.task.clock_delay", total_delay)
199209

200210
# If more than exactly a minute has passed then we've skipped a
201211
# task run, report that to sentry, it is a problem.
@@ -208,15 +218,21 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
208218
_dispatch_tasks(ts)
209219

210220

211-
def _process_message(ts: datetime, wrapper: CheckinMessage) -> None:
212-
# When running in high volume mode we will not consume clock pulses (The
213-
# clock_pulse task is not enabled). Instead we use each check-in message as
214-
# a means for triggering our tasks.
215-
if settings.SENTRY_MONITORS_HIGH_VOLUME_MODE:
216-
try:
217-
_try_handle_high_volume_task_trigger(ts)
218-
except Exception:
219-
logger.exception("Failed try high-volume task trigger", exc_info=True)
221+
def _process_message(ts: datetime, wrapper: CheckinMessage | ClockPulseMessage) -> None:
222+
# XXX: Relay does not attach a message type, to properly discriminate the
223+
# type we add it by default here. This can be removed once the message_type
224+
# is guaranteed
225+
if "message_type" not in wrapper:
226+
wrapper["message_type"] = "check_in"
227+
228+
try:
229+
_try_monitor_tasks_trigger(ts)
230+
except Exception:
231+
logger.exception("Failed to trigger monitor tasks", exc_info=True)
232+
233+
# Nothing else to do with clock pulses
234+
if wrapper["message_type"] == "clock_pulse":
235+
return
220236

221237
params: CheckinPayload = json.loads(wrapper["payload"])
222238
start_time = to_datetime(float(wrapper["start_time"]))

tests/sentry/monitors/test_monitor_consumer.py

+29-6
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,28 @@ def send_checkin(
8989
)
9090
)
9191

92+
def send_clock_pulse(
93+
self,
94+
ts: Optional[datetime] = None,
95+
) -> None:
96+
if ts is None:
97+
ts = datetime.now()
98+
99+
wrapper = {"message_type": "clock_pulse"}
100+
101+
commit = mock.Mock()
102+
partition = Partition(Topic("test"), 0)
103+
StoreMonitorCheckInStrategyFactory().create_with_partitions(commit, {partition: 0}).submit(
104+
Message(
105+
BrokerValue(
106+
KafkaPayload(b"fake-key", msgpack.packb(wrapper), []),
107+
partition,
108+
1,
109+
ts,
110+
)
111+
)
112+
)
113+
92114
def test_payload(self) -> None:
93115
monitor = self._create_monitor(slug="my-monitor")
94116

@@ -548,10 +570,9 @@ def test_organization_killswitch(self):
548570

549571
assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()
550572

551-
@override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
552573
@mock.patch("sentry.monitors.consumers.monitor_consumer.CHECKIN_QUOTA_LIMIT", 20)
553574
@mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
554-
def test_high_volume_task_trigger(self, dispatch_tasks):
575+
def test_monitor_task_trigger(self, dispatch_tasks):
555576
monitor = self._create_monitor(slug="my-monitor")
556577

557578
assert dispatch_tasks.call_count == 0
@@ -581,12 +602,14 @@ def test_high_volume_task_trigger(self, dispatch_tasks):
581602
assert dispatch_tasks.call_count == 3
582603
capture_message.assert_called_with("Monitor task dispatch minute skipped")
583604

605+
# A clock pulse message also triggers the tasks
606+
self.send_clock_pulse(ts=now + timedelta(minutes=4))
607+
assert dispatch_tasks.call_count == 4
608+
584609
# An exception dispatching the tasks does NOT cause ingestion to fail
585610
with mock.patch("sentry.monitors.consumers.monitor_consumer.logger") as logger:
586611
dispatch_tasks.side_effect = Exception()
587-
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=4))
612+
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=5))
588613
assert MonitorCheckIn.objects.filter(guid=self.guid).exists()
589-
logger.exception.assert_called_with(
590-
"Failed try high-volume task trigger", exc_info=True
591-
)
614+
logger.exception.assert_called_with("Failed to trigger monitor tasks", exc_info=True)
592615
dispatch_tasks.side_effect = None

0 commit comments

Comments
 (0)