forked from getsentry/sentry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
535 lines (430 loc) · 18.4 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
import sys
import warnings
from collections.abc import Mapping
from functools import wraps
import sentry_sdk
from sentry_sdk import isolation_scope
from sentry_sdk.api import continue_trace
from sentry_sdk.consts import OP, SPANSTATUS, SPANDATA
from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable
from sentry_sdk.integrations.celery.beat import (
_patch_beat_apply_entry,
_patch_redbeat_maybe_due,
_setup_celery_beat_signals,
)
from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TRANSACTION_SOURCE_TASK
from sentry_sdk.tracing_utils import Baggage
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
reraise,
)
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any
from typing import Callable
from typing import List
from typing import Optional
from typing import TypeVar
from typing import Union
from sentry_sdk._types import EventProcessor, Event, Hint, ExcInfo
from sentry_sdk.tracing import Span
F = TypeVar("F", bound=Callable[..., Any])
try:
from celery import VERSION as CELERY_VERSION # type: ignore
from celery.app.task import Task # type: ignore
from celery.app.trace import task_has_custom
from celery.exceptions import ( # type: ignore
Ignore,
Reject,
Retry,
SoftTimeLimitExceeded,
)
from kombu import Producer # type: ignore
except ImportError:
raise DidNotEnable("Celery not installed")
CELERY_CONTROL_FLOW_EXCEPTIONS = (Retry, Ignore, Reject)
class CeleryIntegration(Integration):
identifier = "celery"
origin = f"auto.queue.{identifier}"
def __init__(
self,
propagate_traces=True,
monitor_beat_tasks=False,
exclude_beat_tasks=None,
):
# type: (bool, bool, Optional[List[str]]) -> None
warnings.warn(
"The `propagate_traces` parameter is deprecated. Please use `trace_propagation_targets` instead.",
DeprecationWarning,
stacklevel=2,
)
self.propagate_traces = propagate_traces
self.monitor_beat_tasks = monitor_beat_tasks
self.exclude_beat_tasks = exclude_beat_tasks
_patch_beat_apply_entry()
_patch_redbeat_maybe_due()
_setup_celery_beat_signals(monitor_beat_tasks)
@staticmethod
def setup_once():
# type: () -> None
_check_minimum_version(CeleryIntegration, CELERY_VERSION)
_patch_build_tracer()
_patch_task_apply_async()
_patch_celery_send_task()
_patch_worker_exit()
_patch_producer_publish()
# This logger logs every status of every task that ran on the worker.
# Meaning that every task's breadcrumbs are full of stuff like "Task
# <foo> raised unexpected <bar>".
ignore_logger("celery.worker.job")
ignore_logger("celery.app.trace")
# This is stdout/err redirected to a logger, can't deal with this
# (need event_level=logging.WARN to reproduce)
ignore_logger("celery.redirected")
def _set_status(status):
# type: (str) -> None
with capture_internal_exceptions():
scope = sentry_sdk.get_current_scope()
if scope.span is not None:
scope.span.set_status(status)
def _capture_exception(task, exc_info):
# type: (Any, ExcInfo) -> None
client = sentry_sdk.get_client()
if client.get_integration(CeleryIntegration) is None:
return
if isinstance(exc_info[1], CELERY_CONTROL_FLOW_EXCEPTIONS):
# ??? Doesn't map to anything
_set_status("aborted")
return
_set_status("internal_error")
if hasattr(task, "throws") and isinstance(exc_info[1], task.throws):
return
event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={"type": "celery", "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)
def _make_event_processor(task, uuid, args, kwargs, request=None):
# type: (Any, Any, Any, Any, Optional[Any]) -> EventProcessor
def event_processor(event, hint):
# type: (Event, Hint) -> Optional[Event]
with capture_internal_exceptions():
tags = event.setdefault("tags", {})
tags["celery_task_id"] = uuid
extra = event.setdefault("extra", {})
extra["celery-job"] = {
"task_name": task.name,
"args": args,
"kwargs": kwargs,
}
if "exc_info" in hint:
with capture_internal_exceptions():
if issubclass(hint["exc_info"][0], SoftTimeLimitExceeded):
event["fingerprint"] = [
"celery",
"SoftTimeLimitExceeded",
getattr(task, "name", task),
]
return event
return event_processor
def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
# type: (dict[str, Any], Optional[Span], bool) -> dict[str, Any]
"""
Updates the headers of the Celery task with the tracing information
and eventually Sentry Crons monitoring information for beat tasks.
"""
updated_headers = original_headers.copy()
with capture_internal_exceptions():
# if span is None (when the task was started by Celery Beat)
# this will return the trace headers from the scope.
headers = dict(
sentry_sdk.get_isolation_scope().iter_trace_propagation_headers(span=span)
)
if monitor_beat_tasks:
headers.update(
{
"sentry-monitor-start-timestamp-s": "%.9f"
% _now_seconds_since_epoch(),
}
)
# Add the time the task was enqueued to the headers
# This is used in the consumer to calculate the latency
updated_headers.update(
{"sentry-task-enqueued-time": _now_seconds_since_epoch()}
)
if headers:
existing_baggage = updated_headers.get(BAGGAGE_HEADER_NAME)
sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)
combined_baggage = sentry_baggage or existing_baggage
if sentry_baggage and existing_baggage:
# Merge incoming and sentry baggage, where the sentry trace information
# in the incoming baggage takes precedence and the third-party items
# are concatenated.
incoming = Baggage.from_incoming_header(existing_baggage)
combined = Baggage.from_incoming_header(sentry_baggage)
combined.sentry_items.update(incoming.sentry_items)
combined.third_party_items = ",".join(
[
x
for x in [
combined.third_party_items,
incoming.third_party_items,
]
if x is not None and x != ""
]
)
combined_baggage = combined.serialize(include_third_party=True)
updated_headers.update(headers)
if combined_baggage:
updated_headers[BAGGAGE_HEADER_NAME] = combined_baggage
# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
updated_headers.setdefault("headers", {}).update(headers)
if combined_baggage:
updated_headers["headers"][BAGGAGE_HEADER_NAME] = combined_baggage
# Add the Sentry options potentially added in `sentry_apply_entry`
# to the headers (done when auto-instrumenting Celery Beat tasks)
for key, value in updated_headers.items():
if key.startswith("sentry-"):
updated_headers["headers"][key] = value
return updated_headers
class NoOpMgr:
def __enter__(self):
# type: () -> None
return None
def __exit__(self, exc_type, exc_value, traceback):
# type: (Any, Any, Any) -> None
return None
def _wrap_task_run(f):
# type: (F) -> F
@wraps(f)
def apply_async(*args, **kwargs):
# type: (*Any, **Any) -> Any
# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
if integration is None:
return f(*args, **kwargs)
kwarg_headers = kwargs.get("headers") or {}
propagate_traces = kwarg_headers.pop(
"sentry-propagate-traces", integration.propagate_traces
)
if not propagate_traces:
return f(*args, **kwargs)
if isinstance(args[0], Task):
task_name = args[0].name # type: str
elif len(args) > 1 and isinstance(args[1], str):
task_name = args[1]
else:
task_name = "<unknown Celery task>"
task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat"
span_mgr = (
sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_CELERY,
name=task_name,
origin=CeleryIntegration.origin,
)
if not task_started_from_beat
else NoOpMgr()
) # type: Union[Span, NoOpMgr]
with span_mgr as span:
kwargs["headers"] = _update_celery_task_headers(
kwarg_headers, span, integration.monitor_beat_tasks
)
return f(*args, **kwargs)
return apply_async # type: ignore
def _wrap_tracer(task, f):
# type: (Any, F) -> F
# Need to wrap tracer for pushing the scope before prerun is sent, and
# popping it after postrun is sent.
#
# This is the reason we don't use signals for hooking in the first place.
# Also because in Celery 3, signal dispatch returns early if one handler
# crashes.
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def _inner(*args, **kwargs):
# type: (*Any, **Any) -> Any
with isolation_scope() as scope:
scope._name = "celery"
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(task, *args, **kwargs))
transaction = None
# Celery task objects are not a thing to be trusted. Even
# something such as attribute access can fail.
with capture_internal_exceptions():
headers = args[3].get("headers") or {}
transaction = continue_trace(
headers,
op=OP.QUEUE_TASK_CELERY,
name="unknown celery task",
source=TRANSACTION_SOURCE_TASK,
origin=CeleryIntegration.origin,
)
transaction.name = task.name
transaction.set_status(SPANSTATUS.OK)
if transaction is None:
return f(*args, **kwargs)
with sentry_sdk.start_transaction(
transaction,
custom_sampling_context={
"celery_job": {
"task": task.name,
# for some reason, args[1] is a list if non-empty but a
# tuple if empty
"args": list(args[1]),
"kwargs": args[2],
}
},
):
return f(*args, **kwargs)
return _inner # type: ignore
def _set_messaging_destination_name(task, span):
# type: (Any, Span) -> None
"""Set "messaging.destination.name" tag for span"""
with capture_internal_exceptions():
delivery_info = task.request.delivery_info
if delivery_info:
routing_key = delivery_info.get("routing_key")
if delivery_info.get("exchange") == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning the tasks
# are sent to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
def _wrap_task_call(task, f):
# type: (Any, F) -> F
# Need to wrap task call because the exception is caught before we get to
# see it. Also celery's reported stacktrace is untrustworthy.
# functools.wraps is important here because celery-once looks at this
# method's name. @ensure_integration_enabled internally calls functools.wraps,
# but if we ever remove the @ensure_integration_enabled decorator, we need
# to add @functools.wraps(f) here.
# https://github.com/getsentry/sentry-python/issues/421
@ensure_integration_enabled(CeleryIntegration, f)
def _inner(*args, **kwargs):
# type: (*Any, **Any) -> Any
try:
with sentry_sdk.start_span(
op=OP.QUEUE_PROCESS,
name=task.name,
origin=CeleryIntegration.origin,
) as span:
_set_messaging_destination_name(task, span)
latency = None
with capture_internal_exceptions():
if (
task.request.headers is not None
and "sentry-task-enqueued-time" in task.request.headers
):
latency = _now_seconds_since_epoch() - task.request.headers.pop(
"sentry-task-enqueued-time"
)
if latency is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
with capture_internal_exceptions():
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
with capture_internal_exceptions():
span.set_data(
SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
)
with capture_internal_exceptions():
span.set_data(
SPANDATA.MESSAGING_SYSTEM,
task.app.connection().transport.driver_type,
)
return f(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
_capture_exception(task, exc_info)
reraise(*exc_info)
return _inner # type: ignore
def _patch_build_tracer():
# type: () -> None
import celery.app.trace as trace # type: ignore
original_build_tracer = trace.build_tracer
def sentry_build_tracer(name, task, *args, **kwargs):
# type: (Any, Any, *Any, **Any) -> Any
if not getattr(task, "_sentry_is_patched", False):
# determine whether Celery will use __call__ or run and patch
# accordingly
if task_has_custom(task, "__call__"):
type(task).__call__ = _wrap_task_call(task, type(task).__call__)
else:
task.run = _wrap_task_call(task, task.run)
# `build_tracer` is apparently called for every task
# invocation. Can't wrap every celery task for every invocation
# or we will get infinitely nested wrapper functions.
task._sentry_is_patched = True
return _wrap_tracer(task, original_build_tracer(name, task, *args, **kwargs))
trace.build_tracer = sentry_build_tracer
def _patch_task_apply_async():
# type: () -> None
Task.apply_async = _wrap_task_run(Task.apply_async)
def _patch_celery_send_task():
# type: () -> None
from celery import Celery
Celery.send_task = _wrap_task_run(Celery.send_task)
def _patch_worker_exit():
# type: () -> None
# Need to flush queue before worker shutdown because a crashing worker will
# call os._exit
from billiard.pool import Worker # type: ignore
original_workloop = Worker.workloop
def sentry_workloop(*args, **kwargs):
# type: (*Any, **Any) -> Any
try:
return original_workloop(*args, **kwargs)
finally:
with capture_internal_exceptions():
if (
sentry_sdk.get_client().get_integration(CeleryIntegration)
is not None
):
sentry_sdk.flush()
Worker.workloop = sentry_workloop
def _patch_producer_publish():
# type: () -> None
original_publish = Producer.publish
@ensure_integration_enabled(CeleryIntegration, original_publish)
def sentry_publish(self, *args, **kwargs):
# type: (Producer, *Any, **Any) -> Any
kwargs_headers = kwargs.get("headers", {})
if not isinstance(kwargs_headers, Mapping):
# Ensure kwargs_headers is a Mapping, so we can safely call get().
# We don't expect this to happen, but it's better to be safe. Even
# if it does happen, only our instrumentation breaks. This line
# does not overwrite kwargs["headers"], so the original publish
# method will still work.
kwargs_headers = {}
task_name = kwargs_headers.get("task")
task_id = kwargs_headers.get("id")
retries = kwargs_headers.get("retries")
routing_key = kwargs.get("routing_key")
exchange = kwargs.get("exchange")
with sentry_sdk.start_span(
op=OP.QUEUE_PUBLISH,
name=task_name,
origin=CeleryIntegration.origin,
) as span:
if task_id is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
if exchange == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning messages are
# routed to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
if retries is not None:
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
with capture_internal_exceptions():
span.set_data(
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
)
return original_publish(self, *args, **kwargs)
Producer.publish = sentry_publish