Skip to content

feat(crons): Add ability dispatch clock ticks to kafka #69896

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/sentry/monitors/clock_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
from datetime import datetime, timezone

import sentry_sdk
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.monitors.tasks.check_missed import check_missing
from sentry.monitors.tasks.check_timeout import check_timeout
from sentry.utils import metrics, redis
from sentry.utils import json, metrics, redis
from sentry.utils.arroyo_producer import SingletonProducer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

logger = logging.getLogger("sentry")
# This key is used to store the last timestamp that the tasks were triggered.
Expand All @@ -26,6 +32,17 @@ def _int_or_none(s: str | None) -> int | None:
return int(s)


def _get_producer() -> KafkaProducer:
cluster_name = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
return KafkaProducer(build_kafka_configuration(default_config=producer_config))


_clock_tick_producer = SingletonProducer(_get_producer)


def _dispatch_tick(ts: datetime):
"""
Dispatch a clock tick which will trigger monitor tasks.
Expand All @@ -43,9 +60,15 @@ def _dispatch_tick(ts: datetime):
skip any tasks it missed)
"""
if options.get("crons.use_clock_pulse_consumer"):
# TODO(epurkhiser): This should dispatch the pulse as a message on the
# monitors-clock-pulse topic
pass
if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
# XXX(epurkhiser): Unclear what we want to do if we're not using kafka
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just fire the tasks directly probably, it's only for dev

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'll keep the logic separate to handle that 👍

return

message: ClockTick = {"ts": ts.timestamp()}
payload = KafkaPayload(None, json.dumps(message).encode("utf-8"), [])

topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"]
_clock_tick_producer.produce(ArroyoTopic(topic), payload)
else:
check_missing.delay(current_datetime=ts)
check_timeout.delay(current_datetime=ts)
Expand Down
21 changes: 20 additions & 1 deletion tests/sentry/monitors/test_clock_dispatch.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from datetime import timedelta
from unittest import mock

from arroyo import Topic
from arroyo.backends.kafka import KafkaPayload
from django.test.utils import override_settings
from django.utils import timezone

from sentry.monitors.clock_dispatch import try_monitor_clock_tick
from sentry.monitors.clock_dispatch import _dispatch_tick, try_monitor_clock_tick
from sentry.testutils.helpers.options import override_options
from sentry.utils import json


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
Expand Down Expand Up @@ -141,3 +146,17 @@ def test_monitor_task_trigger_partition_tick_skip(dispatch_tick):

assert dispatch_tick.call_count == 2
assert dispatch_tick.mock_calls[1] == mock.call(now + timedelta(minutes=2))


@override_settings(KAFKA_TOPIC_OVERRIDES={"monitors-clock-tick": "clock-tick-test-topic"})
@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
@override_options({"crons.use_clock_pulse_consumer": True})
@mock.patch("sentry.monitors.clock_dispatch._clock_tick_producer")
def test_dispatch_to_kafka(clock_tick_producer_mock):
now = timezone.now().replace(second=0, microsecond=0)
_dispatch_tick(now)

clock_tick_producer_mock.produce.assert_called_with(
Topic("clock-tick-test-topic"),
KafkaPayload(None, json.dumps({"ts": now.timestamp()}).encode("utf-8"), []),
)
Loading