forked from open-telemetry/opentelemetry-python-contrib
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_asyncpg_wrapper.py
36 lines (31 loc) · 1.36 KB
/
test_asyncpg_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from asyncpg import Connection
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.test.test_base import TestBase
class TestAsyncPGInstrumentation(TestBase):
    """Exercise repeated instrument()/uninstrument() calls on asyncpg."""

    # ``Connection`` methods whose instrumentation marker the tests inspect.
    _CHECKED_METHODS = ("execute", "fetch")

    # Attribute the tests expect to be absent once uninstrumented.
    _APPLIED_MARKER = "_opentelemetry_ext_asyncpg_applied"

    def _assert_not_instrumented(self):
        """Assert that no checked ``Connection`` method carries the marker."""
        for method_name in self._CHECKED_METHODS:
            # A missing method yields ``None``, which has no marker either.
            method = getattr(Connection, method_name, None)
            self.assertFalse(
                hasattr(method, self._APPLIED_MARKER),
                "Connection.{} is still marked as instrumented".format(
                    method_name
                ),
            )

    def test_duplicated_instrumentation_can_be_uninstrumented(self):
        """One uninstrument() call undoes several instrument() calls."""
        for _ in range(3):
            AsyncPGInstrumentor().instrument()
        AsyncPGInstrumentor().uninstrument()

        self._assert_not_instrumented()

    def test_duplicated_instrumentation_works(self):
        """Instrumenting twice leaves both instrumentors with a tracer."""
        first = AsyncPGInstrumentor()
        # The assertions need instrumentation to still be active, so undo it
        # only once the test is over. Without this the instrumented state
        # would carry over to whichever test runs next; nothing visible in
        # this file resets it.
        self.addCleanup(first.uninstrument)
        first.instrument()

        second = AsyncPGInstrumentor()
        second.instrument()

        self.assertIsNotNone(first._tracer)
        self.assertIsNotNone(second._tracer)

    def test_duplicated_uninstrumentation(self):
        """Repeated uninstrument() calls leave the methods unmarked."""
        AsyncPGInstrumentor().instrument()
        for _ in range(3):
            AsyncPGInstrumentor().uninstrument()

        self._assert_not_instrumented()