Skip to content

asyncio: fix duplicate instrumentation #3408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3113](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3113))
- `opentelemetry-instrumentation-grpc` Fix error when using gprc versions <= 1.50.0 with unix sockets.
([[#3393](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3393)])
- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation.
([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)])

## Version 1.31.0/0.52b0 (2025-03-12)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ def func():

from wrapt import wrap_function_wrapper as _wrap

from opentelemetry.instrumentation.asyncio.instrumentation_state import (
_is_instrumented,
_mark_instrumented,
)
from opentelemetry.instrumentation.asyncio.package import _instruments
from opentelemetry.instrumentation.asyncio.utils import (
get_coros_to_trace,
Expand Down Expand Up @@ -237,7 +241,14 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None:
)

def trace_to_thread(self, func: callable):
"""Trace a function."""
"""
Trace a function, but if already instrumented, skip double-wrapping.
"""
if _is_instrumented(func):
return func

_mark_instrumented(func)

start = default_timer()
func_name = getattr(func, "__name__", None)
if func_name is None and isinstance(func, functools.partial):
Expand Down Expand Up @@ -270,6 +281,15 @@ def trace_item(self, coro_or_future):
return coro_or_future

async def trace_coroutine(self, coro):
"""
Wrap a coroutine so that we measure its duration, metrics, etc.
If already instrumented, simply 'await coro' to preserve call behavior.
"""
if _is_instrumented(coro):
return await coro

_mark_instrumented(coro)

if not hasattr(coro, "__name__"):
return await coro
start = default_timer()
Expand Down Expand Up @@ -303,6 +323,14 @@ async def trace_coroutine(self, coro):
self.record_process(start, attr, span, exception)

def trace_future(self, future):
"""
Wrap a Future's done callback. If already instrumented, skip re-wrapping.
"""
if _is_instrumented(future):
return future

_mark_instrumented(future)

start = default_timer()
span = (
self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
Instrumentation State Tracker

This module provides helper functions to safely track whether a coroutine,
Future, or function has already been instrumented by the OpenTelemetry
asyncio instrumentation layer.

Because some Python objects (like coroutines or functions) may not support
adding custom attributes or may not be weak-referenceable, we use a
weak-reference-based dictionary to track instrumented objects safely and
efficiently without causing memory leaks.

If an object cannot be weak-referenced, we skip tracking it to avoid
runtime errors.

Usage:
if not _is_instrumented(obj):
_mark_instrumented(obj)
# instrument the object...
"""

import weakref

# A global dictionary to track whether an object has been instrumented.
# Keys are weak references to avoid preventing garbage collection.
_instrumented_tasks = {}


def _get_weak_key(obj):
"""
Attempt to create a weak reference key for the given object.

Some object types (e.g., built-in functions or async_generator_asend)
do not support weak references. In those cases, return None.

Args:
obj: The object to generate a weak reference for.

Returns:
A weakref.ref to the object if supported, otherwise None.
"""
try:
return weakref.ref(obj)
except TypeError:
return None


def _is_instrumented(obj) -> bool:
"""
Check if the object has already been instrumented.

Args:
obj: The coroutine, function, or Future to check.

Returns:
True if the object is already marked as instrumented, False otherwise.
"""
key = _get_weak_key(obj)
return key in _instrumented_tasks if key else False


def _mark_instrumented(obj):
"""
Mark the object as instrumented to avoid double-instrumentation.

Only objects that support weak references are tracked. Unsupported
objects are silently skipped.

Args:
obj: The coroutine, function, or Future to mark.
"""
key = _get_weak_key(obj)
if key:
_instrumented_tasks[key] = True
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
A general test verifying that when the same Future objects (or coroutines) are
repeatedly instrumented (for example, via `trace_future`), callback references
do not leak. In this example, we mimic a typical scenario where a small set of
Futures might be reused throughout an application's lifecycle.
"""

import asyncio

from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
from opentelemetry.test.test_base import TestBase


class MockSubscription:
"""
Example class holding an unsubscribe_future, similar to something like
aiokafka's subscription.
"""

def __init__(self):
self.unsubscribe_future = asyncio.Future()


class MockGroupCoordinator:
"""
Example class modeling repeated instrumentation of the same Future objects.
"""

def __init__(self):
self._closing = asyncio.Future()
self.subscription = MockSubscription()
self._rejoin_needed_fut = asyncio.Future()

async def run_routine(self, instrumentor):
"""
Each time this routine is called, the same 3 Futures are 'traced' again.
In a real-life scenario, there's often a loop reusing these objects.
"""
instrumentor.trace_future(self._closing)
instrumentor.trace_future(self.subscription.unsubscribe_future)
instrumentor.trace_future(self._rejoin_needed_fut)


class TestAsyncioDuplicateInstrument(TestBase):
"""
Tests whether repeated instrumentation of the same Futures leads to
exponential callback growth (potential memory leak).
"""

def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

self.instrumentor = AsyncioInstrumentor()
self.instrumentor.instrument()

def tearDown(self):
self.instrumentor.uninstrument()
self.loop.close()
asyncio.set_event_loop(None)
super().tearDown()

def test_duplicate_instrumentation_of_futures(self):
"""
If instrumentor.trace_future is called multiple times on the same Future,
we should NOT see an unbounded accumulation of callbacks.
"""
coordinator = MockGroupCoordinator()

# Simulate calling the routine multiple times
num_iterations = 10
for _ in range(num_iterations):
self.loop.run_until_complete(
coordinator.run_routine(self.instrumentor)
)

# Check for callback accumulation
closing_cb_count = len(coordinator._closing._callbacks)
unsub_cb_count = len(
coordinator.subscription.unsubscribe_future._callbacks
)
rejoin_cb_count = len(coordinator._rejoin_needed_fut._callbacks)

# If instrumentation is properly deduplicated, each Future might have ~1-2 callbacks.
max_expected_callbacks = 2
self.assertLessEqual(
closing_cb_count,
max_expected_callbacks,
f"_closing Future has {closing_cb_count} callbacks. Potential leak!",
)
self.assertLessEqual(
unsub_cb_count,
max_expected_callbacks,
f"unsubscribe_future has {unsub_cb_count} callbacks. Potential leak!",
)
self.assertLessEqual(
rejoin_cb_count,
max_expected_callbacks,
f"_rejoin_needed_fut has {rejoin_cb_count} callbacks. Potential leak!",
)