From 86a7f8692a39342240066fda63b63a819d59e45c Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 16 Feb 2023 13:27:38 +0200 Subject: [PATCH 01/13] Add metrics instrumentation celery --- instrumentation/README.md | 2 +- .../instrumentation/celery/__init__.py | 41 ++++++- .../instrumentation/celery/package.py | 1 + .../tests/test_metrics.py | 109 ++++++++++++++++++ 4 files changed, 150 insertions(+), 3 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py diff --git a/instrumentation/README.md b/instrumentation/README.md index aa71744761..b24d830231 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -10,7 +10,7 @@ | [opentelemetry-instrumentation-boto](./opentelemetry-instrumentation-boto) | boto~=2.0 | No | [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 | No | [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | No -| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | No +| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | Yes | [opentelemetry-instrumentation-confluent-kafka](./opentelemetry-instrumentation-confluent-kafka) | confluent-kafka >= 1.8.2, < 2.0.0 | No | [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | No | [opentelemetry-instrumentation-django](./opentelemetry-instrumentation-django) | django >= 1.10 | Yes diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index c6a7540ccd..e8bff28283 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -60,6 +60,7 @@ def add(x, y): """ import logging +from timeit import default_timer from typing import Collection, Iterable from celery import signals # pylint: disable=no-name-in-module @@ -69,6 +70,7 @@ def add(x, y): from opentelemetry.instrumentation.celery.package import _instruments from opentelemetry.instrumentation.celery.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.metrics import get_meter from opentelemetry.propagate import extract, inject from opentelemetry.propagators.textmap import Getter from opentelemetry.semconv.trace import SpanAttributes @@ -104,6 +106,11 @@ def keys(self, carrier): class CeleryInstrumentor(BaseInstrumentor): + def __init__(self): + super().__init__() + self.metrics = None + self.task_id_to_start_time = {} + def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -113,6 +120,11 @@ def _instrument(self, **kwargs): # pylint: disable=attribute-defined-outside-init self._tracer = trace.get_tracer(__name__, __version__, tracer_provider) + meter_provider = kwargs.get("meter_provider") + meter = get_meter(__name__, __version__, meter_provider) + + self.create_celery_metrics(meter) + signals.task_prerun.connect(self._trace_prerun, weak=False) signals.task_postrun.connect(self._trace_postrun, weak=False) signals.before_task_publish.connect( @@ -139,6 +151,7 @@ def _trace_prerun(self, *args, **kwargs): if task is None or task_id is None: return + self.update_task_start_time(task_id) request = task.request tracectx = extract(request, getter=celery_getter) or None @@ -153,8 +166,7 @@ def _trace_prerun(self, *args, **kwargs): activation.__enter__() # pylint: disable=E1101 utils.attach_span(task, task_id, (span, activation)) - @staticmethod - def _trace_postrun(*args, **kwargs): + def _trace_postrun(self, *args, **kwargs): task = utils.retrieve_task(kwargs) task_id = utils.retrieve_task_id(kwargs) @@ -178,6 +190,12 @@ def _trace_postrun(*args, **kwargs): activation.__exit__(None, None, None) utils.detach_span(task, task_id) + self.update_task_start_time(task_id) + labels = { + 'task': task.name, + 'worker': task.request.hostname + } + self._record_histograms(task_id, labels) def _trace_before_publish(self, *args, **kwargs): task = utils.retrieve_task_from_sender(kwargs) @@ -270,3 +288,22 @@ def _trace_retry(*args, **kwargs): # Use `str(reason)` instead of `reason.message` in case we get # something that isn't an `Exception` span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason)) + + def update_task_start_time(self, task_id): + cur_time = default_timer() + elapsed_time = cur_time - self.task_id_to_start_time[ + task_id] if task_id in self.task_id_to_start_time else cur_time + self.task_id_to_start_time[task_id] = elapsed_time + + def _record_histograms(self, task_id, metric_attributes): + self.metrics["flower.task.runtime.seconds"].record( + self.task_id_to_start_time.get(task_id), attributes=metric_attributes) + + def create_celery_metrics(self, meter) -> None: + self.metrics = { + "flower.task.runtime.seconds": meter.create_histogram( + name="flower.task.runtime.seconds", + unit="seconds", + description="The time it took to run the task.", + ) + } diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py index ecc4318c67..c90c40293e 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py @@ -14,3 +14,4 @@ _instruments = ("celery >= 4.0, < 6.0",) +_supports_metrics = True diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py new file mode 100644 index 0000000000..7a50452b86 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py @@ -0,0 +1,109 @@ +import time +import threading +from timeit import default_timer +from typing import Union, Optional + +from opentelemetry.sdk.metrics._internal.point import Metric +from opentelemetry.sdk.metrics.export import ( + HistogramDataPoint, + NumberDataPoint, +) +from opentelemetry.test.test_base import TestBase +from opentelemetry.instrumentation.celery import CeleryInstrumentor + +from .celery_test_tasks import task_add, app + + +class TestMetrics(TestBase): + def setUp(self): + super().setUp() + self._worker = app.Worker(app=app, pool="solo", concurrency=1, + hostname='celery@akochavi') + self._thread = threading.Thread(target=self._worker.start) + self._thread.daemon = True + self._thread.start() + + def tearDown(self): + super().tearDown() + self._worker.stop() + self._thread.join() + + def get_metrics(self): + CeleryInstrumentor().instrument() + result = task_add.delay(1, 2) + + timeout = time.time() + 60 * 1 # 1 minutes from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + CeleryInstrumentor().uninstrument() + resource_metrics = ( + self.memory_metrics_reader.get_metrics_data().resource_metrics + ) + + all_metrics = [] + for metrics in resource_metrics: + for scope_metrics in metrics.scope_metrics: + all_metrics.extend(scope_metrics.metrics) + + return all_metrics + + def assert_metric_expected( + self, + metric: Metric, + expected_value: Union[int, float], + expected_attributes: dict = None, + est_delta: Optional[float] = None, + ): + data_point = next(iter(metric.data.data_points)) + + if isinstance(data_point, HistogramDataPoint): + self.assertEqual( + data_point.count, + 1, + ) + if est_delta is None: + self.assertEqual( + data_point.sum, + expected_value, + ) + else: + self.assertAlmostEqual( + data_point.sum, + expected_value, + delta=est_delta, + ) + elif isinstance(data_point, NumberDataPoint): + self.assertEqual( + data_point.value, + expected_value, + ) + + if expected_attributes: + self.assertDictEqual( + expected_attributes, + dict(data_point.attributes), + ) + + def test_basic_metric(self): + start_time = default_timer() + task_runtime_estimated = (default_timer() - start_time) * 1000 + + metrics = self.get_metrics() + self.assertEqual(len(metrics), 1) + + task_runtime = metrics[0] + print(task_runtime) + self.assertEqual( + task_runtime.name, "flower.task.runtime.seconds" + ) + self.assert_metric_expected( + task_runtime, + task_runtime_estimated, + { + 'task': 'tests.celery_test_tasks.task_add', + 'worker': 'celery@akochavi', + }, + est_delta=200, + ) From 6f6aae8ec379dd207d65fd25f9c8034b2d55dd35 Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 16 Feb 2023 13:35:53 +0200 Subject: [PATCH 02/13] wip --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52c5df9902..62506ed557 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add metric instrumentation for celery + ([#1679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1679/files)) + ### Added - Support `aio_pika` 9.x (([#1670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1670]) From 1bf999e4dae5c65627de39bf11fcb1ca913c53f7 Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 16 Feb 2023 14:15:22 +0200 Subject: [PATCH 03/13] lint fixes --- .../instrumentation/celery/__init__.py | 16 +++++++++------- .../tests/test_metrics.py | 13 ++++++------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index e8bff28283..3394978c3c 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -191,10 +191,7 @@ def _trace_postrun(self, *args, **kwargs): activation.__exit__(None, None, None) utils.detach_span(task, task_id) self.update_task_start_time(task_id) - labels = { - 'task': task.name, - 'worker': task.request.hostname - } + labels = {"task": task.name, "worker": task.request.hostname} self._record_histograms(task_id, labels) def _trace_before_publish(self, *args, **kwargs): @@ -291,13 +288,18 @@ def _trace_retry(*args, **kwargs): def update_task_start_time(self, task_id): cur_time = default_timer() - elapsed_time = cur_time - self.task_id_to_start_time[ - task_id] if task_id in self.task_id_to_start_time else cur_time + elapsed_time = ( + cur_time - self.task_id_to_start_time[task_id] + if task_id in self.task_id_to_start_time + else cur_time + ) self.task_id_to_start_time[task_id] = elapsed_time def _record_histograms(self, task_id, metric_attributes): self.metrics["flower.task.runtime.seconds"].record( - self.task_id_to_start_time.get(task_id), attributes=metric_attributes) + self.task_id_to_start_time.get(task_id), + attributes=metric_attributes, + ) def create_celery_metrics(self, meter) -> None: self.metrics = { diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py index 7a50452b86..8f4a013bb2 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py @@ -17,8 +17,9 @@ class TestMetrics(TestBase): def setUp(self): super().setUp() - self._worker = app.Worker(app=app, pool="solo", concurrency=1, - hostname='celery@akochavi') + self._worker = app.Worker( + app=app, pool="solo", concurrency=1, hostname="celery@akochavi" + ) self._thread = threading.Thread(target=self._worker.start) self._thread.daemon = True self._thread.start() @@ -95,15 +96,13 @@ def test_basic_metric(self): task_runtime = metrics[0] print(task_runtime) - self.assertEqual( - task_runtime.name, "flower.task.runtime.seconds" - ) + self.assertEqual(task_runtime.name, "flower.task.runtime.seconds") self.assert_metric_expected( task_runtime, task_runtime_estimated, { - 'task': 'tests.celery_test_tasks.task_add', - 'worker': 'celery@akochavi', + "task": "tests.celery_test_tasks.task_add", + "worker": "celery@akochavi", }, est_delta=200, ) From f8486ec50e080f0a751be47e1b7b7f772a7ca7ef Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 16 Feb 2023 15:21:10 +0200 Subject: [PATCH 04/13] wip --- CHANGELOG.md | 2 +- .../tests/test_metrics.py | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62506ed557..fa2dd27c2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased - Add metric instrumentation for celery - ([#1679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1679/files)) + ([#1679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1679)) ### Added diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py index 8f4a013bb2..72f340b03f 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py @@ -30,7 +30,6 @@ def tearDown(self): self._thread.join() def get_metrics(self): - CeleryInstrumentor().instrument() result = task_add.delay(1, 2) timeout = time.time() + 60 * 1 # 1 minutes from now @@ -38,7 +37,6 @@ def get_metrics(self): if time.time() > timeout: break time.sleep(0.05) - CeleryInstrumentor().uninstrument() resource_metrics = ( self.memory_metrics_reader.get_metrics_data().resource_metrics ) @@ -88,10 +86,12 @@ def assert_metric_expected( ) def test_basic_metric(self): + CeleryInstrumentor().instrument() start_time = default_timer() task_runtime_estimated = (default_timer() - start_time) * 1000 metrics = self.get_metrics() + CeleryInstrumentor().uninstrument() self.assertEqual(len(metrics), 1) task_runtime = metrics[0] @@ -106,3 +106,16 @@ def test_basic_metric(self): }, est_delta=200, ) + + def test_metric_uninstrument(self): + CeleryInstrumentor().instrument() + metrics = self.get_metrics() + self.assertEqual(len(metrics), 1) + CeleryInstrumentor().uninstrument() + + metrics = self.get_metrics() + self.assertEqual(len(metrics), 1) + + for metric in metrics: + for point in list(metric.data.data_points): + self.assertEqual(point.count, 1) From 8cd3daf7cec1cb10978e604d6a812d9d7435df66 Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Mon, 20 Feb 2023 07:50:09 +0200 Subject: [PATCH 05/13] wip --- .../tests/test_metrics.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py index 72f340b03f..6feff37841 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py @@ -1,17 +1,17 @@ -import time import threading +import time from timeit import default_timer -from typing import Union, Optional +from typing import Optional, Union +from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.sdk.metrics._internal.point import Metric from opentelemetry.sdk.metrics.export import ( HistogramDataPoint, NumberDataPoint, ) from opentelemetry.test.test_base import TestBase -from opentelemetry.instrumentation.celery import CeleryInstrumentor -from .celery_test_tasks import task_add, app +from .celery_test_tasks import app, task_add class TestMetrics(TestBase): From d491e944e5a6be0590e805a0cc2e06d63873332f Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Wed, 1 Mar 2023 14:50:47 +0200 Subject: [PATCH 06/13] wip --- .../tests/test_metrics.py | 62 ++++--------------- 1 file changed, 13 insertions(+), 49 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py index 6feff37841..71afa1a4bf 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py @@ -1,14 +1,8 @@ import threading import time from timeit import default_timer -from typing import Optional, Union from opentelemetry.instrumentation.celery import CeleryInstrumentor -from opentelemetry.sdk.metrics._internal.point import Metric -from opentelemetry.sdk.metrics.export import ( - HistogramDataPoint, - NumberDataPoint, -) from opentelemetry.test.test_base import TestBase from .celery_test_tasks import app, task_add @@ -48,43 +42,6 @@ def get_metrics(self): return all_metrics - def assert_metric_expected( - self, - metric: Metric, - expected_value: Union[int, float], - expected_attributes: dict = None, - est_delta: Optional[float] = None, - ): - data_point = next(iter(metric.data.data_points)) - - if isinstance(data_point, HistogramDataPoint): - self.assertEqual( - data_point.count, - 1, - ) - if est_delta is None: - self.assertEqual( - data_point.sum, - expected_value, - ) - else: - self.assertAlmostEqual( - data_point.sum, - expected_value, - delta=est_delta, - ) - elif isinstance(data_point, NumberDataPoint): - self.assertEqual( - data_point.value, - expected_value, - ) - - if expected_attributes: - self.assertDictEqual( - expected_attributes, - dict(data_point.attributes), - ) - def test_basic_metric(self): CeleryInstrumentor().instrument() start_time = default_timer() @@ -99,12 +56,19 @@ def test_basic_metric(self): self.assertEqual(task_runtime.name, "flower.task.runtime.seconds") self.assert_metric_expected( task_runtime, - task_runtime_estimated, - { - "task": "tests.celery_test_tasks.task_add", - "worker": "celery@akochavi", - }, - est_delta=200, + [ + self.create_histogram_data_point( + count=1, + sum_data_point=task_runtime_estimated, + max_data_point=task_runtime_estimated, + min_data_point=task_runtime_estimated, + attributes={ + "task": "tests.celery_test_tasks.task_add", + "worker": "celery@akochavi", + }, + ) + ], + est_value_delta=200, ) def test_metric_uninstrument(self): From fbc6200db003ba935fc606a2c7e6aa82a3a5bb77 Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 2 Mar 2023 11:07:56 +0200 Subject: [PATCH 07/13] wip --- .../tests/test_metrics.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py index 71afa1a4bf..46e39a6046 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_metrics.py @@ -31,16 +31,7 @@ def get_metrics(self): if time.time() > timeout: break time.sleep(0.05) - resource_metrics = ( - self.memory_metrics_reader.get_metrics_data().resource_metrics - ) - - all_metrics = [] - for metrics in resource_metrics: - for scope_metrics in metrics.scope_metrics: - all_metrics.extend(scope_metrics.metrics) - - return all_metrics + return self.get_sorted_metrics() def test_basic_metric(self): CeleryInstrumentor().instrument() From 1678415621737be32c66bb2df114c8912f981275 Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 2 Mar 2023 11:38:22 +0200 Subject: [PATCH 08/13] wip --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfcf11b52e..dd222111f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654)) - `opentelemetry-instrumentation-system-metrics` Fix initialization of the instrumentation class when configuration is provided ([#1438](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1439)) ->>>>>>> main ## Version 1.16.0/0.37b0 (2023-02-17) From 001cc06f96708d02df6ae1beff8f1f0d1f73748c Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 4 May 2023 11:45:09 +0300 Subject: [PATCH 09/13] wip --- instrumentation/README.md | 2 +- .../src/opentelemetry/instrumentation/celery/__init__.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/instrumentation/README.md b/instrumentation/README.md index b70f181639..e69dd6adbd 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -10,7 +10,7 @@ | [opentelemetry-instrumentation-boto](./opentelemetry-instrumentation-boto) | boto~=2.0 | No | [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 | No | [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | No -| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | Yes +| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | No | [opentelemetry-instrumentation-confluent-kafka](./opentelemetry-instrumentation-confluent-kafka) | confluent-kafka >= 1.8.2, < 2.0.0 | No | [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | No | [opentelemetry-instrumentation-django](./opentelemetry-instrumentation-django) | django >= 1.10 | Yes diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index ddb92e4d95..9fe053692a 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -295,12 +295,12 @@ def _trace_retry(*args, **kwargs): def update_task_start_time(self, task_id): cur_time = default_timer() - elapsed_time = ( + start_time = ( cur_time - self.task_id_to_start_time[task_id] if task_id in self.task_id_to_start_time else cur_time ) - self.task_id_to_start_time[task_id] = elapsed_time + self.task_id_to_start_time[task_id] = start_time def _record_histograms(self, task_id, metric_attributes): self.metrics["flower.task.runtime.seconds"].record( From b727d65f693f20a720710f1a618b1b894f6b3c29 Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 4 May 2023 13:30:21 +0300 Subject: [PATCH 10/13] wip --- .../src/opentelemetry/instrumentation/celery/package.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py index c90c40293e..ae06d71a2d 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py @@ -13,5 +13,4 @@ # limitations under the License. -_instruments = ("celery >= 4.0, < 6.0",) -_supports_metrics = True +_instruments = ("celery >= 4.0, < 6.0",) \ No newline at end of file From 4fabcbff9735e219551210999fb15a395cec46b1 Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Thu, 4 May 2023 14:23:48 +0300 Subject: [PATCH 11/13] wip --- .../src/opentelemetry/instrumentation/celery/package.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py index ae06d71a2d..ecc4318c67 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py @@ -13,4 +13,4 @@ # limitations under the License. -_instruments = ("celery >= 4.0, < 6.0",) \ No newline at end of file +_instruments = ("celery >= 4.0, < 6.0",) From 3be197ca7a4f11077a1e77d1920818a5d62e0b80 Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Sun, 7 May 2023 15:38:09 +0300 Subject: [PATCH 12/13] wip --- .../src/opentelemetry/instrumentation/celery/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 9fe053692a..8109c167de 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -295,14 +295,17 @@ def _trace_retry(*args, **kwargs): def update_task_start_time(self, task_id): cur_time = default_timer() - start_time = ( + task_start_time = ( cur_time - self.task_id_to_start_time[task_id] if task_id in self.task_id_to_start_time else cur_time ) - self.task_id_to_start_time[task_id] = start_time + self.task_id_to_start_time[task_id] = task_start_time def _record_histograms(self, task_id, metric_attributes): + if task_id is None: + return + self.metrics["flower.task.runtime.seconds"].record( self.task_id_to_start_time.get(task_id), attributes=metric_attributes, From f8dd80baf5cd47f16fcd08776e27a51a8006296a Mon Sep 17 00:00:00 2001 From: Adi Kochavi Date: Tue, 9 May 2023 09:33:41 +0300 Subject: [PATCH 13/13] wip --- .../opentelemetry/instrumentation/celery/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 8109c167de..a216765fb0 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -151,7 +151,7 @@ def _trace_prerun(self, *args, **kwargs): if task is None or task_id is None: return - self.update_task_start_time(task_id) + self.update_task_duration_time(task_id) request = task.request tracectx = extract(request, getter=celery_getter) or None @@ -190,7 +190,7 @@ def _trace_postrun(self, *args, **kwargs): activation.__exit__(None, None, None) utils.detach_span(task, task_id) - self.update_task_start_time(task_id) + self.update_task_duration_time(task_id) labels = {"task": task.name, "worker": task.request.hostname} self._record_histograms(task_id, labels) @@ -293,14 +293,14 @@ def _trace_retry(*args, **kwargs): # something that isn't an `Exception` span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason)) - def update_task_start_time(self, task_id): + def update_task_duration_time(self, task_id): cur_time = default_timer() - task_start_time = ( + task_duration_time_until_now = ( cur_time - self.task_id_to_start_time[task_id] if task_id in self.task_id_to_start_time else cur_time ) - self.task_id_to_start_time[task_id] = task_start_time + self.task_id_to_start_time[task_id] = task_duration_time_until_now def _record_histograms(self, task_id, metric_attributes): if task_id is None: