Skip to content

Commit 29b0612

Browse files
committed
Updated dbapi and psycopg2 instrumentations.
Changes: - Update dbapi instrumentation to use the SQL statement name as the span instead of the entire SQL query. - Renamed TracedCursor with CursorTracing. The class was not a valid Cursor so the name was confusing. - Updated CursorTracing's (previously TracedCursor) traced_execution method to accept the cursor instance as the first argument. This is required as for some dbapi implementations, we need a reference to the cursor in order to correctly format the SQL query. - Updated psycopg2 instrumentation to leverage dbapi's `cursor_factory` mechanism instead of wrapping the cursor with wrapt. This results in a simpler instrumentation without monkey patching objects at runtime and allows psycopg2's type registration system to work. This should make it possible to use psycopg2 instrumentation when using the JSONB feature or with frameworks like Django.
1 parent d12f67f commit 29b0612

File tree

13 files changed

+293
-92
lines changed

13 files changed

+293
-92
lines changed

.pylintrc

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ disable=missing-docstring,
7878
protected-access, # temp-pylint-upgrade
7979
super-init-not-called, # temp-pylint-upgrade
8080
invalid-overridden-method, # temp-pylint-upgrade
81-
missing-module-docstring, # temp-pylint-upgrad, # temp-pylint-upgradee
81+
missing-module-docstring, # temp-pylint-upgrade
8282

8383
# Enable the message, report, category or checker with the given id(s). You can
8484
# either give multiple identifier separated by comma (,) or put this option

CHANGELOG.md

