Skip to content

Commit ea0df4e

Browse files
committed
Merge remote-tracking branch 'origin/main' into zhaez/gen-ai-support
2 parents d62e5da + ce90639 commit ea0df4e

File tree

7 files changed

+228
-8
lines changed

7 files changed

+228
-8
lines changed

CHANGELOG.md

+14
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
1212
## Unreleased
1313

14+
- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation.
15+
([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)])
16+
17+
### Added
18+
19+
### Fixed
20+
21+
- `opentelemetry-instrumentation` Catch `ModuleNotFoundError` when the library is not installed
22+
and log as debug instead of exception
23+
([#3423](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3423))
24+
1425
- `opentelemetry-instrumentation-botocore` Add GenAI instrumentation for additional Bedrock models for InvokeModel API
1526
([#3419](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3419))
1627

@@ -34,9 +45,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3445
([#3113](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3113))
3546
- `opentelemetry-instrumentation-grpc` Fix error when using gprc versions <= 1.50.0 with unix sockets.
3647
([[#3393](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3393)])
48+
- `opentelemetry-instrumentation-asyncio` Fix duplicate instrumentation.
49+
([[#3383](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3383)])
3750
- `opentelemetry-instrumentation-aiokafka` Fix send_and_wait method no headers kwargs error.
3851
([[#3332](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3332)])
3952

53+
4054
## Version 1.31.0/0.52b0 (2025-03-12)
4155

4256
### Added

instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ def func():
9393

9494
from wrapt import wrap_function_wrapper as _wrap
9595

96+
from opentelemetry.instrumentation.asyncio.instrumentation_state import (
97+
_is_instrumented,
98+
)
9699
from opentelemetry.instrumentation.asyncio.package import _instruments
97100
from opentelemetry.instrumentation.asyncio.utils import (
98101
get_coros_to_trace,
@@ -237,7 +240,12 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs) -> None:
237240
)
238241

239242
def trace_to_thread(self, func: callable):
240-
"""Trace a function."""
243+
"""
244+
Trace a function, but if already instrumented, skip double-wrapping.
245+
"""
246+
if _is_instrumented(func):
247+
return func
248+
241249
start = default_timer()
242250
func_name = getattr(func, "__name__", None)
243251
if func_name is None and isinstance(func, functools.partial):
@@ -270,6 +278,13 @@ def trace_item(self, coro_or_future):
270278
return coro_or_future
271279

272280
async def trace_coroutine(self, coro):
281+
"""
282+
Wrap a coroutine so that we measure its duration, metrics, etc.
283+
If already instrumented, simply 'await coro' to preserve call behavior.
284+
"""
285+
if _is_instrumented(coro):
286+
return await coro
287+
273288
if not hasattr(coro, "__name__"):
274289
return await coro
275290
start = default_timer()
@@ -303,6 +318,12 @@ async def trace_coroutine(self, coro):
303318
self.record_process(start, attr, span, exception)
304319

305320
def trace_future(self, future):
321+
"""
322+
Wrap a Future's done callback. If already instrumented, skip re-wrapping.
323+
"""
324+
if _is_instrumented(future):
325+
return future
326+
306327
start = default_timer()
307328
span = (
308329
self._tracer.start_span(f"{ASYNCIO_PREFIX} future")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Instrumentation State Tracker
17+
18+
This module provides helper functions to safely track whether a coroutine,
19+
Future, or function has already been instrumented by the OpenTelemetry
20+
asyncio instrumentation layer.
21+
22+
Some Python objects (like coroutines or functions) may not support setting
23+
custom attributes or weak references. To avoid memory leaks and runtime
24+
errors, this module uses a WeakKeyDictionary to safely track instrumented
25+
objects.
26+
27+
If an object cannot be weak-referenced, it is silently skipped.
28+
29+
Usage:
30+
if not _is_instrumented(obj):
31+
_mark_instrumented(obj)
32+
# instrument the object...
33+
"""
34+
35+
import weakref
36+
from typing import Any
37+
38+
# A global WeakSet to track instrumented objects.
39+
# Entries are automatically removed when the objects are garbage collected.
40+
_instrumented_tasks = weakref.WeakSet()
41+
42+
43+
def _is_instrumented(obj: Any) -> bool:
44+
"""
45+
Check whether the object has already been instrumented.
46+
If not, mark it as instrumented (only if weakref is supported).
47+
48+
Args:
49+
obj: A coroutine, function, or Future.
50+
51+
Returns:
52+
True if the object was already instrumented.
53+
False if the object is not trackable (no weakref support), or just marked now.
54+
55+
Note:
56+
In Python 3.12+, some internal types like `async_generator_asend`
57+
raise TypeError when weakref is attempted.
58+
"""
59+
try:
60+
if obj in _instrumented_tasks:
61+
return True
62+
_instrumented_tasks.add(obj)
63+
return False
64+
except TypeError:
65+
# Object doesn't support weak references → can't track instrumentation
66+
return False
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
A general test verifying that when the same Future objects (or coroutines) are
17+
repeatedly instrumented (for example, via `trace_future`), callback references
18+
do not leak. In this example, we mimic a typical scenario where a small set of
19+
Futures might be reused throughout an application's lifecycle.
20+
"""
21+
22+
import asyncio
23+
24+
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
25+
from opentelemetry.test.test_base import TestBase
26+
27+
28+
class TestAsyncioDuplicateInstrument(TestBase):
29+
"""
30+
Tests whether repeated instrumentation of the same Futures leads to
31+
exponential callback growth (potential memory leak).
32+
"""
33+
34+
def setUp(self):
35+
super().setUp()
36+
self.loop = asyncio.new_event_loop()
37+
asyncio.set_event_loop(self.loop)
38+
39+
self.instrumentor = AsyncioInstrumentor()
40+
self.instrumentor.instrument()
41+
42+
def tearDown(self):
43+
self.instrumentor.uninstrument()
44+
self.loop.close()
45+
asyncio.set_event_loop(None)
46+
super().tearDown()
47+
48+
def test_duplicate_instrumentation_of_futures(self):
49+
"""
50+
If instrumentor.trace_future is called multiple times on the same Future,
51+
we should NOT see an unbounded accumulation of callbacks.
52+
"""
53+
fut1 = asyncio.Future()
54+
fut2 = asyncio.Future()
55+
56+
num_iterations = 10
57+
for _ in range(num_iterations):
58+
self.instrumentor.trace_future(fut1)
59+
self.instrumentor.trace_future(fut2)
60+
61+
self.assertLessEqual(
62+
len(fut1._callbacks),
63+
1,
64+
f"fut1 has {len(fut1._callbacks)} callbacks. Potential leak!",
65+
)
66+
self.assertLessEqual(
67+
len(fut2._callbacks),
68+
1,
69+
f"fut2 has {len(fut2._callbacks)} callbacks. Potential leak!",
70+
)

instrumentation/opentelemetry-instrumentation-tornado/test-requirements.txt

-6
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
asgiref==3.8.1
2-
blinker==1.7.0
32
certifi==2024.7.4
43
charset-normalizer==3.3.2
54
click==8.1.7
65
Deprecated==1.2.14
7-
Flask==3.0.2
86
http_server_mock==1.7
97
idna==3.7
108
iniconfig==2.0.0
11-
itsdangerous==2.1.2
12-
Jinja2==3.1.6
13-
MarkupSafe==2.1.5
149
packaging==24.0
1510
pluggy==1.5.0
1611
py-cpuinfo==9.0.0
@@ -20,7 +15,6 @@ tomli==2.0.1
2015
tornado==6.4.2
2116
typing_extensions==4.12.2
2217
urllib3==2.2.2
23-
Werkzeug==3.0.6
2418
wrapt==1.16.0
2519
zipp==3.19.2
2620
-e opentelemetry-instrumentation

opentelemetry-instrumentation/src/opentelemetry/instrumentation/auto_instrumentation/_load.py

+8
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ def _load_instrumentors(distro):
8282
exc.conflict,
8383
)
8484
continue
85+
except ModuleNotFoundError as exc:
86+
# ModuleNotFoundError is raised when the library is not installed
87+
# and the instrumentation is not required to be loaded.
88+
# See https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3421
89+
_logger.debug(
90+
"Skipping instrumentation %s: %s", entry_point.name, exc.msg
91+
)
92+
continue
8593
except ImportError:
8694
# in scenarios using the kubernetes operator to do autoinstrumentation some
8795
# instrumentors (usually requiring binary extensions) may fail to load

opentelemetry-instrumentation/tests/auto_instrumentation/test_load.py

+48-1
Original file line numberDiff line numberDiff line change
@@ -326,11 +326,12 @@ def test_load_instrumentors_dep_conflict(self, iter_mock, mock_logger): # pylin
326326
]
327327
)
328328

329+
@patch("opentelemetry.instrumentation.auto_instrumentation._load._logger")
329330
@patch(
330331
"opentelemetry.instrumentation.auto_instrumentation._load.entry_points"
331332
)
332333
def test_load_instrumentors_import_error_does_not_stop_everything(
333-
self, iter_mock
334+
self, iter_mock, mock_logger
334335
):
335336
ep_mock1 = Mock(name="instr1")
336337
ep_mock2 = Mock(name="instr2")
@@ -354,6 +355,12 @@ def test_load_instrumentors_import_error_does_not_stop_everything(
354355
]
355356
)
356357
self.assertEqual(distro_mock.load_instrumentor.call_count, 2)
358+
mock_logger.exception.assert_any_call(
359+
"Importing of %s failed, skipping it",
360+
ep_mock1.name,
361+
)
362+
363+
mock_logger.debug.assert_any_call("Instrumented %s", ep_mock2.name)
357364

358365
@patch(
359366
"opentelemetry.instrumentation.auto_instrumentation._load.entry_points"
@@ -382,6 +389,46 @@ def test_load_instrumentors_raises_exception(self, iter_mock):
382389
)
383390
self.assertEqual(distro_mock.load_instrumentor.call_count, 1)
384391

392+
@patch("opentelemetry.instrumentation.auto_instrumentation._load._logger")
393+
@patch(
394+
"opentelemetry.instrumentation.auto_instrumentation._load.entry_points"
395+
)
396+
def test_load_instrumentors_module_not_found_error(
397+
self, iter_mock, mock_logger
398+
):
399+
ep_mock1 = Mock()
400+
ep_mock1.name = "instr1"
401+
402+
ep_mock2 = Mock()
403+
ep_mock2.name = "instr2"
404+
405+
distro_mock = Mock()
406+
407+
distro_mock.load_instrumentor.side_effect = [
408+
ModuleNotFoundError("No module named 'fake_module'"),
409+
None,
410+
]
411+
412+
iter_mock.side_effect = [(), (ep_mock1, ep_mock2), ()]
413+
414+
_load._load_instrumentors(distro_mock)
415+
416+
distro_mock.load_instrumentor.assert_has_calls(
417+
[
418+
call(ep_mock1, raise_exception_on_conflict=True),
419+
call(ep_mock2, raise_exception_on_conflict=True),
420+
]
421+
)
422+
self.assertEqual(distro_mock.load_instrumentor.call_count, 2)
423+
424+
mock_logger.debug.assert_any_call(
425+
"Skipping instrumentation %s: %s",
426+
"instr1",
427+
"No module named 'fake_module'",
428+
)
429+
430+
mock_logger.debug.assert_any_call("Instrumented %s", ep_mock2.name)
431+
385432
def test_load_instrumentors_no_entry_point_mocks(self):
386433
distro_mock = Mock()
387434
_load._load_instrumentors(distro_mock)

0 commit comments

Comments
 (0)