
Commit 93e5ea0

fix(crons): Enable clock tick partition syncing
This is a follow-up to GH-58003. The clock which dispatches tasks will now only tick forward once all partitions have been read up to the synchronized time.
1 parent f33a3f7 commit 93e5ea0
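The mechanism is visible in the diff below: each partition records how far it has read, and the dispatch clock only advances to the slowest of those timestamps. As a rough standalone sketch of the idea (not the actual Sentry implementation, which keeps these timestamps in the MONITOR_TASKS_PARTITION_CLOCKS redis sorted set and the last tick in redis as well), it amounts to:

```python
from datetime import datetime, timezone

# In-memory stand-ins for the redis state the real task keeps: a per-partition
# clock (MONITOR_TASKS_PARTITION_CLOCKS) and the last dispatched tick.
partition_clocks = {}
last_triggered = None


def record_and_maybe_tick(ts, partition, dispatch):
    """Record that `partition` has been read up to `ts`, and tick the shared
    clock forward only once the slowest partition has moved past the last tick."""
    global last_triggered

    # Remember the furthest point this partition has been read to.
    current = partition_clocks.get(partition, 0)
    partition_clocks[partition] = max(current, int(ts.timestamp()))

    # The clock can only move as far as the slowest known partition.
    slowest_part_ts = min(partition_clocks.values())

    # Nothing to do if the slowest partition hasn't passed the last tick yet.
    if last_triggered is not None and slowest_part_ts <= last_triggered:
        return

    last_triggered = slowest_part_ts
    dispatch(datetime.fromtimestamp(slowest_part_ts, tz=timezone.utc))


# Example: the clock does not tick to the next minute until partition 1,
# the slowest one, catches up.
if __name__ == "__main__":
    base = datetime(2023, 10, 13, 12, 0, tzinfo=timezone.utc)
    record_and_maybe_tick(base, 0, print)                    # ticks at 12:00
    record_and_maybe_tick(base, 1, print)                    # same minute, no new tick
    record_and_maybe_tick(base.replace(minute=1), 0, print)  # partition 1 still behind, no tick
    record_and_maybe_tick(base.replace(minute=1), 1, print)  # slowest caught up, ticks at 12:01
```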

File tree

2 files changed: +103 −36 lines
src/sentry/monitors/tasks.py

+14 −27

@@ -129,26 +129,23 @@ def try_monitor_tasks_trigger(ts: datetime, partition: int):
     # timestamp. Use `int()` to keep the timestamp (score) as an int
     slowest_part_ts = int(slowest_partitions[0][1])
 
-    # TODO(epurkhiser): The `slowest_part_ts` is going to become the
-    # reference_ts for the rest of this function. But we don't want to flip
-    # this over quite yet since we want to make sure this is working as
-    # expected.
-
     precheck_last_ts = redis_client.get(MONITOR_TASKS_LAST_TRIGGERED_KEY)
     if precheck_last_ts is not None:
         precheck_last_ts = int(precheck_last_ts)
 
-    # If we have the same or an older reference timestamp from the most recent
-    # tick there is nothing to do, we've already handled this tick.
+    # If we have the same or an older timestamp from the most recent tick there
+    # is nothing to do, we've already handled this tick.
     #
-    # The scenario where the reference_ts is older is likely due to a partition
-    # being slightly behind another partition that we've already read from
-    if precheck_last_ts is not None and precheck_last_ts >= reference_ts:
+    # The scenario where the slowest_part_ts is older may happen when our
+    # MONITOR_TASKS_PARTITION_CLOCKS set did not know about every partition the
+    # topic is responsible for. Older check-ins may be processed after newer
+    # ones in different topics. This should only happen if redis loses state.
+    if precheck_last_ts is not None and precheck_last_ts >= slowest_part_ts:
         return
 
     # GETSET is atomic. This is critical to avoid another consumer also
     # processing the same tick.
-    last_ts = redis_client.getset(MONITOR_TASKS_LAST_TRIGGERED_KEY, reference_ts)
+    last_ts = redis_client.getset(MONITOR_TASKS_LAST_TRIGGERED_KEY, slowest_part_ts)
     if last_ts is not None:
         last_ts = int(last_ts)
 
@@ -160,32 +157,22 @@ def try_monitor_tasks_trigger(ts: datetime, partition: int):
 
     # Track the delay from the true time, ideally this should be pretty
    # close, but in the case of a backlog, this will be much higher
-    total_delay = datetime.now().timestamp() - reference_ts
+    total_delay = datetime.now().timestamp() - slowest_part_ts
 
-    # TODO(epurkhiser): For now we will just log the slowest partition
-    # timestamp and in production we can validate the value moves forward
-    # correctly. It's likely this will be a minute behind the actual
-    # reference_ts since there will always be a slowest partition
-    logger.info(
-        "monitors.consumer.clock_tick_slowest_partition",
-        extra={"slowest_part_ts": slowest_part_ts},
-    )
+    tick = datetime.fromtimestamp(slowest_part_ts)
 
-    logger.info(
-        "monitors.consumer.clock_tick",
-        extra={"reference_datetime": str(reference_datetime)},
-    )
+    logger.info("monitors.consumer.clock_tick", extra={"reference_datetime": str(tick)})
     metrics.gauge("monitors.task.clock_delay", total_delay, sample_rate=1.0)
 
     # If more than exactly a minute has passed then we've skipped a
     # task run, report that to sentry, it is a problem.
-    if last_ts is not None and reference_ts > last_ts + 60:
+    if last_ts is not None and slowest_part_ts > last_ts + 60:
         with sentry_sdk.push_scope() as scope:
             scope.set_extra("last_ts", last_ts)
-            scope.set_extra("reference_ts", reference_ts)
+            scope.set_extra("slowest_part_ts", slowest_part_ts)
             sentry_sdk.capture_message("Monitor task dispatch minute skipped")
 
-    _dispatch_tasks(ts)
+    _dispatch_tasks(tick)
 
 
 @instrumented_task(name="sentry.monitors.tasks.clock_pulse", silo_mode=SiloMode.REGION)
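The precheck plus GETSET pair in the hunk above is what keeps multiple consumers from dispatching the same tick. A minimal, self-contained illustration of that pattern with redis-py, using a placeholder key name rather than Sentry's MONITOR_TASKS_LAST_TRIGGERED_KEY:

```python
import redis

# Placeholder key; the real task uses MONITOR_TASKS_LAST_TRIGGERED_KEY.
LAST_TICK_KEY = "example:crons:last_tick"

client = redis.StrictRedis()


def try_tick(slowest_part_ts: int) -> bool:
    """Return True if this consumer won the race to dispatch this tick."""
    # Cheap precheck: if the clock has already moved to (or past) this
    # timestamp, the tick was handled elsewhere.
    precheck = client.get(LAST_TICK_KEY)
    if precheck is not None and int(precheck) >= slowest_part_ts:
        return False

    # GETSET atomically swaps in the new timestamp and returns the previous
    # value, so only one consumer observes the transition for a given minute.
    last_ts = client.getset(LAST_TICK_KEY, slowest_part_ts)
    if last_ts is not None and int(last_ts) >= slowest_part_ts:
        # Another consumer advanced the clock between our precheck and swap.
        return False

    return True
```

The precheck alone would not be safe, since two consumers can both read a stale value; the atomic swap is what guarantees a single winner per tick.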

tests/sentry/monitors/test_tasks.py

+89 −9

@@ -949,6 +949,9 @@ def test_clock_pulse(checkin_producer_mock):
 def test_monitor_task_trigger(dispatch_tasks):
     now = datetime.now().replace(second=0, microsecond=0)
 
+    # Assumes a single partition for simplicity. Multi-partition cases are
+    # covered in later test cases.
+
     # First checkin triggers tasks
     try_monitor_tasks_trigger(ts=now, partition=0)
     assert dispatch_tasks.call_count == 1
@@ -982,21 +985,98 @@ def test_monitor_task_trigger_partition_desync(dispatch_tasks):
     """
     now = datetime.now().replace(second=0, microsecond=0)
 
-    # First message with timestamp just after the minute boundary
-    # triggers the task
+    # First message in partition 0 with timestamp just after the minute
+    # boundary triggers the task
     try_monitor_tasks_trigger(ts=now + timedelta(seconds=1), partition=0)
     assert dispatch_tasks.call_count == 1
 
-    # Second message has a timestamp just before the minute boundary,
-    # should not trigger anything since we've already ticked ahead of this
-    try_monitor_tasks_trigger(ts=now - timedelta(seconds=1), partition=0)
+    # Second message in partition 1 has a timestamp just before the minute
+    # boundary, should not trigger anything since we've already ticked ahead of
+    # this
+    try_monitor_tasks_trigger(ts=now - timedelta(seconds=1), partition=1)
     assert dispatch_tasks.call_count == 1
 
-    # Third message again just after the minute boundary does NOT trigger
-    # the task, we've already ticked at that time.
-    try_monitor_tasks_trigger(ts=now + timedelta(seconds=1), partition=0)
+    # Third message in partition 1 again just after the minute boundary does
+    # NOT trigger the task, we've already ticked at that time.
+    try_monitor_tasks_trigger(ts=now + timedelta(seconds=1), partition=1)
     assert dispatch_tasks.call_count == 1
 
-    # Fourth message moves past a new minute boundary, tick
+    # Next two messages in both partitions move the clock forward
     try_monitor_tasks_trigger(ts=now + timedelta(minutes=1, seconds=1), partition=0)
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1, seconds=1), partition=1)
+    assert dispatch_tasks.call_count == 2
+
+
+@mock.patch("sentry.monitors.tasks._dispatch_tasks")
+def test_monitor_task_trigger_partition_sync(dispatch_tasks):
+    """
+    When the kafka topic has multiple partitions we want to only tick our clock
+    forward once all partitions have caught up. This test simulates that.
+    """
+    now = datetime.now().replace(second=0, microsecond=0)
+
+    # Tick for 4 partitions
+    try_monitor_tasks_trigger(ts=now, partition=0)
+    try_monitor_tasks_trigger(ts=now, partition=1)
+    try_monitor_tasks_trigger(ts=now, partition=2)
+    try_monitor_tasks_trigger(ts=now, partition=3)
+    assert dispatch_tasks.call_count == 1
+    assert dispatch_tasks.mock_calls[0] == mock.call(now)
+
+    # Tick forward 3 of the partitions, global clock does not tick
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=0)
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=1)
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=2)
+    assert dispatch_tasks.call_count == 1
+
+    # Slowest partition ticks forward, global clock ticks
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=3)
+    assert dispatch_tasks.call_count == 2
+    assert dispatch_tasks.mock_calls[1] == mock.call(now + timedelta(minutes=1))
+
+
+@mock.patch("sentry.monitors.tasks._dispatch_tasks")
+def test_monitor_task_trigger_partition_tick_skip(dispatch_tasks):
+    """
+    In a scenario where all partitions move multiple ticks past the slowest
+    partition we may end up skipping a tick.
+    """
+    now = datetime.now().replace(second=0, microsecond=0)
+
+    # Tick for 4 partitions
+    try_monitor_tasks_trigger(ts=now, partition=0)
+    try_monitor_tasks_trigger(ts=now, partition=1)
+    try_monitor_tasks_trigger(ts=now, partition=2)
+    try_monitor_tasks_trigger(ts=now, partition=3)
+    assert dispatch_tasks.call_count == 1
+    assert dispatch_tasks.mock_calls[0] == mock.call(now)
+
+    # Tick forward twice for 3 partitions
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=0)
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=1)
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=1), partition=2)
+
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=2), partition=0)
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=3), partition=1)
+    try_monitor_tasks_trigger(ts=now + timedelta(minutes=3), partition=2)
+    assert dispatch_tasks.call_count == 1
+
+    # Slowest partition catches up, but has a timestamp gap, capture the fact
+    # that we skipped a minute
+    with mock.patch("sentry_sdk.capture_message") as capture_message:
+        assert capture_message.call_count == 0
+        try_monitor_tasks_trigger(ts=now + timedelta(minutes=2), partition=3)
+        capture_message.assert_called_with("Monitor task dispatch minute skipped")
+
+    # XXX(epurkhiser): Another approach we could take here is to detect the
+    # skipped minute and generate a tick for that minute, since we know we've
+    # processed past that minute.
+    #
+    # This still could be a problem though since it may mean we will not
+    # produce missed check-ins since the monitor may have already checked-in
+    # after and moved the `next_checkin_latest` forward.
+    #
+    # In practice this should almost never happen since we have a high volume
+    # of check-ins.
+
     assert dispatch_tasks.call_count == 2
+    assert dispatch_tasks.mock_calls[1] == mock.call(now + timedelta(minutes=2))
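The XXX note in the last test describes an alternative to merely reporting a skipped minute: synthesize a tick for every minute between the previous tick and the new slowest-partition timestamp. A hedged sketch of that backfill idea (the `dispatch` callback and function name are stand-ins; this is not what the commit actually implements):

```python
from datetime import datetime, timezone


def dispatch_with_backfill(last_ts, slowest_part_ts, dispatch):
    """Dispatch a tick for every whole minute between the previous tick and
    the new slowest-partition timestamp so no minute is silently skipped."""
    if last_ts is None:
        dispatch(datetime.fromtimestamp(slowest_part_ts, tz=timezone.utc))
        return

    # Walk forward one minute at a time until the new timestamp is reached.
    ts = last_ts + 60
    while ts <= slowest_part_ts:
        dispatch(datetime.fromtimestamp(ts, tz=timezone.utc))
        ts += 60
```

As the note itself cautions, backfilled ticks could still produce incorrect missed check-ins when a monitor has already checked in and moved `next_checkin_latest` forward, which is why the commit only captures a message for now.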
