Skip to content

Commit dfd7713

Browse files
committed
feat(asyncio): add weakref-based tracking for instrumented objects
1 parent da3d617 commit dfd7713

File tree

3 files changed

+99
-11
lines changed

3 files changed

+99
-11
lines changed

Diff for: instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ def func():
9393

9494
from wrapt import wrap_function_wrapper as _wrap
9595

96+
from opentelemetry.instrumentation.asyncio.instrumentation_state import (
97+
_is_instrumented,
98+
_mark_instrumented,
99+
)
96100
from opentelemetry.instrumentation.asyncio.package import _instruments
97101
from opentelemetry.instrumentation.asyncio.utils import (
98102
get_coros_to_trace,
@@ -107,7 +111,7 @@ def func():
107111
from opentelemetry.trace.status import Status, StatusCode
108112

109113
ASYNCIO_PREFIX = "asyncio"
110-
_IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE = "_otel_asyncio_instrumented"
114+
111115

112116
class AsyncioInstrumentor(BaseInstrumentor):
113117
"""
@@ -240,10 +244,10 @@ def trace_to_thread(self, func: callable):
240244
"""
241245
Trace a function, but if already instrumented, skip double-wrapping.
242246
"""
243-
if getattr(func, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, False):
247+
if _is_instrumented(func):
244248
return func
245249

246-
setattr(func, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, True)
250+
_mark_instrumented(func)
247251

248252
start = default_timer()
249253
func_name = getattr(func, "__name__", None)
@@ -281,10 +285,10 @@ async def trace_coroutine(self, coro):
281285
Wrap a coroutine so that we measure its duration, metrics, etc.
282286
If already instrumented, simply 'await coro' to preserve call behavior.
283287
"""
284-
if getattr(coro, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, False):
288+
if _is_instrumented(coro):
285289
return await coro
286290

287-
setattr(coro, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, True)
291+
_mark_instrumented(coro)
288292

289293
if not hasattr(coro, "__name__"):
290294
return await coro
@@ -322,10 +326,10 @@ def trace_future(self, future):
322326
"""
323327
Wrap a Future's done callback. If already instrumented, skip re-wrapping.
324328
"""
325-
if getattr(future, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, False):
329+
if _is_instrumented(future):
326330
return future
327331

328-
setattr(future, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, True)
332+
_mark_instrumented(future)
329333

330334
start = default_timer()
331335
span = (
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""
2+
Instrumentation State Tracker
3+
4+
This module provides helper functions to safely track whether a coroutine,
5+
Future, or function has already been instrumented by the OpenTelemetry
6+
asyncio instrumentation layer.
7+
8+
Because some Python objects (like coroutines or functions) may not support
9+
adding custom attributes or may not be weak-referenceable, we use a
10+
weak-reference-based dictionary to track instrumented objects safely and
11+
efficiently without causing memory leaks.
12+
13+
If an object cannot be weak-referenced, we skip tracking it to avoid
14+
runtime errors.
15+
16+
Usage:
17+
if not _is_instrumented(obj):
18+
_mark_instrumented(obj)
19+
# instrument the object...
20+
"""
21+
22+
import weakref
23+
24+
# A global dictionary to track whether an object has been instrumented.
25+
# Keys are weak references to avoid preventing garbage collection.
26+
_instrumented_tasks = {}
27+
28+
29+
def _get_weak_key(obj):
30+
"""
31+
Attempt to create a weak reference key for the given object.
32+
33+
Some object types (e.g., built-in functions or async_generator_asend)
34+
do not support weak references. In those cases, return None.
35+
36+
Args:
37+
obj: The object to generate a weak reference for.
38+
39+
Returns:
40+
A weakref.ref to the object if supported, otherwise None.
41+
"""
42+
try:
43+
return weakref.ref(obj)
44+
except TypeError:
45+
return None
46+
47+
48+
def _is_instrumented(obj) -> bool:
49+
"""
50+
Check if the object has already been instrumented.
51+
52+
Args:
53+
obj: The coroutine, function, or Future to check.
54+
55+
Returns:
56+
True if the object is already marked as instrumented, False otherwise.
57+
"""
58+
key = _get_weak_key(obj)
59+
return key in _instrumented_tasks if key else False
60+
61+
62+
def _mark_instrumented(obj):
63+
"""
64+
Mark the object as instrumented to avoid double-instrumentation.
65+
66+
Only objects that support weak references are tracked. Unsupported
67+
objects are silently skipped.
68+
69+
Args:
70+
obj: The coroutine, function, or Future to mark.
71+
"""
72+
key = _get_weak_key(obj)
73+
if key:
74+
_instrumented_tasks[key] = True

Diff for: instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_duplicate_instrument.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121

2222
import asyncio
2323

24-
from opentelemetry.test.test_base import TestBase
25-
2624
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
25+
from opentelemetry.test.test_base import TestBase
2726

2827

2928
class MockSubscription:
3029
"""
3130
Example class holding an unsubscribe_future, similar to something like
3231
aiokafka's subscription.
3332
"""
33+
3434
def __init__(self):
3535
self.unsubscribe_future = asyncio.Future()
3636

@@ -39,6 +39,7 @@ class MockGroupCoordinator:
3939
"""
4040
Example class modeling repeated instrumentation of the same Future objects.
4141
"""
42+
4243
def __init__(self):
4344
self._closing = asyncio.Future()
4445
self.subscription = MockSubscription()
@@ -62,11 +63,16 @@ class TestAsyncioDuplicateInstrument(TestBase):
6263

6364
def setUp(self):
6465
super().setUp()
66+
self.loop = asyncio.new_event_loop()
67+
asyncio.set_event_loop(self.loop)
68+
6569
self.instrumentor = AsyncioInstrumentor()
6670
self.instrumentor.instrument()
6771

6872
def tearDown(self):
6973
self.instrumentor.uninstrument()
74+
self.loop.close()
75+
asyncio.set_event_loop(None)
7076
super().tearDown()
7177

7278
def test_duplicate_instrumentation_of_futures(self):
@@ -79,11 +85,15 @@ def test_duplicate_instrumentation_of_futures(self):
7985
# Simulate calling the routine multiple times
8086
num_iterations = 10
8187
for _ in range(num_iterations):
82-
asyncio.run(coordinator.run_routine(self.instrumentor))
88+
self.loop.run_until_complete(
89+
coordinator.run_routine(self.instrumentor)
90+
)
8391

8492
# Check for callback accumulation
8593
closing_cb_count = len(coordinator._closing._callbacks)
86-
unsub_cb_count = len(coordinator.subscription.unsubscribe_future._callbacks)
94+
unsub_cb_count = len(
95+
coordinator.subscription.unsubscribe_future._callbacks
96+
)
8797
rejoin_cb_count = len(coordinator._rejoin_needed_fut._callbacks)
8898

8999
# If instrumentation is properly deduplicated, each Future might have ~1-2 callbacks.

0 commit comments

Comments
 (0)