+7-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5656
- `opentelemetry-instrumentation-aiopg` Fix AttributeError `__aexit__` when `aiopg.connect` and `aio[g].create_pool` used with async context manager
5757
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
5858
- `opentelemetry-exporter-datadog` `opentelemetry-sdk-extension-aws` Fix reference to ids_generator in sdk
59-
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
59+
([#283](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
60+
- `opentelemetry-instrumentation-dbapi`, `TracedCursor` replaced by `CursorTracer`
61+
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
62+
- `opentelemetry-instrumentation-psycopg2`, Added support for psycopg2 registered types.
63+
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
64+
- `opentelemetry-instrumentation-dbapi`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-aiopg` Use SQL command name as the span operation name instead of the entire query.
65+
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
6066

6167
## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26
6268

instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py

+18-14
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from aiopg.utils import _ContextManager, _PoolAcquireContextManager
55

66
from opentelemetry.instrumentation.dbapi import (
7+
CursorTracer,
78
DatabaseApiIntegration,
8-
TracedCursor,
99
)
1010
from opentelemetry.trace import SpanKind
1111
from opentelemetry.trace.status import Status, StatusCode
@@ -94,25 +94,29 @@ async def _acquire(self):
9494
return TracedPoolProxy(pool, *args, **kwargs)
9595

9696

97-
class AsyncTracedCursor(TracedCursor):
97+
class AsyncCursorTracer(CursorTracer):
9898
async def traced_execution(
9999
self,
100+
cursor,
100101
query_method: typing.Callable[..., typing.Any],
101102
*args: typing.Tuple[typing.Any, typing.Any],
102103
**kwargs: typing.Dict[typing.Any, typing.Any]
103104
):
104105
name = ""
105-
if len(args) > 0 and args[0]:
106-
name = args[0]
107-
elif self._db_api_integration.database:
108-
name = self._db_api_integration.database
109-
else:
110-
name = self._db_api_integration.name
106+
if args:
107+
name = self.get_operation_name(cursor, args)
108+
109+
if not name:
110+
name = (
111+
self._db_api_integration.database
112+
if self._db_api_integration.database
113+
else self._db_api_integration.name
114+
)
111115

112116
with self._db_api_integration.get_tracer().start_as_current_span(
113117
name, kind=SpanKind.CLIENT
114118
) as span:
115-
self._populate_span(span, *args)
119+
self._populate_span(span, cursor, *args)
116120
try:
117121
result = await query_method(*args, **kwargs)
118122
return result
@@ -123,30 +127,30 @@ async def traced_execution(
123127

124128

125129
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
126-
_traced_cursor = AsyncTracedCursor(db_api_integration)
130+
_traced_cursor = AsyncCursorTracer(db_api_integration)
127131

128132
# pylint: disable=abstract-method
129-
class AsyncTracedCursorProxy(AsyncProxyObject):
133+
class AsyncCursorTracerProxy(AsyncProxyObject):
130134

131135
# pylint: disable=unused-argument
132136
def __init__(self, cursor, *args, **kwargs):
133137
super().__init__(cursor)
134138

135139
async def execute(self, *args, **kwargs):
136140
result = await _traced_cursor.traced_execution(
137-
self.__wrapped__.execute, *args, **kwargs
141+
self, self.__wrapped__.execute, *args, **kwargs
138142
)
139143
return result
140144

141145
async def executemany(self, *args, **kwargs):
142146
result = await _traced_cursor.traced_execution(
143-
self.__wrapped__.executemany, *args, **kwargs
147+
self, self.__wrapped__.executemany, *args, **kwargs
144148
)
145149
return result
146150

147151
async def callproc(self, *args, **kwargs):
148152
result = await _traced_cursor.traced_execution(
149-
self.__wrapped__.callproc, *args, **kwargs
153+
self, self.__wrapped__.callproc, *args, **kwargs
150154
)
151155
return result
152156

instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ def test_span_succeeded(self):
256256
spans_list = self.memory_exporter.get_finished_spans()
257257
self.assertEqual(len(spans_list), 1)
258258
span = spans_list[0]
259-
self.assertEqual(span.name, "Test query")
259+
self.assertEqual(span.name, "Test")
260260
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
261261

262262
self.assertEqual(span.attributes["component"], "testcomponent")

instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py

+42-19
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def trace_integration(
6060
connection_attributes: typing.Dict = None,
6161
tracer_provider: typing.Optional[TracerProvider] = None,
6262
capture_parameters: bool = False,
63+
db_api_integration_factory=None,
6364
):
6465
"""Integrate with DB API library.
6566
https://www.python.org/dev/peps/pep-0249/
@@ -86,6 +87,7 @@ def trace_integration(
8687
version=__version__,
8788
tracer_provider=tracer_provider,
8889
capture_parameters=capture_parameters,
90+
db_api_integration_factory=db_api_integration_factory,
8991
)
9092

9193

@@ -99,6 +101,7 @@ def wrap_connect(
99101
version: str = "",
100102
tracer_provider: typing.Optional[TracerProvider] = None,
101103
capture_parameters: bool = False,
104+
db_api_integration_factory=None,
102105
):
103106
"""Integrate with DB API library.
104107
https://www.python.org/dev/peps/pep-0249/
@@ -115,6 +118,9 @@ def wrap_connect(
115118
capture_parameters: Configure if db.statement.parameters should be captured.
116119
117120
"""
121+
db_api_integration_factory = (
122+
db_api_integration_factory or DatabaseApiIntegration
123+
)
118124

119125
# pylint: disable=unused-argument
120126
def wrap_connect_(
@@ -123,7 +129,7 @@ def wrap_connect_(
123129
args: typing.Tuple[typing.Any, typing.Any],
124130
kwargs: typing.Dict[typing.Any, typing.Any],
125131
):
126-
db_integration = DatabaseApiIntegration(
132+
db_integration = db_api_integration_factory(
127133
name,
128134
database_component,
129135
database_type=database_type,
@@ -314,16 +320,19 @@ def __exit__(self, *args, **kwargs):
314320
return TracedConnectionProxy(connection, *args, **kwargs)
315321

316322

317-
class TracedCursor:
323+
class CursorTracer:
318324
def __init__(self, db_api_integration: DatabaseApiIntegration):
319325
self._db_api_integration = db_api_integration
320326

321327
def _populate_span(
322-
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
328+
self,
329+
span: trace_api.Span,
330+
cursor,
331+
*args: typing.Tuple[typing.Any, typing.Any]
323332
):
324333
if not span.is_recording():
325334
return
326-
statement = args[0] if args else ""
335+
statement = self.get_statement(cursor, args)
327336
span.set_attribute(
328337
"component", self._db_api_integration.database_component
329338
)
@@ -342,24 +351,38 @@ def _populate_span(
342351
if self._db_api_integration.capture_parameters and len(args) > 1:
343352
span.set_attribute("db.statement.parameters", str(args[1]))
344353

354+
def get_operation_name(self, cursor, args): # pylint: disable=no-self-use
355+
if args and isinstance(args[0], str):
356+
return args[0].split()[0]
357+
return ""
358+
359+
def get_statement(self, cursor, args): # pylint: disable=no-self-use
360+
if not args:
361+
return ""
362+
statement = args[0]
363+
if isinstance(statement, bytes):
364+
return statement.decode("utf8", "replace")
365+
return statement
366+
345367
def traced_execution(
346368
self,
369+
cursor,
347370
query_method: typing.Callable[..., typing.Any],
348371
*args: typing.Tuple[typing.Any, typing.Any],
349372
**kwargs: typing.Dict[typing.Any, typing.Any]
350373
):
351-
name = ""
352-
if args:
353-
name = args[0]
354-
elif self._db_api_integration.database:
355-
name = self._db_api_integration.database
356-
else:
357-
name = self._db_api_integration.name
374+
name = self.get_operation_name(cursor, args)
375+
if not name:
376+
name = (
377+
self._db_api_integration.database
378+
if self._db_api_integration.database
379+
else self._db_api_integration.name
380+
)
358381

359382
with self._db_api_integration.get_tracer().start_as_current_span(
360383
name, kind=SpanKind.CLIENT
361384
) as span:
362-
self._populate_span(span, *args)
385+
self._populate_span(span, cursor, *args)
363386
try:
364387
result = query_method(*args, **kwargs)
365388
return result
@@ -370,7 +393,7 @@ def traced_execution(
370393

371394

372395
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
373-
_traced_cursor = TracedCursor(db_api_integration)
396+
_cursor_tracer = CursorTracer(db_api_integration)
374397

375398
# pylint: disable=abstract-method
376399
class TracedCursorProxy(wrapt.ObjectProxy):
@@ -380,18 +403,18 @@ def __init__(self, cursor, *args, **kwargs):
380403
wrapt.ObjectProxy.__init__(self, cursor)
381404

382405
def execute(self, *args, **kwargs):
383-
return _traced_cursor.traced_execution(
384-
self.__wrapped__.execute, *args, **kwargs
406+
return _cursor_tracer.traced_execution(
407+
self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
385408
)
386409

387410
def executemany(self, *args, **kwargs):
388-
return _traced_cursor.traced_execution(
389-
self.__wrapped__.executemany, *args, **kwargs
411+
return _cursor_tracer.traced_execution(
412+
self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
390413
)
391414

392415
def callproc(self, *args, **kwargs):
393-
return _traced_cursor.traced_execution(
394-
self.__wrapped__.callproc, *args, **kwargs
416+
return _cursor_tracer.traced_execution(
417+
self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
395418
)
396419

397420
def __enter__(self):

instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py

+23-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def test_span_succeeded(self):
5050
spans_list = self.memory_exporter.get_finished_spans()
5151
self.assertEqual(len(spans_list), 1)
5252
span = spans_list[0]
53-
self.assertEqual(span.name, "Test query")
53+
self.assertEqual(span.name, "Test")
5454
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
5555

5656
self.assertEqual(span.attributes["component"], "testcomponent")
@@ -65,6 +65,27 @@ def test_span_succeeded(self):
6565
span.status.status_code, trace_api.status.StatusCode.UNSET
6666
)
6767

68+
def test_span_name(self):
69+
db_integration = dbapi.DatabaseApiIntegration(
70+
self.tracer, "testcomponent", "testtype", {}
71+
)
72+
mock_connection = db_integration.wrapped_connection(
73+
mock_connect, {}, {}
74+
)
75+
cursor = mock_connection.cursor()
76+
cursor.execute("Test query", ("param1Value", False))
77+
cursor.execute(
78+
"""multi
79+
line
80+
query"""
81+
)
82+
cursor.execute("tab\tseparated query")
83+
spans_list = self.memory_exporter.get_finished_spans()
84+
self.assertEqual(len(spans_list), 3)
85+
self.assertEqual(spans_list[0].name, "Test")
86+
self.assertEqual(spans_list[1].name, "multi")
87+
self.assertEqual(spans_list[2].name, "tab")
88+
6889
def test_span_succeeded_with_capture_of_statement_parameters(self):
6990
connection_props = {
7091
"database": "testdatabase",
@@ -93,7 +114,7 @@ def test_span_succeeded_with_capture_of_statement_parameters(self):
93114
spans_list = self.memory_exporter.get_finished_spans()
94115
self.assertEqual(len(spans_list), 1)
95116
span = spans_list[0]
96-
self.assertEqual(span.name, "Test query")
117+
self.assertEqual(span.name, "Test")
97118
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
98119

99120
self.assertEqual(span.attributes["component"], "testcomponent")

0 commit comments

Comments
 (0)