Skip to content

Commit 9be899e

Browse files
authored
Added context propagation support to celery instrumentation (#1135)
1 parent 28e3a39 commit 9be899e

File tree

12 files changed

+227
-82
lines changed

12 files changed

+227
-82
lines changed

instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## Unreleased
44

5+
- Span operation names now include the task type. ([#1135](https://github.com/open-telemetry/opentelemetry-python/pull/1135))
6+
- Added automatic context propagation. ([#1135](https://github.com/open-telemetry/opentelemetry-python/pull/1135))
7+
58
## Version 0.12b0
69

710
Released 2020-08-14

instrumentation/opentelemetry-instrumentation-celery/README.rst

+20-2
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,20 @@ Usage
2929

3030
.. code-block:: python
3131
32+
from opentelemetry import trace
33+
from opentelemetry.sdk.trace import TracerProvider
34+
from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
3235
from opentelemetry.instrumentation.celery import CeleryInstrumentor
3336
34-
CeleryInstrumentor().instrument()
35-
3637
from celery import Celery
38+
from celery.signals import worker_process_init
39+
40+
@worker_process_init.connect(weak=False)
41+
def init_celery_tracing(*args, **kwargs):
42+
trace.set_tracer_provider(TracerProvider())
43+
span_processor = BatchExportSpanProcessor(ConsoleSpanExporter())
44+
trace.get_tracer_provider().add_span_processor(span_processor)
45+
CeleryInstrumentor().instrument()
3746
3847
app = Celery("tasks", broker="amqp://localhost")
3948
@@ -43,6 +52,15 @@ Usage
4352
4453
add.delay(42, 50)
4554
55+
56+
Setting up tracing
57+
--------------------
58+
59+
When tracing a celery worker process, tracing and instrumention both must be initialized after the celery worker
60+
process is initialized. This is required for any tracing components that might use threading to work correctly
61+
such as the BatchExportSpanProcessor. Celery provides a signal called ``worker_process_init`` that can be used to
62+
accomplish this as shown in the example above.
63+
4664
References
4765
----------
4866
* `OpenTelemetry Celery Instrumentation <https://opentelemetry-python.readthedocs.io/en/latest/ext/celery/celery.html>`_

instrumentation/opentelemetry-instrumentation-celery/setup.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ install_requires =
4646
[options.extras_require]
4747
test =
4848
pytest
49+
celery ~= 4.0
4950
opentelemetry-test == 0.14.dev0
5051

5152
[options.packages.find]

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

+37-5
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,20 @@
3030
3131
.. code:: python
3232
33+
from opentelemetry import trace
34+
from opentelemetry.sdk.trace import TracerProvider
35+
from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
3336
from opentelemetry.instrumentation.celery import CeleryInstrumentor
3437
35-
CeleryInstrumentor().instrument()
36-
3738
from celery import Celery
39+
from celery.signals import worker_process_init
40+
41+
@worker_process_init.connect(weak=False)
42+
def init_celery_tracing(*args, **kwargs):
43+
trace.set_tracer_provider(TracerProvider())
44+
span_processor = BatchExportSpanProcessor(ConsoleSpanExporter())
45+
trace.get_tracer_provider().add_span_processor(span_processor)
46+
CeleryInstrumentor().instrument()
3847
3948
app = Celery("tasks", broker="amqp://localhost")
4049
@@ -50,13 +59,15 @@ def add(x, y):
5059

5160
import logging
5261
import signal
62+
from collections.abc import Iterable
5363

5464
from celery import signals # pylint: disable=no-name-in-module
5565

56-
from opentelemetry import trace
66+
from opentelemetry import propagators, trace
5767
from opentelemetry.instrumentation.celery import utils
5868
from opentelemetry.instrumentation.celery.version import __version__
5969
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
70+
from opentelemetry.trace.propagation import get_current_span
6071
from opentelemetry.trace.status import Status, StatusCanonicalCode
6172

6273
logger = logging.getLogger(__name__)
@@ -106,9 +117,16 @@ def _trace_prerun(self, *args, **kwargs):
106117
if task is None or task_id is None:
107118
return
108119

120+
request = task.request
121+
tracectx = propagators.extract(carrier_extractor, request) or {}
122+
parent = get_current_span(tracectx)
123+
109124
logger.debug("prerun signal start task_id=%s", task_id)
110125

111-
span = self._tracer.start_span(task.name, kind=trace.SpanKind.CONSUMER)
126+
operation_name = "{0}/{1}".format(_TASK_RUN, task.name)
127+
span = self._tracer.start_span(
128+
operation_name, parent=parent, kind=trace.SpanKind.CONSUMER
129+
)
112130

113131
activation = self._tracer.use_span(span, end_on_exit=True)
114132
activation.__enter__()
@@ -146,7 +164,10 @@ def _trace_before_publish(self, *args, **kwargs):
146164
if task is None or task_id is None:
147165
return
148166

149-
span = self._tracer.start_span(task.name, kind=trace.SpanKind.PRODUCER)
167+
operation_name = "{0}/{1}".format(_TASK_APPLY_ASYNC, task.name)
168+
span = self._tracer.start_span(
169+
operation_name, kind=trace.SpanKind.PRODUCER
170+
)
150171

151172
# apply some attributes here because most of the data is not available
152173
span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
@@ -158,6 +179,10 @@ def _trace_before_publish(self, *args, **kwargs):
158179
activation.__enter__()
159180
utils.attach_span(task, task_id, (span, activation), is_publish=True)
160181

182+
headers = kwargs.get("headers")
183+
if headers:
184+
propagators.inject(type(headers).__setitem__, headers)
185+
161186
@staticmethod
162187
def _trace_after_publish(*args, **kwargs):
163188
task = utils.retrieve_task_from_sender(kwargs)
@@ -221,3 +246,10 @@ def _trace_retry(*args, **kwargs):
221246
# Use `str(reason)` instead of `reason.message` in case we get
222247
# something that isn't an `Exception`
223248
span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))
249+
250+
251+
def carrier_extractor(carrier, key):
252+
value = getattr(carrier, key, [])
253+
if isinstance(value, str) or not isinstance(value, Iterable):
254+
value = (value,)
255+
return value
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from celery import Celery
16+
17+
18+
class Config:
19+
result_backend = "rpc"
20+
broker_backend = "memory"
21+
22+
23+
app = Celery(broker="memory:///")
24+
app.config_from_object(Config)
25+
26+
27+
@app.task
28+
def task_add(num_a, num_b):
29+
return num_a + num_b
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import threading
16+
import time
17+
18+
from opentelemetry.instrumentation.celery import CeleryInstrumentor
19+
from opentelemetry.test.test_base import TestBase
20+
from opentelemetry.trace import SpanKind
21+
22+
from .celery_test_tasks import app, task_add
23+
24+
25+
class TestCeleryInstrumentation(TestBase):
26+
def setUp(self):
27+
super().setUp()
28+
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
29+
self._thread = threading.Thread(target=self._worker.start)
30+
self._thread.daemon = True
31+
self._thread.start()
32+
33+
def tearDown(self):
34+
super().tearDown()
35+
self._worker.stop()
36+
self._thread.join()
37+
38+
def test_task(self):
39+
CeleryInstrumentor().instrument()
40+
41+
result = task_add.delay(1, 2)
42+
while not result.ready():
43+
time.sleep(0.05)
44+
45+
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
46+
self.assertEqual(len(spans), 2)
47+
48+
consumer, producer = spans
49+
50+
self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add")
51+
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
52+
self.assert_span_has_attributes(
53+
consumer,
54+
{
55+
"celery.action": "run",
56+
"celery.state": "SUCCESS",
57+
"messaging.destination": "celery",
58+
"celery.task_name": "tests.celery_test_tasks.task_add",
59+
},
60+
)
61+
62+
self.assertEqual(
63+
producer.name, "apply_async/tests.celery_test_tasks.task_add"
64+
)
65+
self.assertEqual(producer.kind, SpanKind.PRODUCER)
66+
self.assert_span_has_attributes(
67+
producer,
68+
{
69+
"celery.action": "apply_async",
70+
"celery.task_name": "tests.celery_test_tasks.task_add",
71+
"messaging.destination_kind": "queue",
72+
"messaging.destination": "celery",
73+
},
74+
)
75+
76+
self.assertNotEqual(consumer.parent, producer.context)
77+
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
78+
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

0 commit comments

Comments
 (0)