Skip to content

Commit a1c6ee4

Browse files
feat(crons): Implement "High Volume" consumer driven clock task dispatch (#54204)
This is a partial implementation of GH-53661. This implements the "High Volume" mode, where check-ins from the consumer are essentially the 'clock pulses' that are used to dispatch the monitor tasks each minute. This change does NOT actually dispatch the tasks, but it does send some telemetry each time the tasks would be dispatched, as well as capture error conditions when the clock skips.
1 parent bdbec4d commit a1c6ee4

File tree

3 files changed

+146
-7
lines changed

3 files changed

+146
-7
lines changed

src/sentry/conf/server.py

+30
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ def env(
114114
SENTRY_ARTIFACT_BUNDLES_INDEXING_REDIS_CLUSTER = "default"
115115
SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER = "default"
116116
SENTRY_DEBUG_FILES_REDIS_CLUSTER = "default"
117+
SENTRY_MONITORS_REDIS_CLUSTER = "default"
117118

118119
# Hosts that are allowed to use system token authentication.
119120
# http://en.wikipedia.org/wiki/Reserved_IP_addresses
@@ -3651,3 +3652,32 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
36513652

36523653
SENTRY_METRICS_INTERFACE_BACKEND = "sentry.sentry_metrics.client.snuba.SnubaMetricsBackend"
36533654
SENTRY_METRICS_INTERFACE_BACKEND_OPTIONS: dict[str, Any] = {}
3655+
3656+
# This setting configures how the Monitors (Crons) feature will run the tasks
3657+
# responsible for marking monitors as having "Missed" check-ins and having
3658+
# "Timed out" check-ins.
3659+
#
3660+
# These two tasks must be run every minute and should be run as close to the
3661+
# leading minute boundary as possible. By default these tasks will be
3662+
# triggered via a clock pulse that is generated by a celery beat task. The
3663+
# sentry.monitors.consumer service is responsible for detecting this clock
3664+
# pulse and dispatching the tasks.
3665+
#
3666+
# When high volume mode is enabled, a clock pulse will not be generated by
3667+
# celery beat, instead the monitor consumer will use all processed check-in
3668+
# messages as its clock. We track message timestamps (floored to the minute)
3669+
# and any time that timestamp changes over a minute, the tasks will be
3670+
# triggered
3671+
#
3672+
# NOTE: THERE MUST BE A HIGH VOLUME OF CHECK-INS TO USE THIS MODE!! If a
3673+
# check-in message is not consumed the tasks will not run, and missed
3674+
# check-ins will not be generated!
3675+
#
3676+
# The advantage of high volume mode is that we will not rely on celery beat to
3677+
# accurately trigger clock pulses. This is important in scenarios where it is
3678+
# not possible to guarantee that the celery beat tasks will run every minute.
3679+
#
3680+
# (For example, when sentry.io deploys, there is a short period where the
3681+
# celery tasks are being restarted, if they are not running during the minute
3682+
# boundary, the task will not run)
3683+
SENTRY_MONITORS_HIGH_VOLUME_MODE = False

src/sentry/monitors/consumers/monitor_consumer.py

+78-4
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
from typing import Dict, Mapping, Optional, TypedDict
55

66
import msgpack
7+
import sentry_sdk
78
from arroyo.backends.kafka.consumer import KafkaPayload
89
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
910
from arroyo.processing.strategies.commit import CommitOffsets
1011
from arroyo.processing.strategies.run_task import RunTask
11-
from arroyo.types import Commit, Message, Partition
12+
from arroyo.types import BrokerValue, Commit, Message, Partition
1213
from django.conf import settings
1314
from django.db import router, transaction
1415
from django.utils.text import slugify
@@ -37,7 +38,7 @@
3738
valid_duration,
3839
)
3940
from sentry.monitors.validators import ConfigValidator, MonitorCheckInValidator
40-
from sentry.utils import json, metrics
41+
from sentry.utils import json, metrics, redis
4142
from sentry.utils.dates import to_datetime
4243
from sentry.utils.locking import UnableToAcquireLock
4344
from sentry.utils.locking.manager import LockManager
@@ -50,6 +51,11 @@
5051
CHECKIN_QUOTA_LIMIT = 5
5152
CHECKIN_QUOTA_WINDOW = 60
5253

54+
# This key is used when SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled and we
55+
# trigger the monitor tasks as a side-effect of check-ins coming in. It is used
56+
# to store he last timestamp that the tasks were triggered.
57+
HIGH_VOLUME_LAST_TRIGGER_TS_KEY = "sentry.monitors.last_tasks_ts"
58+
5359

5460
class CheckinMessage(TypedDict):
5561
payload: str
@@ -140,7 +146,74 @@ def _ensure_monitor_with_config(
140146
return monitor
141147

142148

143-
def _process_message(wrapper: CheckinMessage) -> None:
149+
def _dispatch_tasks(ts: datetime):
150+
# For now we're going to have this do nothing. We want to validate that
151+
# we're not going to be skipping any check-ins
152+
return
153+
154+
# check_missing.delay(current_datetime=ts)
155+
# check_timeout.delay(current_datetime=ts)
156+
157+
158+
def _handle_clock_pulse_task_trigger(ts: datetime):
159+
"""
160+
Handles clock pulse messages. These pulses are generated by the
161+
`sentry.monitors.tasks.clock_pulse` tasks which runs every minute. Clock
162+
pulses will NOT be generated when SENTRY_MONITORS_HIGH_VOLUME_MODE is
163+
enabled.
164+
165+
This function is responsible for dispatching the missed check-in and timed
166+
out check-in detection tasks.
167+
"""
168+
_dispatch_tasks(ts)
169+
170+
171+
def _try_handle_high_volume_task_trigger(ts: datetime):
172+
"""
173+
When SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled we use each check-in
174+
message as a pseudo clock.
175+
"""
176+
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
177+
178+
# Trim the timestamp seconds off, these tasks are run once per minute and
179+
# should have their timestamp clamped to the minute.
180+
reference_datetime = ts.replace(second=0, microsecond=0)
181+
reference_ts = int(reference_datetime.timestamp())
182+
183+
last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
184+
if last_ts is not None:
185+
last_ts = int(last_ts)
186+
187+
# Do nothing until the message we process moves across the minute boundary
188+
if last_ts == reference_ts:
189+
return
190+
191+
try:
192+
lock = locks.get("sentry.monitors.task_trigger", duration=5)
193+
with lock.acquire():
194+
# If more than exactly a minute has passed then we've skipped a
195+
# task run, report that to sentry, it is a problem.
196+
if last_ts is not None and last_ts + 60 != reference_ts:
197+
with sentry_sdk.push_scope() as scope:
198+
scope.set_extra("last_ts", last_ts)
199+
scope.set_extra("reference_ts", reference_ts)
200+
sentry_sdk.capture_message("Monitor task dispatch minute skipped")
201+
202+
_dispatch_tasks(ts)
203+
metrics.incr("monitors.tassk.triggered_via_high_volume_clock")
204+
redis_client.set(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
205+
except UnableToAcquireLock:
206+
# Another message processor is handling this. Nothing to do
207+
pass
208+
209+
210+
def _process_message(ts: datetime, wrapper: CheckinMessage) -> None:
211+
# When running in high volume mode we will not consume clock pulses (The
212+
# clock_pulse task is not enabled). Instead we use each check-in message as
213+
# a means for trigering our tasks.
214+
if settings.SENTRY_MONITORS_HIGH_VOLUME_MODE:
215+
_try_handle_high_volume_task_trigger(ts)
216+
144217
params: CheckinPayload = json.loads(wrapper["payload"])
145218
start_time = to_datetime(float(wrapper["start_time"]))
146219
project_id = int(wrapper["project_id"])
@@ -441,9 +514,10 @@ def create_with_partitions(
441514
partitions: Mapping[Partition, int],
442515
) -> ProcessingStrategy[KafkaPayload]:
443516
def process_message(message: Message[KafkaPayload]) -> None:
517+
assert isinstance(message.value, BrokerValue)
444518
try:
445519
wrapper = msgpack.unpackb(message.payload.value)
446-
_process_message(wrapper)
520+
_process_message(message.value.timestamp, wrapper)
447521
except Exception:
448522
logger.exception("Failed to process message payload")
449523

tests/sentry/monitors/test_monitor_consumer.py

+38-3
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,12 @@ def send_checkin(
5050
self,
5151
monitor_slug: str,
5252
guid: Optional[str] = None,
53+
ts: Optional[datetime] = None,
5354
**overrides: Any,
5455
) -> None:
55-
now = datetime.now()
56+
if ts is None:
57+
ts = datetime.now()
58+
5659
self.guid = uuid.uuid4().hex if not guid else guid
5760
self.trace_id = uuid.uuid4().hex
5861

@@ -67,7 +70,7 @@ def send_checkin(
6770
payload.update(overrides)
6871

6972
wrapper = {
70-
"start_time": now.timestamp(),
73+
"start_time": ts.timestamp(),
7174
"project_id": self.project.id,
7275
"payload": json.dumps(payload),
7376
"sdk": "test/1.0",
@@ -81,7 +84,7 @@ def send_checkin(
8184
KafkaPayload(b"fake-key", msgpack.packb(wrapper), []),
8285
partition,
8386
1,
84-
datetime.now(),
87+
ts,
8588
)
8689
)
8790
)
@@ -547,3 +550,35 @@ def test_organization_killswitch(self):
547550
options.set("crons.organization.disable-check-in", opt_val)
548551

549552
assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()
553+
554+
@override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
555+
@mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
556+
@mock.patch("sentry_sdk.capture_message")
557+
def test_high_volume_task_trigger(self, capture_message, dispatch_tasks):
558+
monitor = self._create_monitor(slug="my-monitor")
559+
560+
assert dispatch_tasks.call_count == 0
561+
562+
now = datetime.now().replace(second=0, microsecond=0)
563+
564+
# First checkin triggers tasks
565+
self.send_checkin(monitor.slug, ts=now)
566+
assert dispatch_tasks.call_count == 1
567+
568+
# 5 seconds later does NOT trigger the task
569+
self.send_checkin(monitor.slug, ts=now + timedelta(seconds=5))
570+
assert dispatch_tasks.call_count == 1
571+
572+
# a minute later DOES trigger the task
573+
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=1))
574+
assert dispatch_tasks.call_count == 2
575+
576+
# Same time does NOT trigger the task
577+
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=1))
578+
assert dispatch_tasks.call_count == 2
579+
580+
# A skipped minute trigges the task AND captures an error
581+
assert capture_message.call_count == 0
582+
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=3, seconds=5))
583+
assert dispatch_tasks.call_count == 3
584+
capture_message.assert_called_with("Monitor task dispatch minute skipped")

0 commit comments

Comments
 (0)