Skip to content

Commit 4c39049

Browse files
committed
Updated dbapi and psycopg2 instrumentations.
Changes: - Update dbapi instrumentation to use the SQL statement name as the span instead of the entire SQL query. - Renamed TracedCursor with CursorTracing. The class was not a valid Cursor so the name was confusing. - Updated CursorTracing's (previously TracedCursor) traced_execution method to accept the cursor instance as the first argument. This is required as for some dbapi implementations, we need a reference to the cursor in order to correctly format the SQL query. - Updated psycopg2 instrumentation to leverage dbapi's `cursor_factory` mechanism instead of wrapping the cursor with wrapt. This results in a simpler instrumentation without monkey patching objects at runtime and allows psycopg2's type registration system to work. This should make it possible to use psycopg2 instrumentation when using the JSONB feature or with frameworks like Django.
1 parent e5a0153 commit 4c39049

File tree

14 files changed

+234
-87
lines changed

14 files changed

+234
-87
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ lib64
2222
__pycache__
2323
venv*/
2424
.venv*/
25-
opentelemetry-python-core*/
25+
/opentelemetry-python-core*/
26+
/opentelemetry-python-core
2627

2728
# Installer logs
2829
pip-log.txt

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3535
([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236))
3636
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-falcon`, `opentelemetry-instrumentation-flask`, `opentelemetry-instrumentation-pyramid`, `opentelemetry-instrumentation-wsgi` Renamed `host.port` attribute to `net.host.port`
3737
([#242](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/242))
38+
- `opentelemetry-instrumentation-dbapi`, `TracedCursor` replaced by `CursorTracer`
39+
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
40+
- `opentelemetry-instrumentation-psycopg2`, Added support for psycopg2 registered types.
41+
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
42+
- `opentelemetry-instrumentation-dbapi`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-aiopg` Use SQL command name as the span operation name instead of the entire query.
43+
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
3844

3945
## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26
4046

instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py

+17-13
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from aiopg.utils import _ContextManager, _PoolAcquireContextManager
55

66
from opentelemetry.instrumentation.dbapi import (
7+
CursorTracer,
78
DatabaseApiIntegration,
8-
TracedCursor,
99
)
1010
from opentelemetry.trace import SpanKind
1111
from opentelemetry.trace.status import Status, StatusCode
@@ -94,25 +94,29 @@ async def _acquire(self):
9494
return TracedPoolProxy(pool, *args, **kwargs)
9595

9696

97-
class AsyncTracedCursor(TracedCursor):
97+
class AsyncCursorTracer(CursorTracer):
9898
async def traced_execution(
9999
self,
100+
cursor,
100101
query_method: typing.Callable[..., typing.Any],
101102
*args: typing.Tuple[typing.Any, typing.Any],
102103
**kwargs: typing.Dict[typing.Any, typing.Any]
103104
):
104105
name = ""
105-
if len(args) > 0 and args[0]:
106-
name = args[0]
107-
elif self._db_api_integration.database:
108-
name = self._db_api_integration.database
109-
else:
110-
name = self._db_api_integration.name
106+
if args:
107+
name = self.get_operation_name(cursor, args)
108+
109+
if not name:
110+
name = (
111+
self._db_api_integration.database
112+
if self._db_api_integration.database
113+
else self._db_api_integration.name
114+
)
111115

