Skip to content

Commit de17e6e

Browse files
author
asagitullin
committed
fix instrumentation of connection when pool.acquire was called multiple times
1 parent f8e51c4 commit de17e6e

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

Diff for: instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,11 @@ def acquire(self):
8686
async def _acquire(self):
8787
# pylint: disable=protected-access
8888
connection = await self.__wrapped__._acquire()
89-
return get_traced_connection_proxy(
90-
connection, db_api_integration, *args, **kwargs
91-
)
89+
if not hasattr(connection, "__wrapped__"):
90+
connection = get_traced_connection_proxy(
91+
connection, db_api_integration, *args, **kwargs
92+
)
93+
return connection
9294

9395
return TracedPoolProxy(pool, *args, **kwargs)
9496

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import asyncio
2+
import os
3+
4+
import aiopg
5+
6+
from opentelemetry.instrumentation.aiopg import AiopgInstrumentor
7+
from opentelemetry.test.test_base import TestBase
8+
9+
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST", "localhost")
10+
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT", "5432"))
11+
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "opentelemetry-tests")
12+
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_PASSWORD", "testpassword")
13+
POSTGRES_USER = os.getenv("POSTGRESQL_USER", "testuser")
14+
15+
16+
def async_call(coro):
17+
loop = asyncio.get_event_loop()
18+
return loop.run_until_complete(coro)
19+
20+
21+
class TestFunctionalAsyncPG(TestBase):
22+
@classmethod
23+
def setUpClass(cls):
24+
super().setUpClass()
25+
cls._connection = None
26+
cls._cursor = None
27+
cls._tracer = cls.tracer_provider.get_tracer(__name__)
28+
AiopgInstrumentor().instrument(tracer_provider=cls.tracer_provider)
29+
cls._dsn = (
30+
f"dbname='{POSTGRES_DB_NAME}' user='{POSTGRES_USER}' password='{POSTGRES_PASSWORD}'"
31+
f" host='{POSTGRES_HOST}' port='{POSTGRES_PORT}'"
32+
)
33+
34+
@classmethod
35+
def tearDownClass(cls):
36+
AiopgInstrumentor().uninstrument()
37+
38+
def test_instrumented_pool_with_multiple_acquires(self, *_, **__):
39+
async def double_asquire():
40+
pool = await aiopg.create_pool(dsn=self._dsn)
41+
async with pool.acquire() as conn:
42+
async with conn.cursor() as cursor:
43+
query = "SELECT 1"
44+
await cursor.execute(query)
45+
async with pool.acquire() as conn:
46+
async with conn.cursor() as cursor:
47+
query = "SELECT 1"
48+
await cursor.execute(query)
49+
50+
async_call(double_asquire())
51+
spans = self.memory_exporter.get_finished_spans()
52+
self.assertEqual(len(spans), 2)

0 commit comments

Comments
 (0)