Skip to content

feat(crons): Implement "High Volume" consumer driven clock task dispatch #54204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def env(
SENTRY_ARTIFACT_BUNDLES_INDEXING_REDIS_CLUSTER = "default"
SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER = "default"
SENTRY_DEBUG_FILES_REDIS_CLUSTER = "default"
SENTRY_MONITORS_REDIS_CLUSTER = "default"

# Hosts that are allowed to use system token authentication.
# http://en.wikipedia.org/wiki/Reserved_IP_addresses
Expand Down Expand Up @@ -3651,3 +3652,32 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]

SENTRY_METRICS_INTERFACE_BACKEND = "sentry.sentry_metrics.client.snuba.SnubaMetricsBackend"
SENTRY_METRICS_INTERFACE_BACKEND_OPTIONS: dict[str, Any] = {}

# This setting configures how the Monitors (Crons) feature will run the tasks
# responsible for marking monitors as having "Missed" check-ins and having
# "Timed out" check-ins.
#
# These two tasks must be run every minute and should be run as close to the
# leading minute boundary as possible. By default these tasks will be
# triggered via a clock pulse that is generated by a celery beat task. The
# sentry.monitors.consumer service is responsible for detecting this clock
# pulse and dispatching the tasks.
#
# When high volume mode is enabled, a clock pulse will not be generated by
# celery beat, instead the monitor consumer will use all processed check-in
# messages as its clock. We track message timestamps (floored to the minute)
# and any time that timestamp changes over a minute, the tasks will be
# triggered
#
# NOTE: THERE MUST BE A HIGH VOLUME OF CHECK-INS TO USE THIS MODE!! If a
# check-in message is not consumed the tasks will not run, and missed
# check-ins will not be generated!
#
# The advantage of high volume mode is that we will not rely on celery beat to
# accurately trigger clock pulses. This is important in scenarios where it is
# not possible to guarantee that the celery beat tasks will run every minute.
#
# (For example, when sentry.io deploys, there is a short period where the
# celery tasks are being restarted, if they are not running during the minute
# boundary, the task will not run)
SENTRY_MONITORS_HIGH_VOLUME_MODE = False
82 changes: 78 additions & 4 deletions src/sentry/monitors/consumers/monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
from typing import Dict, Mapping, Optional, TypedDict

import msgpack
import sentry_sdk
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import Commit, Message, Partition
from arroyo.types import BrokerValue, Commit, Message, Partition
from django.conf import settings
from django.db import router, transaction
from django.utils.text import slugify
Expand Down Expand Up @@ -37,7 +38,7 @@
valid_duration,
)
from sentry.monitors.validators import ConfigValidator, MonitorCheckInValidator
from sentry.utils import json, metrics
from sentry.utils import json, metrics, redis
from sentry.utils.dates import to_datetime
from sentry.utils.locking import UnableToAcquireLock
from sentry.utils.locking.manager import LockManager
Expand All @@ -50,6 +51,11 @@
CHECKIN_QUOTA_LIMIT = 5
CHECKIN_QUOTA_WINDOW = 60

# This key is used when SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled and we
# trigger the monitor tasks as a side-effect of check-ins coming in. It is used
# to store the last timestamp that the tasks were triggered.
HIGH_VOLUME_LAST_TRIGGER_TS_KEY = "sentry.monitors.last_tasks_ts"