112116
with self._db_api_integration.get_tracer().start_as_current_span(
113117
name, kind=SpanKind.CLIENT
114118
) as span:
115-
self._populate_span(span, *args)
119+
self._populate_span(cursor, span, *args)
116120
try:
117121
result = await query_method(*args, **kwargs)
118122
return result
@@ -123,7 +127,7 @@ async def traced_execution(
123127

124128

125129
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
126-
_traced_cursor = AsyncTracedCursor(db_api_integration)
130+
_traced_cursor = AsyncCursorTracer(db_api_integration)
127131

128132
# pylint: disable=abstract-method
129133
class AsyncTracedCursorProxy(AsyncProxyObject):
@@ -134,19 +138,19 @@ def __init__(self, cursor, *args, **kwargs):
134138

135139
async def execute(self, *args, **kwargs):
136140
result = await _traced_cursor.traced_execution(
137-
self.__wrapped__.execute, *args, **kwargs
141+
self, self.__wrapped__.execute, *args, **kwargs
138142
)
139143
return result
140144

141145
async def executemany(self, *args, **kwargs):
142146
result = await _traced_cursor.traced_execution(
143-
self.__wrapped__.executemany, *args, **kwargs
147+
self, self.__wrapped__.executemany, *args, **kwargs
144148
)
145149
return result
146150

147151
async def callproc(self, *args, **kwargs):
148152
result = await _traced_cursor.traced_execution(
149-
self.__wrapped__.callproc, *args, **kwargs
153+
self, self.__wrapped__.callproc, *args, **kwargs
150154
)
151155
return result
152156

instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def test_span_succeeded(self):
215215
spans_list = self.memory_exporter.get_finished_spans()
216216
self.assertEqual(len(spans_list), 1)
217217
span = spans_list[0]
218-
self.assertEqual(span.name, "Test query")
218+
self.assertEqual(span.name, "Test")
219219
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
220220

221221
self.assertEqual(span.attributes["component"], "testcomponent")

instrumentation/opentelemetry-instrumentation-asyncpg/tests/test_asyncpg_wrapper.py

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncpg
22
from asyncpg import Connection
3-
43
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
54
from opentelemetry.test.test_base import TestBase
65

instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py

+43-17
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def trace_integration(
6060
connection_attributes: typing.Dict = None,
6161
tracer_provider: typing.Optional[TracerProvider] = None,
6262
capture_parameters: bool = False,
63+
db_api_integration_factory=None,
6364
):
6465
"""Integrate with DB API library.
6566
https://www.python.org/dev/peps/pep-0249/
@@ -86,6 +87,7 @@ def trace_integration(
8687
version=__version__,
8788
tracer_provider=tracer_provider,
8889
capture_parameters=capture_parameters,
90+
db_api_integration_factory=db_api_integration_factory,
8991
)
9092

9193

@@ -99,6 +101,7 @@ def wrap_connect(
99101
version: str = "",
100102
tracer_provider: typing.Optional[TracerProvider] = None,
101103
capture_parameters: bool = False,
104+
db_api_integration_factory=None,
102105
):
103106
"""Integrate with DB API library.
104107
https://www.python.org/dev/peps/pep-0249/
@@ -115,6 +118,9 @@ def wrap_connect(
115118
capture_parameters: Configure if db.statement.parameters should be captured.
116119
117120
"""
121+
db_api_integration_factory = (
122+
db_api_integration_factory or DatabaseApiIntegration
123+
)
118124

119125
# pylint: disable=unused-argument
120126
def wrap_connect_(
@@ -123,7 +129,7 @@ def wrap_connect_(
123129
args: typing.Tuple[typing.Any, typing.Any],
124130
kwargs: typing.Dict[typing.Any, typing.Any],
125131
):
126-
db_integration = DatabaseApiIntegration(
132+
db_integration = db_api_integration_factory(
127133
name,
128134
database_component,
129135
database_type=database_type,
@@ -314,16 +320,19 @@ def __exit__(self, *args, **kwargs):
314320
return TracedConnectionProxy(connection, *args, **kwargs)
315321

316322

317-
class TracedCursor:
323+
class CursorTracer:
318324
def __init__(self, db_api_integration: DatabaseApiIntegration):
319325
self._db_api_integration = db_api_integration
320326

321327
def _populate_span(
322-
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
328+
self,
329+
cursor,
330+
span: trace_api.Span,
331+
*args: typing.Tuple[typing.Any, typing.Any]
323332
):
324333
if not span.is_recording():
325334
return
326-
statement = args[0] if args else ""
335+
statement = self.get_statement(cursor, args)
327336
span.set_attribute(
328337
"component", self._db_api_integration.database_component
329338
)
@@ -342,24 +351,41 @@ def _populate_span(
342351
if self._db_api_integration.capture_parameters and len(args) > 1:
343352
span.set_attribute("db.statement.parameters", str(args[1]))
344353

354+
def get_operation_name(self, cursor, args):
355+
if args and isinstance(args[0], str):
356+
return args[0].split(" ")[0]
357+
return ""
358+
359+
def get_statement(self, cursor, args):
360+
if not args:
361+
return ""
362+
statement = args[0]
363+
if isinstance(statement, bytes):
364+
return statement.decode("utf8", "replace")
365+
return statement
366+
345367
def traced_execution(
346368
self,
369+
cursor,
347370
query_method: typing.Callable[..., typing.Any],
348371
*args: typing.Tuple[typing.Any, typing.Any],
349372
**kwargs: typing.Dict[typing.Any, typing.Any]
350373
):
351374
name = ""
352375
if args:
353-
name = args[0]
354-
elif self._db_api_integration.database:
355-
name = self._db_api_integration.database
356-
else:
357-
name = self._db_api_integration.name
376+
name = self.get_operation_name(cursor, args)
377+
378+
if not name:
379+
name = (
380+
self._db_api_integration.database
381+
if self._db_api_integration.database
382+
else self._db_api_integration.name
383+
)
358384

359385
with self._db_api_integration.get_tracer().start_as_current_span(
360386
name, kind=SpanKind.CLIENT
361387
) as span:
362-
self._populate_span(span, *args)
388+
self._populate_span(cursor, span, *args)
363389
try:
364390
result = query_method(*args, **kwargs)
365391
return result
@@ -370,7 +396,7 @@ def traced_execution(
370396

371397

372398
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
373-
_traced_cursor = TracedCursor(db_api_integration)
399+
_cursor_tracer = CursorTracer(db_api_integration)
374400

375401
# pylint: disable=abstract-method
376402
class TracedCursorProxy(wrapt.ObjectProxy):
@@ -380,18 +406,18 @@ def __init__(self, cursor, *args, **kwargs):
380406
wrapt.ObjectProxy.__init__(self, cursor)
381407

382408
def execute(self, *args, **kwargs):
383-
return _traced_cursor.traced_execution(
384-
self.__wrapped__.execute, *args, **kwargs
409+
return _cursor_tracer.traced_execution(
410+
self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
385411
)
386412

387413
def executemany(self, *args, **kwargs):
388-
return _traced_cursor.traced_execution(
389-
self.__wrapped__.executemany, *args, **kwargs
414+
return _cursor_tracer.traced_execution(
415+
self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
390416
)
391417

392418
def callproc(self, *args, **kwargs):
393-
return _traced_cursor.traced_execution(
394-
self.__wrapped__.callproc, *args, **kwargs
419+
return _cursor_tracer.traced_execution(
420+
self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
395421
)
396422

397423
def __enter__(self):

instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def test_span_succeeded(self):
5050
spans_list = self.memory_exporter.get_finished_spans()
5151
self.assertEqual(len(spans_list), 1)
5252
span = spans_list[0]
53-
self.assertEqual(span.name, "Test query")
53+
self.assertEqual(span.name, "Test")
5454
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
5555

5656
self.assertEqual(span.attributes["component"], "testcomponent")
@@ -93,7 +93,7 @@ def test_span_succeeded_with_capture_of_statement_parameters(self):
9393
spans_list = self.memory_exporter.get_finished_spans()
9494
self.assertEqual(len(spans_list), 1)
9595
span = spans_list[0]
96-
self.assertEqual(span.name, "Test query")
96+
self.assertEqual(span.name, "Test")
9797
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
9898

9999
self.assertEqual(span.attributes["component"], "testcomponent")

0 commit comments

Comments
 (0)