Skip to content

Commit bc59bc5

Browse files
authored
Merge branch 'main' into feature/1765-add-otelTraceSampled-logging
2 parents 979ed59 + 46e4b1d commit bc59bc5

File tree

5 files changed

+72
-3
lines changed

5 files changed

+72
-3
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2121
([#1730](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1730))
2222
- Make ASGI request span attributes available for `start_span`.
2323
([#1762](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1762))
24+
- `opentelemetry-instrumentation-celery` Add support for anonymous tasks.
25+
([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407)
26+
2427

2528
### Fixed
2629

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,17 @@ def _trace_before_publish(self, *args, **kwargs):
183183
task = utils.retrieve_task_from_sender(kwargs)
184184
task_id = utils.retrieve_task_id_from_message(kwargs)
185185

186-
if task is None or task_id is None:
186+
if task_id is None:
187187
return
188188

189-
operation_name = f"{_TASK_APPLY_ASYNC}/{task.name}"
189+
if task is None:
190+
# task is an anonymous task send using send_task or using canvas workflow
191+
# Signatures() to send to a task not in the current processes dependency
192+
# tree
193+
task_name = kwargs.get("sender", "unknown")
194+
else:
195+
task_name = task.name
196+
operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}"
190197
span = self._tracer.start_span(
191198
operation_name, kind=trace.SpanKind.PRODUCER
192199
)
@@ -195,7 +202,7 @@ def _trace_before_publish(self, *args, **kwargs):
195202
if span.is_recording():
196203
span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
197204
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id)
198-
span.set_attribute(_TASK_NAME_KEY, task.name)
205+
span.set_attribute(_TASK_NAME_KEY, task_name)
199206
utils.set_attributes_from_context(span, kwargs)
200207

201208
activation = trace.use_span(span, end_on_exit=True)

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py

+2
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ def attach_span(task, task_id, span, is_publish=False):
132132
NOTE: We cannot test for this well yet, because we do not run a celery worker,
133133
and cannot run `task.apply_async()`
134134
"""
135+
if task is None:
136+
return
135137
span_dict = getattr(task, CTX_KEY, None)
136138
if span_dict is None:
137139
span_dict = {}

instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py

+50
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def tearDown(self):
3636
CeleryInstrumentor().uninstrument()
3737
self._worker.stop()
3838
self._thread.join()
39+
CeleryInstrumentor().uninstrument()
3940

4041
def test_task(self):
4142
CeleryInstrumentor().instrument()
@@ -97,3 +98,52 @@ def test_uninstrument(self):
9798

9899
spans = self.memory_exporter.get_finished_spans()
99100
self.assertEqual(len(spans), 0)
101+
102+
103+
class TestCelerySignatureTask(TestBase):
104+
def setUp(self):
105+
super().setUp()
106+
107+
def start_app(*args, **kwargs):
108+
# Add an additional task that will not be registered with parent thread
109+
@app.task
110+
def hidden_task(num_a):
111+
return num_a * 2
112+
113+
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
114+
return self._worker.start(*args, **kwargs)
115+
116+
self._thread = threading.Thread(target=start_app)
117+
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
118+
self._thread.daemon = True
119+
self._thread.start()
120+
121+
def tearDown(self):
122+
super().tearDown()
123+
self._worker.stop()
124+
self._thread.join()
125+
CeleryInstrumentor().uninstrument()
126+
127+
def test_hidden_task(self):
128+
# no-op since already instrumented
129+
CeleryInstrumentor().instrument()
130+
131+
res = app.signature("tests.test_tasks.hidden_task", (2,)).apply_async()
132+
while not res.ready():
133+
time.sleep(0.05)
134+
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
135+
self.assertEqual(len(spans), 2)
136+
137+
consumer, producer = spans
138+
139+
self.assertEqual(consumer.name, "run/tests.test_tasks.hidden_task")
140+
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
141+
142+
self.assertEqual(
143+
producer.name, "apply_async/tests.test_tasks.hidden_task"
144+
)
145+
self.assertEqual(producer.kind, SpanKind.PRODUCER)
146+
147+
self.assertNotEqual(consumer.parent, producer.context)
148+
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
149+
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py

+7
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ def fn_task():
185185
utils.detach_span(fn_task, task_id)
186186
self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None))
187187

188+
def test_optional_task_span_attach(self):
189+
task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f"
190+
span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext))
191+
192+
# assert this is is a no-aop
193+
self.assertIsNone(utils.attach_span(None, task_id, span))
194+
188195
def test_span_delete_empty(self):
189196
# ensure detach_span doesn't raise an exception if span is not present
190197
@self.app.task

0 commit comments

Comments
 (0)