Skip to content

Commit 25b75af

Browse files
antonpirkerarjenzorgdoc
authored and committed
Reapply "Refactor the Celery Beat integration (getsentry#3105)" (getsentry#3144) (getsentry#3175)
This reverts the revert that was done to mitigate the regression error with Crons not sending ok/error check-ins. This reapplies the refactoring, fixes the root cause of the regression, and adds integration tests to make sure it does not happen again.
1 parent d9fb440 commit 25b75af

File tree

10 files changed

+463
-145
lines changed

10 files changed

+463
-145
lines changed

.github/workflows/test-integrations-data-processing.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ jobs:
3636
- uses: actions/setup-python@v5
3737
with:
3838
python-version: ${{ matrix.python-version }}
39+
- name: Start Redis
40+
uses: supercharge/[email protected]
3941
- name: Setup Test Env
4042
run: |
4143
pip install coverage tox
@@ -108,6 +110,8 @@ jobs:
108110
- uses: actions/setup-python@v5
109111
with:
110112
python-version: ${{ matrix.python-version }}
113+
- name: Start Redis
114+
uses: supercharge/[email protected]
111115
- name: Setup Test Env
112116
run: |
113117
pip install coverage tox

scripts/split-tox-gh-actions/split-tox-gh-actions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
"asyncpg",
3636
}
3737

38+
FRAMEWORKS_NEEDING_REDIS = {
39+
"celery",
40+
}
41+
3842
FRAMEWORKS_NEEDING_CLICKHOUSE = {
3943
"clickhouse_driver",
4044
}
@@ -275,6 +279,7 @@ def render_template(group, frameworks, py_versions_pinned, py_versions_latest):
275279
"needs_aws_credentials": bool(set(frameworks) & FRAMEWORKS_NEEDING_AWS),
276280
"needs_clickhouse": bool(set(frameworks) & FRAMEWORKS_NEEDING_CLICKHOUSE),
277281
"needs_postgres": bool(set(frameworks) & FRAMEWORKS_NEEDING_POSTGRES),
282+
"needs_redis": bool(set(frameworks) & FRAMEWORKS_NEEDING_REDIS),
278283
"needs_github_secrets": bool(
279284
set(frameworks) & FRAMEWORKS_NEEDING_GITHUB_SECRETS
280285
),

scripts/split-tox-gh-actions/templates/test_group.jinja

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@
5353
- uses: getsentry/action-clickhouse-in-ci@v1
5454
{% endif %}
5555

56+
{% if needs_redis %}
57+
- name: Start Redis
58+
uses: supercharge/[email protected]
59+
{% endif %}
60+
5661
- name: Setup Test Env
5762
run: |
5863
pip install coverage tox

