Skip to content

Commit c2c5727

Browse files
fix(celery): close celery.apply spans using task_protocol 1 [backport 2.14] (#10874)
Backport 6346fcb from #10848 to 2.14. This PR fixes an issue where Celery's closing signals were triggered but dd-trace-py skipped closing the `celery.apply` span because it could not find the task id. In Celery's `task_protocol: 1`, the id is in the body of the message: https://docs.celeryq.dev/en/main/internals/protocol.html#message-body . The issue with the previous logic is that if the headers contained any information (even information unrelated to the id), it would skip the check for the id in the body: before: ``` if headers: ``` after (this PR): ``` if headers and 'id' in headers: ``` By doing this, we check the headers for the id, then check the body for the id. If it fails to find the task id in the body or the headers, then it still hits the debug log, `unable to extract the Task and the task_id. This version of Celery may not be supported.` . This PR relates to the goal of #10676, which is to close celery spans. If for some reason the logic in this PR fails to close an open `celery.apply` span, #10676 will act as a fail-safe and close it. Special Thanks: @timmc-edx for helping us track this down! 
## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: wantsui <[email protected]>
1 parent 51eb0e2 commit c2c5727

File tree

3 files changed

+103
-10
lines changed

3 files changed

+103
-10
lines changed

ddtrace/contrib/internal/celery/utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,12 @@ def retrieve_task_id(context):
128128
"""
129129
headers = context.get("headers")
130130
body = context.get("body")
131-
if headers:
131+
# Check if the id is in the headers, then check the body for it.
132+
# If we don't check the id first, we could wrongly assume no task_id
133+
# when the task_id is in the body.
134+
if headers and "id" in headers:
132135
# Protocol Version 2 (default from Celery 4.0)
133136
return headers.get("id")
134-
else:
137+
elif body and "id" in body:
135138
# Protocol Version 1
136139
return body.get("id")
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
tracing(celery): Fixes an issue where ``celery.apply`` spans using task_protocol 1 were not closed. The task id is now looked up in the message body when the headers do not contain it.

tests/contrib/celery/test_utils.py

Lines changed: 94 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,11 @@ def test_tags_from_context_empty_keys():
131131
assert {} == tags
132132

133133

134-
def test_task_id_from_protocol_v1():
134+
def test_task_id_from_protocol_v1_no_headers():
135135
# ensures a `task_id` is properly returned when Protocol v1 is used.
136136
# `context` is an example of an emitted Signal with Protocol v1
137+
# this test assumes the headers are blank
138+
test_id = "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7"
137139
context = {
138140
"body": {
139141
"expires": None,
@@ -143,7 +145,7 @@ def test_task_id_from_protocol_v1():
143145
"callbacks": None,
144146
"errbacks": None,
145147
"taskset": None,
146-
"id": "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7",
148+
"id": test_id,
147149
"retries": 0,
148150
"task": "tests.contrib.celery.test_integration.fn_task_parameters",
149151
"timelimit": (None, None),
@@ -159,12 +161,87 @@ def test_task_id_from_protocol_v1():
159161
}
160162

161163
task_id = retrieve_task_id(context)
162-
assert task_id == "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7"
164+
assert task_id == test_id
163165

164166

165-
def test_task_id_from_protocol_v2():
167+
def test_task_id_from_protocol_v1_with_headers():
168+
# ensures a `task_id` is properly returned when Protocol v1 is used with headers.
169+
# `context` is an example of an emitted Signal with Protocol v1
170+
# this test ensures the id is still found in the body when the headers only hold other information
171+
test_id = "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7"
172+
context = {
173+
"body": {
174+
"expires": None,
175+
"utc": True,
176+
"args": ["user"],
177+
"chord": None,
178+
"callbacks": None,
179+
"errbacks": None,
180+
"taskset": None,
181+
"id": test_id,
182+
"retries": 0,
183+
"task": "tests.contrib.celery.test_integration.fn_task_parameters",
184+
"timelimit": (None, None),
185+
"eta": None,
186+
"kwargs": {"force_logout": True},
187+
},
188+
"sender": "tests.contrib.celery.test_integration.fn_task_parameters",
189+
"exchange": "celery",
190+
"routing_key": "celery",
191+
"retry_policy": None,
192+
"headers": {
193+
"header1": "value",
194+
"header2": "value",
195+
},
196+
"properties": {},
197+
}
198+
199+
task_id = retrieve_task_id(context)
200+
assert task_id == test_id
201+
202+
203+
def test_task_id_from_protocol_v2_no_body():
166204
# ensures a `task_id` is properly returned when Protocol v2 is used.
167205
# `context` is an example of an emitted Signal with Protocol v2
206+
# this test assumes the body has no data
207+
test_id = "7e917b83-4018-431d-9832-73a28e1fb6c0"
208+
context = {
209+
"body": {},
210+
"sender": "tests.contrib.celery.test_integration.fn_task_parameters",
211+
"exchange": "",
212+
"routing_key": "celery",
213+
"retry_policy": None,
214+
"headers": {
215+
"origin": "gen83744@hostname",
216+
"root_id": test_id,
217+
"expires": None,
218+
"shadow": None,
219+
"id": test_id,
220+
"kwargsrepr": "{'force_logout': True}",
221+
"lang": "py",
222+
"retries": 0,
223+
"task": "tests.contrib.celery.test_integration.fn_task_parameters",
224+
"group": None,
225+
"timelimit": [None, None],
226+
"parent_id": None,
227+
"argsrepr": "['user']",
228+
"eta": None,
229+
},
230+
"properties": {
231+
"reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca",
232+
"correlation_id": test_id,
233+
},
234+
}
235+
236+
task_id = retrieve_task_id(context)
237+
assert task_id == test_id
238+
239+
240+
def test_task_id_from_protocol_v2_with_body():
241+
# ensures a `task_id` is properly returned when Protocol v2 is used.
242+
# `context` is an example of an emitted Signal with Protocol v2
243+
# this test assumes the body has data
244+
test_id = "7e917b83-4018-431d-9832-73a28e1fb6c0"
168245
context = {
169246
"body": (
170247
["user"],
@@ -177,10 +254,10 @@ def test_task_id_from_protocol_v2():
177254
"retry_policy": None,
178255
"headers": {
179256
"origin": "gen83744@hostname",
180-
"root_id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
257+
"root_id": test_id,
181258
"expires": None,
182259
"shadow": None,
183-
"id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
260+
"id": test_id,
184261
"kwargsrepr": "{'force_logout': True}",
185262
"lang": "py",
186263
"retries": 0,
@@ -193,9 +270,18 @@ def test_task_id_from_protocol_v2():
193270
},
194271
"properties": {
195272
"reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca",
196-
"correlation_id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
273+
"correlation_id": test_id,
197274
},
198275
}
199276

200277
task_id = retrieve_task_id(context)
201-
assert task_id == "7e917b83-4018-431d-9832-73a28e1fb6c0"
278+
assert task_id == test_id
279+
280+
281+
def test_task_id_from_blank_context():
282+
# if there is no context (thus missing headers and body),
283+
# no task_id is returned
284+
context = {}
285+
286+
task_id = retrieve_task_id(context)
287+
assert task_id is None

0 commit comments

Comments
 (0)