Skip to content

Commit 1dd17ed

Browse files
Akochavishalevr
andauthored
Add metrics instrumentation celery (#1679)
Co-authored-by: Shalev Roda <[email protected]>
1 parent 7804083 commit 1dd17ed

File tree

3 files changed

+123
-2
lines changed

3 files changed

+123
-2
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
1011
### Added
1112

1213
- Make Flask request span attributes available for `start_span`.
@@ -16,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1617
- Instrument all httpx versions >= 0.18. ([#1748](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1748))
1718
- Fix `Invalid type NoneType for attribute X (opentelemetry-instrumentation-aws-lambda)` error when some attributes do not exist
1819
([#1780](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1780))
20+
- Add metric instrumentation for celery
21+
([#1679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1679))
1922

2023
## Version 1.18.0/0.39b0 (2023-05-10)
2124

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

+44-2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def add(x, y):
6060
"""
6161

6262
import logging
63+
from timeit import default_timer
6364
from typing import Collection, Iterable
6465

6566
from celery import signals # pylint: disable=no-name-in-module
@@ -69,6 +70,7 @@ def add(x, y):
6970
from opentelemetry.instrumentation.celery.package import _instruments
7071
from opentelemetry.instrumentation.celery.version import __version__
7172
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
73+
from opentelemetry.metrics import get_meter
7274
from opentelemetry.propagate import extract, inject
7375
from opentelemetry.propagators.textmap import Getter
7476
from opentelemetry.semconv.trace import SpanAttributes
@@ -104,6 +106,11 @@ def keys(self, carrier):
104106

105107

106108
class CeleryInstrumentor(BaseInstrumentor):
109+
def __init__(self):
110+
super().__init__()
111+
self.metrics = None
112+
self.task_id_to_start_time = {}
113+
107114
def instrumentation_dependencies(self) -> Collection[str]:
108115
return _instruments
109116

@@ -113,6 +120,11 @@ def _instrument(self, **kwargs):
113120
# pylint: disable=attribute-defined-outside-init
114121
self._tracer = trace.get_tracer(__name__, __version__, tracer_provider)
115122

123+
meter_provider = kwargs.get("meter_provider")
124+
meter = get_meter(__name__, __version__, meter_provider)
125+
126+
self.create_celery_metrics(meter)
127+
116128
signals.task_prerun.connect(self._trace_prerun, weak=False)
117129
signals.task_postrun.connect(self._trace_postrun, weak=False)
118130
signals.before_task_publish.connect(
@@ -139,6 +151,7 @@ def _trace_prerun(self, *args, **kwargs):
139151
if task is None or task_id is None:
140152
return
141153

154+
self.update_task_duration_time(task_id)
142155
request = task.request
143156
tracectx = extract(request, getter=celery_getter) or None
144157

@@ -153,8 +166,7 @@ def _trace_prerun(self, *args, **kwargs):
153166
activation.__enter__() # pylint: disable=E1101
154167
utils.attach_span(task, task_id, (span, activation))
155168

156-
@staticmethod
157-
def _trace_postrun(*args, **kwargs):
169+
def _trace_postrun(self, *args, **kwargs):
158170
task = utils.retrieve_task(kwargs)
159171
task_id = utils.retrieve_task_id(kwargs)
160172

@@ -178,6 +190,9 @@ def _trace_postrun(*args, **kwargs):
178190

179191
activation.__exit__(None, None, None)
180192
utils.detach_span(task, task_id)
193+
self.update_task_duration_time(task_id)
194+
labels = {"task": task.name, "worker": task.request.hostname}
195+
self._record_histograms(task_id, labels)
181196

182197
def _trace_before_publish(self, *args, **kwargs):
183198
task = utils.retrieve_task_from_sender(kwargs)
@@ -277,3 +292,30 @@ def _trace_retry(*args, **kwargs):
277292
# Use `str(reason)` instead of `reason.message` in case we get
278293
# something that isn't an `Exception`
279294
span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))
295+
296+
def update_task_duration_time(self, task_id):
297+
cur_time = default_timer()
298+
task_duration_time_until_now = (
299+
cur_time - self.task_id_to_start_time[task_id]
300+
if task_id in self.task_id_to_start_time
301+
else cur_time
302+
)
303+
self.task_id_to_start_time[task_id] = task_duration_time_until_now
304+
305+
def _record_histograms(self, task_id, metric_attributes):
306+
if task_id is None:
307+
return
308+
309+
self.metrics["flower.task.runtime.seconds"].record(
310+
self.task_id_to_start_time.get(task_id),
311+
attributes=metric_attributes,
312+
)
313+
314+
def create_celery_metrics(self, meter) -> None:
315+
self.metrics = {
316+
"flower.task.runtime.seconds": meter.create_histogram(
317+
name="flower.task.runtime.seconds",
318+
unit="seconds",
319+
description="The time it took to run the task.",
320+
)
321+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import threading
2+
import time
3+
from timeit import default_timer
4+
5+
from opentelemetry.instrumentation.celery import CeleryInstrumentor
6+
from opentelemetry.test.test_base import TestBase
7+
8+
from .celery_test_tasks import app, task_add
9+
10+
11+
class TestMetrics(TestBase):
12+
def setUp(self):
13+
super().setUp()
14+
self._worker = app.Worker(
15+
app=app, pool="solo", concurrency=1, hostname="celery@akochavi"
16+
)
17+
self._thread = threading.Thread(target=self._worker.start)
18+
self._thread.daemon = True
19+
self._thread.start()
20+
21+
def tearDown(self):
22+
super().tearDown()
23+
self._worker.stop()
24+
self._thread.join()
25+
26+
def get_metrics(self):
27+
result = task_add.delay(1, 2)
28+
29+
timeout = time.time() + 60 * 1 # 1 minutes from now
30+
while not result.ready():
31+
if time.time() > timeout:
32+
break
33+
time.sleep(0.05)
34+
return self.get_sorted_metrics()
35+
36+
def test_basic_metric(self):
37+
CeleryInstrumentor().instrument()
38+
start_time = default_timer()
39+
task_runtime_estimated = (default_timer() - start_time) * 1000
40+
41+
metrics = self.get_metrics()
42+
CeleryInstrumentor().uninstrument()
43+
self.assertEqual(len(metrics), 1)
44+
45+
task_runtime = metrics[0]
46+
print(task_runtime)
47+
self.assertEqual(task_runtime.name, "flower.task.runtime.seconds")
48+
self.assert_metric_expected(
49+
task_runtime,
50+
[
51+
self.create_histogram_data_point(
52+
count=1,
53+
sum_data_point=task_runtime_estimated,
54+
max_data_point=task_runtime_estimated,
55+
min_data_point=task_runtime_estimated,
56+
attributes={
57+
"task": "tests.celery_test_tasks.task_add",
58+
"worker": "celery@akochavi",
59+
},
60+
)
61+
],
62+
est_value_delta=200,
63+
)
64+
65+
def test_metric_uninstrument(self):
66+
CeleryInstrumentor().instrument()
67+
metrics = self.get_metrics()
68+
self.assertEqual(len(metrics), 1)
69+
CeleryInstrumentor().uninstrument()
70+
71+
metrics = self.get_metrics()
72+
self.assertEqual(len(metrics), 1)
73+
74+
for metric in metrics:
75+
for point in list(metric.data.data_points):
76+
self.assertEqual(point.count, 1)

0 commit comments

Comments
 (0)