sentry_sdk/integrations/celery/__init__.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,9 @@ def __init__(
7070
self.monitor_beat_tasks = monitor_beat_tasks
7171
self.exclude_beat_tasks = exclude_beat_tasks
7272

73-
if monitor_beat_tasks:
74-
_patch_beat_apply_entry()
75-
_patch_redbeat_maybe_due()
76-
_setup_celery_beat_signals()
73+
_patch_beat_apply_entry()
74+
_patch_redbeat_maybe_due()
75+
_setup_celery_beat_signals(monitor_beat_tasks)
7776

7877
@staticmethod
7978
def setup_once():
@@ -167,11 +166,11 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
167166
"""
168167
updated_headers = original_headers.copy()
169168
with capture_internal_exceptions():
170-
headers = {}
171-
if span is not None:
172-
headers = dict(
173-
Scope.get_current_scope().iter_trace_propagation_headers(span=span)
174-
)
169+
# if span is None (when the task was started by Celery Beat)
170+
# this will return the trace headers from the scope.
171+
headers = dict(
172+
Scope.get_isolation_scope().iter_trace_propagation_headers(span=span)
173+
)
175174

176175
if monitor_beat_tasks:
177176
headers.update(

sentry_sdk/integrations/celery/beat.py

Lines changed: 72 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -113,133 +113,109 @@ def _get_monitor_config(celery_schedule, app, monitor_name):
113113
return monitor_config
114114

115115

116-
def _patch_beat_apply_entry():
117-
# type: () -> None
116+
def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
117+
# type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
118118
"""
119-
Makes sure that the Sentry Crons information is set in the Celery Beat task's
120-
headers so that is is monitored with Sentry Crons.
121-
122-
This is only called by Celery Beat. After apply_entry is called
123-
Celery will call apply_async to put the task in the queue.
119+
Add Sentry Crons information to the schedule_entry headers.
124120
"""
125-
from sentry_sdk.integrations.celery import CeleryIntegration
126-
127-
original_apply_entry = Scheduler.apply_entry
128-
129-
def sentry_apply_entry(*args, **kwargs):
130-
# type: (*Any, **Any) -> None
131-
scheduler, schedule_entry = args
132-
app = scheduler.app
133-
134-
celery_schedule = schedule_entry.schedule
135-
monitor_name = schedule_entry.name
136-
137-
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
138-
if integration is None:
139-
return original_apply_entry(*args, **kwargs)
140-
141-
if match_regex_list(monitor_name, integration.exclude_beat_tasks):
142-
return original_apply_entry(*args, **kwargs)
121+
if not integration.monitor_beat_tasks:
122+
return
143123

144-
# Tasks started by Celery Beat start a new Trace
145-
scope = Scope.get_isolation_scope()
146-
scope.set_new_propagation_context()
147-
scope._name = "celery-beat"
124+
monitor_name = schedule_entry.name
148125

149-
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
126+
task_should_be_excluded = match_regex_list(
127+
monitor_name, integration.exclude_beat_tasks
128+
)
129+
if task_should_be_excluded:
130+
return
150131

151-
is_supported_schedule = bool(monitor_config)
152-
if is_supported_schedule:
153-
headers = schedule_entry.options.pop("headers", {})
154-
headers.update(
155-
{
156-
"sentry-monitor-slug": monitor_name,
157-
"sentry-monitor-config": monitor_config,
158-
}
159-
)
132+
celery_schedule = schedule_entry.schedule
133+
app = scheduler.app
160134

161-
check_in_id = capture_checkin(
162-
monitor_slug=monitor_name,
163-
monitor_config=monitor_config,
164-
status=MonitorStatus.IN_PROGRESS,
165-
)
166-
headers.update({"sentry-monitor-check-in-id": check_in_id})
135+
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
167136

168-
# Set the Sentry configuration in the options of the ScheduleEntry.
169-
# Those will be picked up in `apply_async` and added to the headers.
170-
schedule_entry.options["headers"] = headers
137+
is_supported_schedule = bool(monitor_config)
138+
if not is_supported_schedule:
139+
return
171140

172-
return original_apply_entry(*args, **kwargs)
141+
headers = schedule_entry.options.pop("headers", {})
142+
headers.update(
143+
{
144+
"sentry-monitor-slug": monitor_name,
145+
"sentry-monitor-config": monitor_config,
146+
}
147+
)
173148

174-
Scheduler.apply_entry = sentry_apply_entry
149+
check_in_id = capture_checkin(
150+
monitor_slug=monitor_name,
151+
monitor_config=monitor_config,
152+
status=MonitorStatus.IN_PROGRESS,
153+
)
154+
headers.update({"sentry-monitor-check-in-id": check_in_id})
175155

156+
# Set the Sentry configuration in the options of the ScheduleEntry.
157+
# Those will be picked up in `apply_async` and added to the headers.
158+
schedule_entry.options["headers"] = headers
176159

177-
def _patch_redbeat_maybe_due():
178-
# type: () -> None
179160

180-
if RedBeatScheduler is None:
181-
return
161+
def _wrap_beat_scheduler(original_function):
162+
# type: (Callable[..., Any]) -> Callable[..., Any]
163+
"""
164+
Makes sure that:
165+
- a new Sentry trace is started for each task started by Celery Beat and
166+
it is propagated to the task.
167+
- the Sentry Crons information is set in the Celery Beat task's
168+
headers so that is is monitored with Sentry Crons.
169+
170+
After the patched function is called,
171+
Celery Beat will call apply_async to put the task in the queue.
172+
"""
173+
# Patch only once
174+
# Can't use __name__ here, because some of our tests mock original_apply_entry
175+
already_patched = "sentry_patched_scheduler" in str(original_function)
176+
if already_patched:
177+
return original_function
182178

183179
from sentry_sdk.integrations.celery import CeleryIntegration
184180

185-
original_maybe_due = RedBeatScheduler.maybe_due
186-
187-
def sentry_maybe_due(*args, **kwargs):
181+
def sentry_patched_scheduler(*args, **kwargs):
188182
# type: (*Any, **Any) -> None
189-
scheduler, schedule_entry = args
190-
app = scheduler.app
191-
192-
celery_schedule = schedule_entry.schedule
193-
monitor_name = schedule_entry.name
194-
195183
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
196184
if integration is None:
197-
return original_maybe_due(*args, **kwargs)
198-
199-
task_should_be_excluded = match_regex_list(
200-
monitor_name, integration.exclude_beat_tasks
201-
)
202-
if task_should_be_excluded:
203-
return original_maybe_due(*args, **kwargs)
185+
return original_function(*args, **kwargs)
204186

205187
# Tasks started by Celery Beat start a new Trace
206188
scope = Scope.get_isolation_scope()
207189
scope.set_new_propagation_context()
208190
scope._name = "celery-beat"
209191

210-
monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
211-
212-
is_supported_schedule = bool(monitor_config)
213-
if is_supported_schedule:
214-
headers = schedule_entry.options.pop("headers", {})
215-
headers.update(
216-
{
217-
"sentry-monitor-slug": monitor_name,
218-
"sentry-monitor-config": monitor_config,
219-
}
220-
)
192+
scheduler, schedule_entry = args
193+
_apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
221194

222-
check_in_id = capture_checkin(
223-
monitor_slug=monitor_name,
224-
monitor_config=monitor_config,
225-
status=MonitorStatus.IN_PROGRESS,
226-
)
227-
headers.update({"sentry-monitor-check-in-id": check_in_id})
195+
return original_function(*args, **kwargs)
228196

229-
# Set the Sentry configuration in the options of the ScheduleEntry.
230-
# Those will be picked up in `apply_async` and added to the headers.
231-
schedule_entry.options["headers"] = headers
197+
return sentry_patched_scheduler
232198

233-
return original_maybe_due(*args, **kwargs)
234199

235-
RedBeatScheduler.maybe_due = sentry_maybe_due
200+
def _patch_beat_apply_entry():
201+
# type: () -> None
202+
Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
236203

237204

238-
def _setup_celery_beat_signals():
205+
def _patch_redbeat_maybe_due():
239206
# type: () -> None
240-
task_success.connect(crons_task_success)
241-
task_failure.connect(crons_task_failure)
242-
task_retry.connect(crons_task_retry)
207+
if RedBeatScheduler is None:
208+
return
209+
210+
RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)
211+
212+
213+
def _setup_celery_beat_signals(monitor_beat_tasks):
214+
# type: (bool) -> None
215+
if monitor_beat_tasks:
216+
task_success.connect(crons_task_success)
217+
task_failure.connect(crons_task_failure)
218+
task_retry.connect(crons_task_retry)
243219

244220

245221
def crons_task_success(sender, **kwargs):

sentry_sdk/scope.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -604,9 +604,10 @@ def iter_headers(self):
604604
def iter_trace_propagation_headers(self, *args, **kwargs):
605605
# type: (Any, Any) -> Generator[Tuple[str, str], None, None]
606606
"""
607-
Return HTTP headers which allow propagation of trace data. Data taken
608-
from the span representing the request, if available, or the current
609-
span on the scope if not.
607+
Return HTTP headers which allow propagation of trace data.
608+
609+
If a span is given, the trace data will taken from the span.
610+
If no span is given, the trace data is taken from the scope.
610611
"""
611612
client = Scope.get_client()
612613
if not client.options.get("propagate_traces"):
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import os
2+
import signal
3+
import tempfile
4+
import threading
5+
import time
6+
7+
from celery.beat import Scheduler
8+
9+
from sentry_sdk.utils import logger
10+
11+
12+
class ImmediateScheduler(Scheduler):
13+
"""
14+
A custom scheduler that starts tasks immediately after starting Celery beat.
15+
"""
16+
17+
def setup_schedule(self):
18+
super().setup_schedule()
19+
for _, entry in self.schedule.items():
20+
self.apply_entry(entry)
21+
22+
def tick(self):
23+
# Override tick to prevent the normal schedule cycle
24+
return 1
25+
26+
27+
def kill_beat(beat_pid_file, delay_seconds=1):
28+
"""
29+
Terminates Celery Beat after the given `delay_seconds`.
30+
"""
31+
logger.info("Starting Celery Beat killer...")
32+
time.sleep(delay_seconds)
33+
pid = int(open(beat_pid_file, "r").read())
34+
logger.info("Terminating Celery Beat...")
35+
os.kill(pid, signal.SIGTERM)
36+
37+
38+
def run_beat(celery_app, runtime_seconds=1, loglevel="warning", quiet=True):
39+
"""
40+
Run Celery Beat that immediately starts tasks.
41+
The Celery Beat instance is automatically terminated after `runtime_seconds`.
42+
"""
43+
logger.info("Starting Celery Beat...")
44+
pid_file = os.path.join(tempfile.mkdtemp(), f"celery-beat-{os.getpid()}.pid")
45+
46+
t = threading.Thread(
47+
target=kill_beat,
48+
args=(pid_file,),
49+
kwargs={"delay_seconds": runtime_seconds},
50+
)
51+
t.start()
52+
53+
beat_instance = celery_app.Beat(
54+
loglevel=loglevel,
55+
quiet=quiet,
56+
pidfile=pid_file,
57+
)
58+
beat_instance.run()

0 commit comments

Comments
 (0)