diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index 23771b136b8eb8..24c257a023fd0a 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -114,6 +114,7 @@ def env(
 SENTRY_ARTIFACT_BUNDLES_INDEXING_REDIS_CLUSTER = "default"
 SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER = "default"
 SENTRY_DEBUG_FILES_REDIS_CLUSTER = "default"
+SENTRY_MONITORS_REDIS_CLUSTER = "default"

 # Hosts that are allowed to use system token authentication.
 # http://en.wikipedia.org/wiki/Reserved_IP_addresses
@@ -3651,3 +3652,32 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
 SENTRY_METRICS_INTERFACE_BACKEND = "sentry.sentry_metrics.client.snuba.SnubaMetricsBackend"
 SENTRY_METRICS_INTERFACE_BACKEND_OPTIONS: dict[str, Any] = {}
+
+# This setting configures how the Monitors (Crons) feature will run the tasks
+# responsible for marking monitors as having "Missed" check-ins and having
+# "Timed out" check-ins.
+#
+# These two tasks must be run every minute and should be run as close to the
+# leading minute boundary as possible. By default these tasks will be
+# triggered via a clock pulse that is generated by a celery beat task. The
+# sentry.monitors.consumer service is responsible for detecting this clock
+# pulse and dispatching the tasks.
+#
+# When high volume mode is enabled, a clock pulse will not be generated by
+# celery beat. Instead, the monitor consumer will use all processed check-in
+# messages as its clock. We track message timestamps (floored to the minute)
+# and any time that timestamp crosses a minute boundary, the tasks will be
+# triggered.
+#
+# NOTE: THERE MUST BE A HIGH VOLUME OF CHECK-INS TO USE THIS MODE!! If a
+# check-in message is not consumed the tasks will not run, and missed
+# check-ins will not be generated!
+#
+# The advantage of high volume mode is that we do not rely on celery beat to
+# accurately trigger clock pulses. This is important in scenarios where it is
+# not possible to guarantee that the celery beat tasks will run every minute.
+#
+# (For example, when sentry.io deploys, there is a short period while the
+# celery tasks are being restarted; if they are not running during the minute
+# boundary, the tasks will not run.)
+SENTRY_MONITORS_HIGH_VOLUME_MODE = False
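Usage sketch (not part of the diff): a deployment opts in by overriding the new settings in its Python config, e.g. a self-hosted `sentry.conf.py`. The option names come from this diff; treating `sentry.conf.py` as the override point is an assumption about the deployment, shown only to illustrate the knobs.

```python
# sentry.conf.py -- illustrative override, assuming a self-hosted install.
# SENTRY_MONITORS_HIGH_VOLUME_MODE is only safe when the install receives a
# steady stream of check-in messages, since those messages act as the clock.
SENTRY_MONITORS_HIGH_VOLUME_MODE = True

# Redis cluster used to persist the last task-trigger timestamp; "default"
# matches the default value added in this diff.
SENTRY_MONITORS_REDIS_CLUSTER = "default"
```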
diff --git a/src/sentry/monitors/consumers/monitor_consumer.py b/src/sentry/monitors/consumers/monitor_consumer.py
index 7ba9cd837b436b..6a419c0ca6781d 100644
--- a/src/sentry/monitors/consumers/monitor_consumer.py
+++ b/src/sentry/monitors/consumers/monitor_consumer.py
@@ -4,11 +4,12 @@
 from typing import Dict, Mapping, Optional, TypedDict

 import msgpack
+import sentry_sdk
 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
 from arroyo.processing.strategies.commit import CommitOffsets
 from arroyo.processing.strategies.run_task import RunTask
-from arroyo.types import Commit, Message, Partition
+from arroyo.types import BrokerValue, Commit, Message, Partition
 from django.conf import settings
 from django.db import router, transaction
 from django.utils.text import slugify
@@ -37,7 +38,7 @@
     valid_duration,
 )
 from sentry.monitors.validators import ConfigValidator, MonitorCheckInValidator
-from sentry.utils import json, metrics
+from sentry.utils import json, metrics, redis
 from sentry.utils.dates import to_datetime
 from sentry.utils.locking import UnableToAcquireLock
 from sentry.utils.locking.manager import LockManager
@@ -50,6 +51,11 @@
 CHECKIN_QUOTA_LIMIT = 5
 CHECKIN_QUOTA_WINDOW = 60

+# This key is used when SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled and we
+# trigger the monitor tasks as a side-effect of check-ins coming in. It is used
+# to store the last timestamp at which the tasks were triggered.
+HIGH_VOLUME_LAST_TRIGGER_TS_KEY = "sentry.monitors.last_tasks_ts"
+

 class CheckinMessage(TypedDict):
     payload: str
@@ -140,7 +146,74 @@ def _ensure_monitor_with_config(
     return monitor


-def _process_message(wrapper: CheckinMessage) -> None:
+def _dispatch_tasks(ts: datetime):
+    # For now we're going to have this do nothing. We want to validate that
+    # we're not going to be skipping any check-ins.
+    return
+
+    # check_missing.delay(current_datetime=ts)
+    # check_timeout.delay(current_datetime=ts)
+
+
+def _handle_clock_pulse_task_trigger(ts: datetime):
+    """
+    Handles clock pulse messages. These pulses are generated by the
+    `sentry.monitors.tasks.clock_pulse` task, which runs every minute. Clock
+    pulses will NOT be generated when SENTRY_MONITORS_HIGH_VOLUME_MODE is
+    enabled.
+
+    This function is responsible for dispatching the missed check-in and
+    timed-out check-in detection tasks.
+    """
+    _dispatch_tasks(ts)
+
+
+def _try_handle_high_volume_task_trigger(ts: datetime):
+    """
+    When SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled we use each check-in
+    message as a pseudo clock.
+    """
+    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
+
+    # Trim the seconds off the timestamp; these tasks run once per minute and
+    # should have their timestamp clamped to the minute.
+    reference_datetime = ts.replace(second=0, microsecond=0)
+    reference_ts = int(reference_datetime.timestamp())
+
+    last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
+    if last_ts is not None:
+        last_ts = int(last_ts)
+
+    # Do nothing until the message we process moves across the minute boundary
+    if last_ts == reference_ts:
+        return
+
+    try:
+        lock = locks.get("sentry.monitors.task_trigger", duration=5)
+        with lock.acquire():
+            # If more than exactly a minute has passed then we've skipped a
+            # task run. Report that to Sentry; it is a problem.
+            if last_ts is not None and last_ts + 60 != reference_ts:
+                with sentry_sdk.push_scope() as scope:
+                    scope.set_extra("last_ts", last_ts)
+                    scope.set_extra("reference_ts", reference_ts)
+                    sentry_sdk.capture_message("Monitor task dispatch minute skipped")
+
+            _dispatch_tasks(ts)
+            metrics.incr("monitors.tasks.triggered_via_high_volume_clock")
+            redis_client.set(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
+    except UnableToAcquireLock:
+        # Another message processor is handling this. Nothing to do.
+        pass
+
+
+def _process_message(ts: datetime, wrapper: CheckinMessage) -> None:
+    # When running in high volume mode we will not consume clock pulses (the
+    # clock_pulse task is not enabled). Instead we use each check-in message
+    # as a means for triggering our tasks.
+    if settings.SENTRY_MONITORS_HIGH_VOLUME_MODE:
+        _try_handle_high_volume_task_trigger(ts)
+
     params: CheckinPayload = json.loads(wrapper["payload"])
     start_time = to_datetime(float(wrapper["start_time"]))
     project_id = int(wrapper["project_id"])
@@ -441,9 +514,10 @@ def create_with_partitions(
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         def process_message(message: Message[KafkaPayload]) -> None:
+            assert isinstance(message.value, BrokerValue)
             try:
                 wrapper = msgpack.unpackb(message.payload.value)
-                _process_message(wrapper)
+                _process_message(message.value.timestamp, wrapper)
             except Exception:
                 logger.exception("Failed to process message payload")
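To make the pseudo clock concrete, here is a minimal, dependency-free sketch of the decision `_try_handle_high_volume_task_trigger` makes. Redis and the distributed lock are replaced with a plain dict and no locking, and `should_dispatch` is a hypothetical name, so this is a model of the logic rather than the implementation:

```python
from datetime import datetime, timedelta

# Stand-in for the Redis value stored under HIGH_VOLUME_LAST_TRIGGER_TS_KEY.
_state = {}


def should_dispatch(ts):
    """Return (dispatch, minute_skipped) for one check-in timestamp."""
    # Clamp to the minute, exactly as the consumer does.
    reference_ts = int(ts.replace(second=0, microsecond=0).timestamp())
    last_ts = _state.get("last_tasks_ts")

    # Still inside the same minute: nothing to do.
    if last_ts == reference_ts:
        return (False, False)

    # A gap of anything other than exactly 60s means at least one minute had
    # no check-ins, so a task run was skipped.
    skipped = last_ts is not None and last_ts + 60 != reference_ts
    _state["last_tasks_ts"] = reference_ts
    return (True, skipped)


now = datetime(2023, 5, 1, 12, 0)
assert should_dispatch(now) == (True, False)                          # first message
assert should_dispatch(now + timedelta(seconds=5)) == (False, False)  # same minute
assert should_dispatch(now + timedelta(minutes=1)) == (True, False)   # next minute
assert should_dispatch(now + timedelta(minutes=3)) == (True, True)    # skipped minutes
```

The production version wraps this check in a short-lived lock so that only one consumer per minute dispatches the tasks, and it reports the skipped-minute case to Sentry rather than returning it.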
diff --git a/tests/sentry/monitors/test_monitor_consumer.py b/tests/sentry/monitors/test_monitor_consumer.py
index 4e7d6fa9642fad..d7cf33e6bad5d9 100644
--- a/tests/sentry/monitors/test_monitor_consumer.py
+++ b/tests/sentry/monitors/test_monitor_consumer.py
@@ -50,9 +50,12 @@ def send_checkin(
         self,
         monitor_slug: str,
         guid: Optional[str] = None,
+        ts: Optional[datetime] = None,
         **overrides: Any,
     ) -> None:
-        now = datetime.now()
+        if ts is None:
+            ts = datetime.now()
+
         self.guid = uuid.uuid4().hex if not guid else guid
         self.trace_id = uuid.uuid4().hex
@@ -67,7 +70,7 @@ def send_checkin(
         payload.update(overrides)

         wrapper = {
-            "start_time": now.timestamp(),
+            "start_time": ts.timestamp(),
             "project_id": self.project.id,
             "payload": json.dumps(payload),
             "sdk": "test/1.0",
@@ -81,7 +84,7 @@ def send_checkin(
                     KafkaPayload(b"fake-key", msgpack.packb(wrapper), []),
                     partition,
                     1,
-                    datetime.now(),
+                    ts,
                 )
             )
         )
@@ -547,3 +550,35 @@ def test_organization_killswitch(self):
         options.set("crons.organization.disable-check-in", opt_val)

         assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()
+
+    @override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
+    @mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
+    @mock.patch("sentry_sdk.capture_message")
+    def test_high_volume_task_trigger(self, capture_message, dispatch_tasks):
+        monitor = self._create_monitor(slug="my-monitor")
+
+        assert dispatch_tasks.call_count == 0
+
+        now = datetime.now().replace(second=0, microsecond=0)
+
+        # First check-in triggers the tasks
+        self.send_checkin(monitor.slug, ts=now)
+        assert dispatch_tasks.call_count == 1
+
+        # 5 seconds later does NOT trigger the tasks
+        self.send_checkin(monitor.slug, ts=now + timedelta(seconds=5))
+        assert dispatch_tasks.call_count == 1
+
+        # A minute later DOES trigger the tasks
+        self.send_checkin(monitor.slug, ts=now + timedelta(minutes=1))
+        assert dispatch_tasks.call_count == 2
+
+        # Same time does NOT trigger the tasks
+        self.send_checkin(monitor.slug, ts=now + timedelta(minutes=1))
+        assert dispatch_tasks.call_count == 2
+
+        # A skipped minute triggers the tasks AND captures an error
+        assert capture_message.call_count == 0
+        self.send_checkin(monitor.slug, ts=now + timedelta(minutes=3, seconds=5))
+        assert dispatch_tasks.call_count == 3
+        capture_message.assert_called_with("Monitor task dispatch minute skipped")
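A side note on the new test's signature: stacked `mock.patch` decorators inject mocks bottom-up, so the bottom decorator (`sentry_sdk.capture_message`) supplies the first mock argument and the top one (`_dispatch_tasks`) the second. A standalone illustration of that ordering, independent of the Sentry code:

```python
from unittest import mock


@mock.patch("os.remove")  # top decorator -> last mock argument
@mock.patch("os.rename")  # bottom decorator -> first mock argument
def demo(rename_mock, remove_mock):
    import os

    # Both calls hit the mocks, so nothing touches the filesystem.
    os.rename("a", "b")
    os.remove("b")
    rename_mock.assert_called_once_with("a", "b")
    remove_mock.assert_called_once_with("b")


demo()
```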