class CheckinMessage(TypedDict):
payload: str
Expand Down Expand Up @@ -140,7 +146,74 @@ def _ensure_monitor_with_config(
return monitor


def _process_message(wrapper: CheckinMessage) -> None:
def _dispatch_tasks(ts: datetime):
# For now we're going to have this do nothing. We want to validate that
# we're not going to be skipping any check-ins
return

# check_missing.delay(current_datetime=ts)
# check_timeout.delay(current_datetime=ts)


def _handle_clock_pulse_task_trigger(ts: datetime):
    """
    React to a clock pulse message by dispatching the missed check-in and
    timed out check-in detection tasks.

    Pulses are produced once per minute by the
    `sentry.monitors.tasks.clock_pulse` task. No pulses are produced while
    SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled; in that mode the check-in
    messages themselves act as the clock.
    """
    _dispatch_tasks(ts)


def _try_handle_high_volume_task_trigger(ts: datetime):
    """
    When SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled we use each check-in
    message as a pseudo clock.

    The message timestamp is floored to the minute and compared against the
    last minute we dispatched tasks for (tracked in redis under
    HIGH_VOLUME_LAST_TRIGGER_TS_KEY). When the minute boundary is crossed the
    monitor tasks are dispatched exactly once; a distributed lock guards
    against concurrent consumers double-dispatching. A gap of more than one
    minute between dispatches means a clock tick was skipped, which is
    reported to sentry as it implies missed check-ins may not be detected.
    """
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

    # Trim the timestamp seconds off, these tasks are run once per minute and
    # should have their timestamp clamped to the minute.
    reference_datetime = ts.replace(second=0, microsecond=0)
    reference_ts = int(reference_datetime.timestamp())

    last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
    if last_ts is not None:
        last_ts = int(last_ts)

    # Do nothing until the message we process moves across the minute boundary
    if last_ts == reference_ts:
        return

    try:
        lock = locks.get("sentry.monitors.task_trigger", duration=5)
        with lock.acquire():
            # If more than exactly a minute has passed then we've skipped a
            # task run, report that to sentry, it is a problem.
            if last_ts is not None and last_ts + 60 != reference_ts:
                with sentry_sdk.push_scope() as scope:
                    scope.set_extra("last_ts", last_ts)
                    scope.set_extra("reference_ts", reference_ts)
                    sentry_sdk.capture_message("Monitor task dispatch minute skipped")

            _dispatch_tasks(ts)
            # Fixed metric key typo: was "monitors.tassk.…"
            metrics.incr("monitors.task.triggered_via_high_volume_clock")
            redis_client.set(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
    except UnableToAcquireLock:
        # Another message processor is handling this. Nothing to do
        pass


def _process_message(ts: datetime, wrapper: CheckinMessage) -> None:
# When running in high volume mode we will not consume clock pulses (The
# clock_pulse task is not enabled). Instead we use each check-in message as
# a means for triggering our tasks.
if settings.SENTRY_MONITORS_HIGH_VOLUME_MODE:
_try_handle_high_volume_task_trigger(ts)

params: CheckinPayload = json.loads(wrapper["payload"])
start_time = to_datetime(float(wrapper["start_time"]))
project_id = int(wrapper["project_id"])
Expand Down Expand Up @@ -441,9 +514,10 @@ def create_with_partitions(
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
def process_message(message: Message[KafkaPayload]) -> None:
    # Only BrokerValue carries the broker-assigned timestamp, which is
    # forwarded to _process_message where (in high volume mode) it drives
    # the consumer-driven clock for task dispatch.
    assert isinstance(message.value, BrokerValue)
    try:
        wrapper = msgpack.unpackb(message.payload.value)
        _process_message(message.value.timestamp, wrapper)
    except Exception:
        # Never crash the consumer on a bad payload; log and move on so the
        # offset is still committed.
        logger.exception("Failed to process message payload")

Expand Down
41 changes: 38 additions & 3 deletions tests/sentry/monitors/test_monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ def send_checkin(
self,
monitor_slug: str,
guid: Optional[str] = None,
ts: Optional[datetime] = None,
**overrides: Any,
) -> None:
now = datetime.now()
if ts is None:
ts = datetime.now()

self.guid = uuid.uuid4().hex if not guid else guid
self.trace_id = uuid.uuid4().hex

Expand All @@ -67,7 +70,7 @@ def send_checkin(
payload.update(overrides)

wrapper = {
"start_time": now.timestamp(),
"start_time": ts.timestamp(),
"project_id": self.project.id,
"payload": json.dumps(payload),
"sdk": "test/1.0",
Expand All @@ -81,7 +84,7 @@ def send_checkin(
KafkaPayload(b"fake-key", msgpack.packb(wrapper), []),
partition,
1,
datetime.now(),
ts,
)
)
)
Expand Down Expand Up @@ -547,3 +550,35 @@ def test_organization_killswitch(self):
options.set("crons.organization.disable-check-in", opt_val)

assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()

@override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
@mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
@mock.patch("sentry_sdk.capture_message")
def test_high_volume_task_trigger(self, capture_message, dispatch_tasks):
    """In high volume mode check-ins act as the clock: tasks dispatch once
    per minute boundary, and a skipped minute is reported to sentry."""
    monitor = self._create_monitor(slug="my-monitor")

    assert dispatch_tasks.call_count == 0

    base = datetime.now().replace(second=0, microsecond=0)

    # The very first check-in dispatches the tasks
    self.send_checkin(monitor.slug, ts=base)
    assert dispatch_tasks.call_count == 1

    # Still within the same minute -> no dispatch
    self.send_checkin(monitor.slug, ts=base + timedelta(seconds=5))
    assert dispatch_tasks.call_count == 1

    # Crossing into the next minute -> dispatch
    self.send_checkin(monitor.slug, ts=base + timedelta(minutes=1))
    assert dispatch_tasks.call_count == 2

    # Repeating the same minute -> no dispatch
    self.send_checkin(monitor.slug, ts=base + timedelta(minutes=1))
    assert dispatch_tasks.call_count == 2

    # Jumping past a full minute dispatches AND reports the skipped minute
    assert capture_message.call_count == 0
    self.send_checkin(monitor.slug, ts=base + timedelta(minutes=3, seconds=5))
    assert dispatch_tasks.call_count == 3
    capture_message.assert_called_with("Monitor task dispatch minute skipped")