Skip to content

Commit ef73a24

Browse files
committed
feat(crons): Add ability dispatch clock ticks to kafka
1 parent 2c12dad commit ef73a24

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

src/sentry/monitors/clock_dispatch.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,18 @@
44
from datetime import datetime, timezone
55

66
import sentry_sdk
7+
from arroyo import Topic as ArroyoTopic
8+
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
79
from django.conf import settings
10+
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick
811

912
from sentry import options
13+
from sentry.conf.types.kafka_definition import Topic
1014
from sentry.monitors.tasks.check_missed import check_missing
1115
from sentry.monitors.tasks.check_timeout import check_timeout
12-
from sentry.utils import metrics, redis
16+
from sentry.utils import json, metrics, redis
17+
from sentry.utils.arroyo_producer import SingletonProducer
18+
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
1319

1420
logger = logging.getLogger("sentry")
1521
# This key is used to store the last timestamp that the tasks were triggered.
@@ -26,6 +32,17 @@ def _int_or_none(s: str | None) -> int | None:
2632
return int(s)
2733

2834

35+
def _get_producer() -> KafkaProducer:
36+
cluster_name = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["cluster"]
37+
producer_config = get_kafka_producer_cluster_options(cluster_name)
38+
producer_config.pop("compression.type", None)
39+
producer_config.pop("message.max.bytes", None)
40+
return KafkaProducer(build_kafka_configuration(default_config=producer_config))
41+
42+
43+
_clock_tick_producer = SingletonProducer(_get_producer)
44+
45+
2946
def _dispatch_tick(ts: datetime):
3047
"""
3148
Dispatch a clock tick which will trigger monitor tasks.
@@ -43,9 +60,15 @@ def _dispatch_tick(ts: datetime):
4360
skip any tasks it missed)
4461
"""
4562
if options.get("crons.use_clock_pulse_consumer"):
46-
# TODO(epurkhiser): This should dispatch the pulse as a message on the
47-
# monitors-clock-pulse topic
48-
pass
63+
if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
64+
# XXX(epurkhiser): Unclear what we want to do if we're not using kafka
65+
return
66+
67+
message: ClockTick = {"ts": ts.timestamp()}
68+
payload = KafkaPayload(None, json.dumps(message).encode("utf-8"), [])
69+
70+
topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"]
71+
_clock_tick_producer.produce(ArroyoTopic(topic), payload)
4972
else:
5073
check_missing.delay(current_datetime=ts)
5174
check_timeout.delay(current_datetime=ts)

tests/sentry/monitors/test_clock_dispatch.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
from datetime import timedelta
22
from unittest import mock
33

4+
from arroyo import Topic
5+
from arroyo.backends.kafka import KafkaPayload
6+
from django.test.utils import override_settings
47
from django.utils import timezone
58

6-
from sentry.monitors.clock_dispatch import try_monitor_clock_tick
9+
from sentry.monitors.clock_dispatch import _dispatch_tick, try_monitor_clock_tick
10+
from sentry.testutils.helpers.options import override_options
11+
from sentry.utils import json
712

813

914
@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@@ -141,3 +146,17 @@ def test_monitor_task_trigger_partition_tick_skip(dispatch_tick):
141146

142147
assert dispatch_tick.call_count == 2
143148
assert dispatch_tick.mock_calls[1] == mock.call(now + timedelta(minutes=2))
149+
150+
151+
@override_settings(KAFKA_TOPIC_OVERRIDES={"monitors-clock-tick": "clock-tick-test-topic"})
152+
@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
153+
@override_options({"crons.use_clock_pulse_consumer": True})
154+
@mock.patch("sentry.monitors.clock_dispatch._clock_tick_producer")
155+
def test_dispatch_to_kafka(clock_tick_producer_mock):
156+
now = timezone.now().replace(second=0, microsecond=0)
157+
_dispatch_tick(now)
158+
159+
clock_tick_producer_mock.produce.assert_called_with(
160+
Topic("clock-tick-test-topic"),
161+
KafkaPayload(None, json.dumps({"ts": now.timestamp()}).encode("utf-8"), []),
162+
)

0 commit comments

Comments
 (0)