Skip to content

Commit 53f05e0

Browse files
committed
open-telemetry#3383 fix duplicate instrument
1 parent dde065b commit 53f05e0

File tree

2 files changed

+131
-2
lines changed

2 files changed

+131
-2
lines changed

instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py

+26-2
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def func():
107107
from opentelemetry.trace.status import Status, StatusCode
108108

109109
ASYNCIO_PREFIX = "asyncio"
110-
110+
_IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE = "_otel_asyncio_instrumented"
111111

112112
class AsyncioInstrumentor(BaseInstrumentor):
113113
"""
@@ -237,7 +237,14 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None:
237237
)
238238

239239
def trace_to_thread(self, func: callable):
240-
"""Trace a function."""
240+
"""
241+
Trace a function, but if already instrumented, skip double-wrapping.
242+
"""
243+
if getattr(func, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, False):
244+
return func
245+
246+
setattr(func, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, True)
247+
241248
start = default_timer()
242249
func_name = getattr(func, "__name__", None)
243250
if func_name is None and isinstance(func, functools.partial):
@@ -270,6 +277,15 @@ def trace_item(self, coro_or_future):
270277
return coro_or_future
271278

272279
async def trace_coroutine(self, coro):
280+
"""
281+
Wrap a coroutine so that we measure its duration, metrics, etc.
282+
If already instrumented, simply 'await coro' to preserve call behavior.
283+
"""
284+
if getattr(coro, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, False):
285+
return await coro
286+
287+
setattr(coro, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, True)
288+
273289
if not hasattr(coro, "__name__"):
274290
return await coro
275291
start = default_timer()
@@ -303,6 +319,14 @@ async def trace_coroutine(self, coro):
303319
self.record_process(start, attr, span, exception)
304320

305321
def trace_future(self, future):
322+
"""
323+
Wrap a Future's done callback. If already instrumented, skip re-wrapping.
324+
"""
325+
if getattr(future, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, False):
326+
return future
327+
328+
setattr(future, _IS_ASYNCIO_INSTRUMENTED_ATTRIBUTE, True)
329+
306330
start = default_timer()
307331
span = (
308332
self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
A general test verifying that when the same Future objects (or coroutines) are
17+
repeatedly instrumented (for example, via `trace_future`), callback references
18+
do not leak. In this example, we mimic a typical scenario where a small set of
19+
Futures might be reused throughout an application's lifecycle.
20+
"""
21+
22+
import asyncio
23+
24+
from opentelemetry.test.test_base import TestBase
25+
26+
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
27+
28+
29+
class MockSubscription:
30+
"""
31+
Example class holding an unsubscribe_future, similar to something like
32+
aiokafka's subscription.
33+
"""
34+
def __init__(self):
35+
self.unsubscribe_future = asyncio.Future()
36+
37+
38+
class MockGroupCoordinator:
39+
"""
40+
Example class modeling repeated instrumentation of the same Future objects.
41+
"""
42+
def __init__(self):
43+
self._closing = asyncio.Future()
44+
self.subscription = MockSubscription()
45+
self._rejoin_needed_fut = asyncio.Future()
46+
47+
async def run_routine(self, instrumentor):
48+
"""
49+
Each time this routine is called, the same 3 Futures are 'traced' again.
50+
In a real-life scenario, there's often a loop reusing these objects.
51+
"""
52+
instrumentor.trace_future(self._closing)
53+
instrumentor.trace_future(self.subscription.unsubscribe_future)
54+
instrumentor.trace_future(self._rejoin_needed_fut)
55+
56+
57+
class TestAsyncioDuplicateInstrument(TestBase):
58+
"""
59+
Tests whether repeated instrumentation of the same Futures leads to
60+
exponential callback growth (potential memory leak).
61+
"""
62+
63+
def setUp(self):
64+
super().setUp()
65+
self.instrumentor = AsyncioInstrumentor()
66+
self.instrumentor.instrument()
67+
68+
def tearDown(self):
69+
self.instrumentor.uninstrument()
70+
super().tearDown()
71+
72+
def test_duplicate_instrumentation_of_futures(self):
73+
"""
74+
If instrumentor.trace_future is called multiple times on the same Future,
75+
we should NOT see an unbounded accumulation of callbacks.
76+
"""
77+
coordinator = MockGroupCoordinator()
78+
79+
# Simulate calling the routine multiple times
80+
num_iterations = 10
81+
for _ in range(num_iterations):
82+
asyncio.run(coordinator.run_routine(self.instrumentor))
83+
84+
# Check for callback accumulation
85+
closing_cb_count = len(coordinator._closing._callbacks)
86+
unsub_cb_count = len(coordinator.subscription.unsubscribe_future._callbacks)
87+
rejoin_cb_count = len(coordinator._rejoin_needed_fut._callbacks)
88+
89+
# If instrumentation is properly deduplicated, each Future might have ~1-2 callbacks.
90+
max_expected_callbacks = 2
91+
self.assertLessEqual(
92+
closing_cb_count,
93+
max_expected_callbacks,
94+
f"_closing Future has {closing_cb_count} callbacks. Potential leak!",
95+
)
96+
self.assertLessEqual(
97+
unsub_cb_count,
98+
max_expected_callbacks,
99+
f"unsubscribe_future has {unsub_cb_count} callbacks. Potential leak!",
100+
)
101+
self.assertLessEqual(
102+
rejoin_cb_count,
103+
max_expected_callbacks,
104+
f"_rejoin_needed_fut has {rejoin_cb_count} callbacks. Potential leak!",
105+
)

0 commit comments

Comments
 (0)