
Commit f33a3f7

feat(crons): Limit clock ticks to the slowest partition (#58003)
Part of #55821. `try_monitor_tasks_trigger` keeps a global clock, driven by the monitor ingest topic, that triggers the monitor tasks once per minute. This change updates the function to track the slowest partition within the topic. Tracking the slowest partition helps avoid the clock being moved forward when a single partition has a large number of check-ins read out of it (a backlog situation), which would cause check-ins on other partitions to be marked missed because they were not read before the clock ticked.

This change does NOT yet use the slowest partition timestamp to drive the global clock; it simply logs that timestamp so we can validate in production that it still moves forward accurately. In a future PR I will switch this function to use the slowest partition timestamp as the `reference_ts` and add tests to validate this works.
1 parent b2821d1 commit f33a3f7
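To make the mechanism concrete, here is a minimal sketch of the slowest-partition clock idea, using an in-memory dict in place of the Redis sorted set the real implementation uses; the function and variable names are invented for illustration:

```python
from datetime import datetime, timezone

# Hypothetical stand-in for the Redis sorted set: partition index -> latest
# minute-aligned timestamp observed on that partition.
partition_clocks: dict[int, int] = {}

def observe(partition: int, ts: datetime) -> int:
    """Record a message timestamp; return the slowest partition's clock."""
    reference_ts = int(ts.replace(second=0, microsecond=0).timestamp())
    partition_clocks[partition] = reference_ts
    # The global clock may only advance as fast as the furthest-behind
    # partition, so one backlogged partition holds the whole clock back.
    return min(partition_clocks.values())

ten_oh_one = datetime(2023, 10, 12, 10, 1, tzinfo=timezone.utc)
observe(0, ten_oh_one)                              # partition 0 is current
slowest = observe(1, ten_oh_one.replace(minute=0))  # partition 1 lags a minute
assert slowest == int(ten_oh_one.replace(minute=0).timestamp())
```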

3 files changed (+67, -17)

Diff for: src/sentry/monitors/consumers/monitor_consumer.py

+15, -6
```diff
@@ -570,15 +570,20 @@ def update_existing_check_in(
         logger.exception("Failed to process check-in", exc_info=True)
 
 
-def _process_message(ts: datetime, wrapper: CheckinMessage | ClockPulseMessage) -> None:
-    # XXX: Relay does not attach a message type, to properly discriminate
-    # we add it by default here. This can be removed once the message_type
-    # is guaranteed
+def _process_message(
+    ts: datetime,
+    partition: int,
+    wrapper: CheckinMessage | ClockPulseMessage,
+) -> None:
+
+    # XXX: Relay does not attach a message type, to properly discriminate the
+    # message_type we add it by default here. This can be removed once the
+    # message_type is guaranteed
     if "message_type" not in wrapper:
         wrapper["message_type"] = "check_in"
 
     try:
-        try_monitor_tasks_trigger(ts)
+        try_monitor_tasks_trigger(ts, partition)
     except Exception:
         logger.exception("Failed to trigger monitor tasks", exc_info=True)
 
@@ -608,7 +613,11 @@ def process_message(message: Message[KafkaPayload]) -> None:
     assert isinstance(message.value, BrokerValue)
     try:
         wrapper = msgpack.unpackb(message.payload.value)
-        _process_message(message.value.timestamp, wrapper)
+        _process_message(
+            message.value.timestamp,
+            message.value.partition.index,
+            wrapper,
+        )
     except Exception:
         logger.exception("Failed to process message payload")
```
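For context, a small sketch of where `message.value.timestamp` and `message.value.partition.index` come from on an Arroyo message; the topic name, offset, and payload bytes below are fabricated for illustration:

```python
from datetime import datetime

from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition, Topic

# A fabricated broker value standing in for a message consumed from Kafka.
value = BrokerValue(
    payload=KafkaPayload(key=None, value=b"\x80", headers=[]),
    partition=Partition(Topic("ingest-monitors"), 3),
    offset=42,
    timestamp=datetime.now(),
)
message: Message[KafkaPayload] = Message(value)

# process_message above reads both fields off the broker value.
assert message.value.partition.index == 3
assert isinstance(message.value.timestamp, datetime)
```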

Diff for: src/sentry/monitors/tasks.py

+43, -2
```diff
@@ -44,6 +44,9 @@
 # This key is used to store the last timestamp that the tasks were triggered.
 MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"
 
+# This key is used to store the hashmap of Mapping[PartitionKey, Timestamp]
+MONITOR_TASKS_PARTITION_CLOCKS = "sentry.monitors.partition_clocks"
+
 
 def _get_producer() -> KafkaProducer:
     cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
@@ -90,10 +93,14 @@ def _dispatch_tasks(ts: datetime):
     check_timeout.delay(current_datetime=ts)
 
 
-def try_monitor_tasks_trigger(ts: datetime):
+def try_monitor_tasks_trigger(ts: datetime, partition: int):
     """
     Handles triggering the monitor tasks when we've rolled over the minute.
 
+    We keep a reference to the most recent timestamp for each partition and use
+    the slowest partition as our reference time. This ensures all partitions
+    have been synchronized before ticking our clock.
+
     This function is called by our consumer processor
     """
     redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
@@ -103,6 +110,30 @@ def try_monitor_tasks_trigger(ts: datetime):
     reference_datetime = ts.replace(second=0, microsecond=0)
     reference_ts = int(reference_datetime.timestamp())
 
+    # Store the current clock value for this partition.
+    redis_client.zadd(
+        name=MONITOR_TASKS_PARTITION_CLOCKS,
+        mapping={f"part-{partition}": reference_ts},
+    )
+
+    # Find the slowest partition from our sorted set of partitions, where the
+    # clock is the score.
+    slowest_partitions = redis_client.zrange(
+        name=MONITOR_TASKS_PARTITION_CLOCKS,
+        withscores=True,
+        start=0,
+        end=0,
+    )
+
+    # The first tuple is the slowest (part-<id>, score); the score is the
+    # timestamp. Use `int()` to keep the timestamp (score) as an int.
+    slowest_part_ts = int(slowest_partitions[0][1])
+
+    # TODO(epurkhiser): The `slowest_part_ts` is going to become the
+    # reference_ts for the rest of this function. But we don't want to flip
+    # this over quite yet since we want to make sure this is working as
+    # expected.
+
     precheck_last_ts = redis_client.get(MONITOR_TASKS_LAST_TRIGGERED_KEY)
     if precheck_last_ts is not None:
         precheck_last_ts = int(precheck_last_ts)
@@ -131,6 +162,15 @@ def try_monitor_tasks_trigger(ts: datetime):
     # close, but in the case of a backlog, this will be much higher
     total_delay = datetime.now().timestamp() - reference_ts
 
+    # TODO(epurkhiser): For now we will just log the slowest partition
+    # timestamp and in production we can validate the value moves forward
+    # correctly. It's likely this will be a minute behind the actual
+    # reference_ts since there will always be a slowest partition.
+    logger.info(
+        "monitors.consumer.clock_tick_slowest_partition",
+        extra={"slowest_part_ts": slowest_part_ts},
+    )
+
     logger.info(
         "monitors.consumer.clock_tick",
         extra={"reference_datetime": str(reference_datetime)},
@@ -160,7 +200,8 @@ def clock_pulse(current_datetime=None):
 
     if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
         # Directly trigger try_monitor_tasks_trigger in dev
-        try_monitor_tasks_trigger(current_datetime)
+        for partition in _get_partitions().values():
+            try_monitor_tasks_trigger(current_datetime, partition.id)
         return
 
     message: ClockPulseMessage = {
```
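As a usage note, the bookkeeping above relies on plain ZADD/ZRANGE semantics. A minimal redis-py sketch (the local connection and the timestamps are assumptions for illustration) shows why `start=0, end=0` yields the slowest partition:

```python
import redis

# Assumes a local Redis instance, purely for illustration.
r = redis.Redis()

# ZADD stores each partition's clock as the member's score.
r.zadd("sentry.monitors.partition_clocks", {"part-0": 1700000060, "part-1": 1700000000})

# ZRANGE returns members in ascending score order, so element 0 is the
# furthest-behind partition; withscores=True pairs it with its timestamp.
slowest = r.zrange("sentry.monitors.partition_clocks", start=0, end=0, withscores=True)
assert slowest == [(b"part-1", 1700000000.0)]
```

A sorted set keeps both the per-partition update (ZADD) and the slowest-partition lookup (ZRANGE over one element) at O(log N), regardless of the partition count.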

Diff for: tests/sentry/monitors/test_tasks.py

+9, -9
```diff
@@ -950,25 +950,25 @@ def test_monitor_task_trigger(dispatch_tasks):
     now = datetime.now().replace(second=0, microsecond=0)
 
     # First checkin triggers tasks
-    try_monitor_tasks_trigger(ts=now)
+    try_monitor_tasks_trigger(ts=now, partition=0)
     assert dispatch_tasks.call_count == 1
 
     # 5 seconds later does NOT trigger the task
-    try_monitor_tasks_trigger(ts=now + timedelta(seconds=5))
+    try_monitor_tasks_trigger(ts=now + timedelta(seconds=5), partition=0)
     assert dispatch_tasks.call_count == 1
 
     # a minute later DOES trigger the task
-    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1))
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=0)
     assert dispatch_tasks.call_count == 2
 
     # Same time does NOT trigger the task
-    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1))
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=0)
     assert dispatch_tasks.call_count == 2
 
     # A skipped minute triggers the task AND captures an error
     with mock.patch("sentry_sdk.capture_message") as capture_message:
         assert capture_message.call_count == 0
-        try_monitor_tasks_trigger(ts=now + timedelta(minutes=3, seconds=5))
+        try_monitor_tasks_trigger(ts=now + timedelta(minutes=3, seconds=5), partition=0)
         assert dispatch_tasks.call_count == 3
         capture_message.assert_called_with("Monitor task dispatch minute skipped")
 
@@ -984,19 +984,19 @@ def test_monitor_task_trigger_partition_desync(dispatch_tasks):
 
     # First message with timestamp just after the minute boundary
     # triggers the task
-    try_monitor_tasks_trigger(ts=now + timedelta(seconds=1))
+    try_monitor_tasks_trigger(ts=now + timedelta(seconds=1), partition=0)
     assert dispatch_tasks.call_count == 1
 
     # Second message has a timestamp just before the minute boundary,
     # should not trigger anything since we've already ticked ahead of this
-    try_monitor_tasks_trigger(ts=now - timedelta(seconds=1))
+    try_monitor_tasks_trigger(ts=now - timedelta(seconds=1), partition=0)
     assert dispatch_tasks.call_count == 1
 
     # Third message again just after the minute boundary does NOT trigger
     # the task, we've already ticked at that time.
-    try_monitor_tasks_trigger(ts=now + timedelta(seconds=1))
+    try_monitor_tasks_trigger(ts=now + timedelta(seconds=1), partition=0)
     assert dispatch_tasks.call_count == 1
 
     # Fourth message moves past a new minute boundary, tick
-    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1, seconds=1))
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1, seconds=1), partition=0)
     assert dispatch_tasks.call_count == 2
```
