diff --git a/src/sentry/monitors/clock_dispatch.py b/src/sentry/monitors/clock_dispatch.py index 8ee1c3461584fa..92d33072da9f7b 100644 --- a/src/sentry/monitors/clock_dispatch.py +++ b/src/sentry/monitors/clock_dispatch.py @@ -4,12 +4,18 @@ from datetime import datetime, timezone import sentry_sdk +from arroyo import Topic as ArroyoTopic +from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration from django.conf import settings +from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick from sentry import options +from sentry.conf.types.kafka_definition import Topic from sentry.monitors.tasks.check_missed import check_missing from sentry.monitors.tasks.check_timeout import check_timeout -from sentry.utils import metrics, redis +from sentry.utils import json, metrics, redis +from sentry.utils.arroyo_producer import SingletonProducer +from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition logger = logging.getLogger("sentry") # This key is used to store the last timestamp that the tasks were triggered. @@ -26,6 +32,17 @@ def _int_or_none(s: str | None) -> int | None: return int(s) +def _get_producer() -> KafkaProducer: + cluster_name = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["cluster"] + producer_config = get_kafka_producer_cluster_options(cluster_name) + producer_config.pop("compression.type", None) + producer_config.pop("message.max.bytes", None) + return KafkaProducer(build_kafka_configuration(default_config=producer_config)) + + +_clock_tick_producer = SingletonProducer(_get_producer) + + def _dispatch_tick(ts: datetime): """ Dispatch a clock tick which will trigger monitor tasks. @@ -43,9 +60,15 @@ def _dispatch_tick(ts: datetime): skip any tasks it missed) """ if options.get("crons.use_clock_pulse_consumer"): - # TODO(epurkhiser): This should dispatch the pulse as a message on the - # monitors-clock-pulse topic - pass + if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream": + # XXX(epurkhiser): Unclear what we want to do if we're not using kafka + return + + message: ClockTick = {"ts": ts.timestamp()} + payload = KafkaPayload(None, json.dumps(message).encode("utf-8"), []) + + topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"] + _clock_tick_producer.produce(ArroyoTopic(topic), payload) else: check_missing.delay(current_datetime=ts) check_timeout.delay(current_datetime=ts) diff --git a/tests/sentry/monitors/test_clock_dispatch.py b/tests/sentry/monitors/test_clock_dispatch.py index 6a64efe166a56c..55540094d5d1b6 100644 --- a/tests/sentry/monitors/test_clock_dispatch.py +++ b/tests/sentry/monitors/test_clock_dispatch.py @@ -1,9 +1,14 @@ from datetime import timedelta from unittest import mock +from arroyo import Topic +from arroyo.backends.kafka import KafkaPayload +from django.test.utils import override_settings from django.utils import timezone -from sentry.monitors.clock_dispatch import try_monitor_clock_tick +from sentry.monitors.clock_dispatch import _dispatch_tick, try_monitor_clock_tick +from sentry.testutils.helpers.options import override_options +from sentry.utils import json @mock.patch("sentry.monitors.clock_dispatch._dispatch_tick") @@ -141,3 +146,17 @@ def test_monitor_task_trigger_partition_tick_skip(dispatch_tick): assert dispatch_tick.call_count == 2 assert dispatch_tick.mock_calls[1] == mock.call(now + timedelta(minutes=2)) + + +@override_settings(KAFKA_TOPIC_OVERRIDES={"monitors-clock-tick": "clock-tick-test-topic"}) +@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream") +@override_options({"crons.use_clock_pulse_consumer": True}) +@mock.patch("sentry.monitors.clock_dispatch._clock_tick_producer") +def test_dispatch_to_kafka(clock_tick_producer_mock): + now = timezone.now().replace(second=0, microsecond=0) + _dispatch_tick(now) + + clock_tick_producer_mock.produce.assert_called_with( + Topic("clock-tick-test-topic"), + KafkaPayload(None, json.dumps({"ts": now.timestamp()}).encode("utf-8"), []), + )