Skip to content

Commit c8fc781

Browse files
authored
Add Celery receive latency (#3174)
Add new header to instrumented celery tasks to calculate `messaging.message.receive.latency`.
1 parent 0878593 commit c8fc781

File tree

4 files changed

+54
-2
lines changed

4 files changed

+54
-2
lines changed

sentry_sdk/consts.py

+5
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,11 @@ class SPANDATA:
301301
Number of retries/attempts to process a message.
302302
"""
303303

304+
MESSAGING_MESSAGE_RECEIVE_LATENCY = "messaging.message.receive.latency"
305+
"""
306+
The latency between when the task was enqueued and when it was started to be processed.
307+
"""
308+
304309
MESSAGING_SYSTEM = "messaging.system"
305310
"""
306311
The messaging system's name, e.g. `kafka`, `aws_sqs`

sentry_sdk/integrations/celery/__init__.py

+22
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
181181
}
182182
)
183183

184+
# Add the time the task was enqueued to the headers
185+
# This is used in the consumer to calculate the latency
186+
updated_headers.update(
187+
{"sentry-task-enqueued-time": _now_seconds_since_epoch()}
188+
)
189+
184190
if headers:
185191
existing_baggage = updated_headers.get(BAGGAGE_HEADER_NAME)
186192
sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)
@@ -360,12 +366,28 @@ def _inner(*args, **kwargs):
360366
op=OP.QUEUE_PROCESS, description=task.name
361367
) as span:
362368
_set_messaging_destination_name(task, span)
369+
370+
latency = None
371+
with capture_internal_exceptions():
372+
if (
373+
task.request.headers is not None
374+
and "sentry-task-enqueued-time" in task.request.headers
375+
):
376+
latency = _now_seconds_since_epoch() - task.request.headers.pop(
377+
"sentry-task-enqueued-time"
378+
)
379+
380+
if latency is not None:
381+
span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
382+
363383
with capture_internal_exceptions():
364384
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
385+
365386
with capture_internal_exceptions():
366387
span.set_data(
367388
SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
368389
)
390+
369391
with capture_internal_exceptions():
370392
span.set_data(
371393
SPANDATA.MESSAGING_SYSTEM,

tests/integrations/celery/test_celery.py

+16
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ def dummy_task(self, x, y):
530530
# Newly added headers
531531
expected_headers["sentry-trace"] = mock.ANY
532532
expected_headers["baggage"] = mock.ANY
533+
expected_headers["sentry-task-enqueued-time"] = mock.ANY
533534

534535
assert result.get() == expected_headers
535536

@@ -754,3 +755,18 @@ def task(): ...
754755
assert span["data"]["messaging.message.retry.count"] == 0
755756

756757
monkeypatch.setattr(kombu.messaging.Producer, "_publish", old_publish)
758+
759+
760+
def test_receive_latency(init_celery, capture_events):
761+
celery = init_celery(traces_sample_rate=1.0)
762+
events = capture_events()
763+
764+
@celery.task()
765+
def task(): ...
766+
767+
task.apply_async()
768+
769+
(event,) = events
770+
(span,) = event["spans"]
771+
assert "messaging.message.receive.latency" in span["data"]
772+
assert span["data"]["messaging.message.receive.latency"] > 0

tests/integrations/celery/test_update_celery_task_headers.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,25 @@ def test_monitor_beat_tasks(monitor_beat_tasks):
2929

3030
if monitor_beat_tasks:
3131
assert updated_headers == {
32-
"headers": {"sentry-monitor-start-timestamp-s": mock.ANY},
32+
"headers": {
33+
"sentry-monitor-start-timestamp-s": mock.ANY,
34+
"sentry-task-enqueued-time": mock.ANY,
35+
},
3336
"sentry-monitor-start-timestamp-s": mock.ANY,
37+
"sentry-task-enqueued-time": mock.ANY,
3438
}
3539
else:
36-
assert updated_headers == headers
40+
assert updated_headers == {
41+
"sentry-task-enqueued-time": mock.ANY,
42+
}
3743

3844

3945
@pytest.mark.parametrize("monitor_beat_tasks", [True, False, None, "", "bla", 1, 0])
4046
def test_monitor_beat_tasks_with_headers(monitor_beat_tasks):
4147
headers = {
4248
"blub": "foo",
4349
"sentry-something": "bar",
50+
"sentry-task-enqueued-time": mock.ANY,
4451
}
4552
span = None
4653

@@ -53,8 +60,10 @@ def test_monitor_beat_tasks_with_headers(monitor_beat_tasks):
5360
"headers": {
5461
"sentry-monitor-start-timestamp-s": mock.ANY,
5562
"sentry-something": "bar",
63+
"sentry-task-enqueued-time": mock.ANY,
5664
},
5765
"sentry-monitor-start-timestamp-s": mock.ANY,
66+
"sentry-task-enqueued-time": mock.ANY,
5867
}
5968
else:
6069
assert updated_headers == headers

0 commit comments

Comments
 (0)