diff --git a/CHANGELOG.md b/CHANGELOG.md index 42ec9138cd..85876a1801 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation. + ([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)]) + ### Added ### Fixed @@ -39,9 +42,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3113](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3113)) - `opentelemetry-instrumentation-grpc` Fix error when using gprc versions <= 1.50.0 with unix sockets. ([[#3393](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3393)]) +- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation. + ([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)]) - `opentelemetry-instrumentation-aiokafka` Fix send_and_wait method no headers kwargs error. 
([[#3332](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3332)]) + ## Version 1.31.0/0.52b0 (2025-03-12) ### Added diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py index 9905d91dbd..f2db958a5a 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py @@ -93,6 +93,9 @@ def func(): from wrapt import wrap_function_wrapper as _wrap +from opentelemetry.instrumentation.asyncio.instrumentation_state import ( + _is_instrumented, +) from opentelemetry.instrumentation.asyncio.package import _instruments from opentelemetry.instrumentation.asyncio.utils import ( get_coros_to_trace, @@ -237,7 +240,12 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None: ) def trace_to_thread(self, func: callable): - """Trace a function.""" + """ + Trace a function, but if already instrumented, skip double-wrapping. + """ + if _is_instrumented(func): + return func + start = default_timer() func_name = getattr(func, "__name__", None) if func_name is None and isinstance(func, functools.partial): @@ -270,6 +278,13 @@ def trace_item(self, coro_or_future): return coro_or_future async def trace_coroutine(self, coro): + """ + Wrap a coroutine so that we measure its duration, metrics, etc. + If already instrumented, simply 'await coro' to preserve call behavior. + """ + if _is_instrumented(coro): + return await coro + if not hasattr(coro, "__name__"): return await coro start = default_timer() @@ -303,6 +318,12 @@ async def trace_coroutine(self, coro): self.record_process(start, attr, span, exception) def trace_future(self, future): + """ + Wrap a Future's done callback. 
If already instrumented, skip re-wrapping. + """ + if _is_instrumented(future): + return future + start = default_timer() span = ( self._tracer.start_span(f"{ASYNCIO_PREFIX} future") diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/instrumentation_state.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/instrumentation_state.py new file mode 100644 index 0000000000..370c791a08 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/instrumentation_state.py @@ -0,0 +1,66 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Instrumentation State Tracker + +This module provides helper functions to safely track whether a coroutine, +Future, or function has already been instrumented by the OpenTelemetry +asyncio instrumentation layer. + +Some Python objects (like coroutines or functions) may not support setting +custom attributes or weak references. To avoid memory leaks and runtime +errors, this module uses a weakref.WeakSet to safely track instrumented +objects. + +If an object cannot be weak-referenced, it is silently skipped. + +Usage: + if not _is_instrumented(obj): + # obj was just marked by the call above (if weak-referenceable) + # instrument the object... +""" + +import weakref +from typing import Any + +# A global WeakSet to track instrumented objects. 
+# Entries are automatically removed when the objects are garbage collected. +_instrumented_tasks = weakref.WeakSet() + + +def _is_instrumented(obj: Any) -> bool: + """ + Check whether the object has already been instrumented. + If not, mark it as instrumented (only if weakref is supported). + + Args: + obj: A coroutine, function, or Future. + + Returns: + True if the object was already instrumented. + False if the object is not trackable (no weakref support), or just marked now. + + Note: + In Python 3.12+, some internal types like `async_generator_asend` + raise TypeError when weakref is attempted. + """ + try: + if obj in _instrumented_tasks: + return True + _instrumented_tasks.add(obj) + return False + except TypeError: + # Object doesn't support weak references → can't track instrumentation + return False diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_duplicate_instrument.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_duplicate_instrument.py new file mode 100644 index 0000000000..ac6706407d --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_duplicate_instrument.py @@ -0,0 +1,70 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +A general test verifying that when the same Future objects (or coroutines) are +repeatedly instrumented (for example, via `trace_future`), callback references +do not leak. 
In this example, we mimic a typical scenario where a small set of +Futures might be reused throughout an application's lifecycle. +""" + +import asyncio + +from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor +from opentelemetry.test.test_base import TestBase + + +class TestAsyncioDuplicateInstrument(TestBase): + """ + Tests whether repeated instrumentation of the same Futures leads to + unbounded callback growth (potential memory leak). + """ + + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + self.instrumentor = AsyncioInstrumentor() + self.instrumentor.instrument() + + def tearDown(self): + self.instrumentor.uninstrument() + self.loop.close() + asyncio.set_event_loop(None) + super().tearDown() + + def test_duplicate_instrumentation_of_futures(self): + """ + If instrumentor.trace_future is called multiple times on the same Future, + we should NOT see an unbounded accumulation of callbacks. + """ + fut1 = asyncio.Future() + fut2 = asyncio.Future() + + num_iterations = 10 + for _ in range(num_iterations): + self.instrumentor.trace_future(fut1) + self.instrumentor.trace_future(fut2) + + self.assertLessEqual( + len(fut1._callbacks), + 1, + f"fut1 has {len(fut1._callbacks)} callbacks. Potential leak!", + ) + self.assertLessEqual( + len(fut2._callbacks), + 1, + f"fut2 has {len(fut2._callbacks)} callbacks. Potential leak!", + )