diff --git a/CHANGELOG.md b/CHANGELOG.md index dbff5818b0..418490c84a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/259)) - `opentelemetry-exporter-datadog` Fix unintentional type change of span trace flags ([#261](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/261)) +- `opentelemetry-instrumentation-aiopg` Fix AttributeError `__aexit__` when `aiopg.connect` and `aio[g].create_pool` used with async context manager + ([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235)) ## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26 diff --git a/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/wrappers.py b/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/wrappers.py index cb976e85c7..63a5d17514 100644 --- a/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/wrappers.py +++ b/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/wrappers.py @@ -34,6 +34,7 @@ import aiopg import wrapt +from aiopg.utils import _ContextManager, _PoolContextManager from opentelemetry.instrumentation.aiopg.aiopg_integration import ( AiopgIntegration, @@ -99,7 +100,7 @@ def wrap_connect( """ # pylint: disable=unused-argument - async def wrap_connect_( + def wrap_connect_( wrapped: typing.Callable[..., typing.Any], instance: typing.Any, args: typing.Tuple[typing.Any, typing.Any], @@ -113,7 +114,9 @@ async def wrap_connect_( version=version, tracer_provider=tracer_provider, ) - return await db_integration.wrapped_connection(wrapped, args, kwargs) + return _ContextManager( + db_integration.wrapped_connection(wrapped, args, kwargs) + ) try: wrapt.wrap_function_wrapper(aiopg, "connect", wrap_connect_) @@ -191,7 +194,7 @@ def wrap_create_pool( tracer_provider: typing.Optional[TracerProvider] = None, ): # pylint: disable=unused-argument - async def wrap_create_pool_( + def wrap_create_pool_( wrapped: typing.Callable[..., typing.Any], instance: typing.Any, args: typing.Tuple[typing.Any, typing.Any], @@ -205,7 +208,9 @@ async def wrap_create_pool_( version=version, tracer_provider=tracer_provider, ) - return await db_integration.wrapped_pool(wrapped, args, kwargs) + return _PoolContextManager( + db_integration.wrapped_pool(wrapped, args, kwargs) + ) try: wrapt.wrap_function_wrapper(aiopg, "create_pool", wrap_create_pool_) diff --git a/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py b/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py index c6f771e1b2..ad935cfdfd 100644 --- a/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py @@ -79,6 +79,26 @@ def test_instrumentor_connect(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) + def test_instrumentor_connect_ctx_manager(self): + async def _ctx_manager_connect(): + AiopgInstrumentor().instrument() + + async with aiopg.connect(database="test") as cnx: + async with cnx.cursor() as cursor: + query = "SELECT * FROM test" + await cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.aiopg + ) + + async_call(_ctx_manager_connect()) + def test_instrumentor_create_pool(self): AiopgInstrumentor().instrument() @@ -110,6 +130,27 @@ def test_instrumentor_create_pool(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) + def test_instrumentor_create_pool_ctx_manager(self): + async def _ctx_manager_pool(): + AiopgInstrumentor().instrument() + + async with aiopg.create_pool(database="test") as pool: + async with pool.acquire() as cnx: + async with cnx.cursor() as cursor: + query = "SELECT * FROM test" + await cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.aiopg + ) + + async_call(_ctx_manager_pool()) + def test_custom_tracer_provider_connect(self): resource = resources.Resource.create({}) result = self.create_tracer_provider(resource=resource) @@ -428,6 +469,12 @@ async def _acquire(self): ) return connect + def close(self): + pass + + async def wait_closed(self): + pass + class MockPsycopg2Connection: def __init__(self, database, server_port, server_host, user): @@ -471,6 +518,9 @@ async def callproc(self, query, params=None, throw_exception=False): if throw_exception: raise Exception("Test Exception") + def close(self): + pass + class AiopgConnectionMock: _conn = MagicMock()