From c14d9e4faee8296b927b4eb4ae8cccd3edb6cd93 Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Fri, 11 Dec 2020 16:13:37 +0530 Subject: [PATCH] Updated dbapi and psycopg2 instrumentations. Changes: - Update dbapi instrumentation to use the SQL statement name as the span instead of the entire SQL query. - Renamed TracedCursor with CursorTracing. The class was not a valid Cursor so the name was confusing. - Updated CursorTracing's (previously TracedCursor) traced_execution method to accept the cursor instance as the first argument. This is required as for some dbapi implementations, we need a reference to the cursor in order to correctly format the SQL query. - Updated psycopg2 instrumentation to leverage dbapi's `cursor_factory` mechanism instead of wrapping the cursor with wrapt. This results in a simpler instrumentation without monkey patching objects at runtime and allows psycopg2's type registration system to work. This should make it possible to use psycopg2 instrumentation when using the JSONB feature or with frameworks like Django. --- .pylintrc | 2 +- CHANGELOG.md | 8 +- .../aiopg/aiopg_integration.py | 34 ++--- .../tests/test_aiopg_integration.py | 2 +- .../instrumentation/dbapi/__init__.py | 61 ++++++--- .../tests/test_dbapi_integration.py | 25 +++- .../instrumentation/psycopg2/__init__.py | 122 ++++++++++++++---- .../tests/test_psycopg2_integration.py | 71 ++++++++-- .../tests/test_sqlite3.py | 4 +- .../tests/mysql/test_mysql_functional.py | 8 +- .../tests/postgres/test_aiopg_functional.py | 8 +- .../tests/postgres/test_psycopg_functional.py | 36 +++++- .../tests/pymysql/test_pymysql_functional.py | 6 +- 13 files changed, 294 insertions(+), 93 deletions(-) diff --git a/.pylintrc b/.pylintrc index 9f767af293..2a2ad87040 100644 --- a/.pylintrc +++ b/.pylintrc @@ -78,7 +78,7 @@ disable=missing-docstring, protected-access, # temp-pylint-upgrade super-init-not-called, # temp-pylint-upgrade invalid-overridden-method, # temp-pylint-upgrade - missing-module-docstring, # temp-pylint-upgrad, # temp-pylint-upgradee + missing-module-docstring, # temp-pylint-upgrade # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/CHANGELOG.md b/CHANGELOG.md index 0312ed9764..80d75b2bad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,7 +56,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-aiopg` Fix AttributeError `__aexit__` when `aiopg.connect` and `aio[g].create_pool` used with async context manager ([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235)) - `opentelemetry-exporter-datadog` `opentelemetry-sdk-extension-aws` Fix reference to ids_generator in sdk - ([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235)) + ([#283](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235)) +- `opentelemetry-instrumentation-dbapi`, `TracedCursor` replaced by `CursorTracer` + ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) +- `opentelemetry-instrumentation-psycopg2`, Added support for psycopg2 registered types. + ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) +- `opentelemetry-instrumentation-dbapi`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-aiopg` Use SQL command name as the span operation name instead of the entire query. + ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) ## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26 diff --git a/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py b/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py index 9824237565..b130ef2b51 100644 --- a/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-aiopg/src/opentelemetry/instrumentation/aiopg/aiopg_integration.py @@ -4,8 +4,8 @@ from aiopg.utils import _ContextManager, _PoolAcquireContextManager from opentelemetry.instrumentation.dbapi import ( + CursorTracer, DatabaseApiIntegration, - TracedCursor, ) from opentelemetry.trace import SpanKind from opentelemetry.trace.status import Status, StatusCode @@ -94,25 +94,29 @@ async def _acquire(self): return TracedPoolProxy(pool, *args, **kwargs) -class AsyncTracedCursor(TracedCursor): +class AsyncCursorTracer(CursorTracer): async def traced_execution( self, + cursor, query_method: typing.Callable[..., typing.Any], *args: typing.Tuple[typing.Any, typing.Any], **kwargs: typing.Dict[typing.Any, typing.Any] ): name = "" - if len(args) > 0 and args[0]: - name = args[0] - elif self._db_api_integration.database: - name = self._db_api_integration.database - else: - name = self._db_api_integration.name + if args: + name = self.get_operation_name(cursor, args) + + if not name: + name = ( + self._db_api_integration.database + if self._db_api_integration.database + else self._db_api_integration.name + ) with self._db_api_integration.get_tracer().start_as_current_span( name, kind=SpanKind.CLIENT ) as span: - self._populate_span(span, *args) + self._populate_span(span, cursor, *args) try: result = await query_method(*args, **kwargs) return result @@ -123,10 +127,10 @@ async def traced_execution( def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs): - _traced_cursor = AsyncTracedCursor(db_api_integration) + _traced_cursor = AsyncCursorTracer(db_api_integration) # pylint: disable=abstract-method - class AsyncTracedCursorProxy(AsyncProxyObject): + class AsyncCursorTracerProxy(AsyncProxyObject): # pylint: disable=unused-argument def __init__(self, cursor, *args, **kwargs): @@ -134,20 +138,20 @@ def __init__(self, cursor, *args, **kwargs): async def execute(self, *args, **kwargs): result = await _traced_cursor.traced_execution( - self.__wrapped__.execute, *args, **kwargs + self, self.__wrapped__.execute, *args, **kwargs ) return result async def executemany(self, *args, **kwargs): result = await _traced_cursor.traced_execution( - self.__wrapped__.executemany, *args, **kwargs + self, self.__wrapped__.executemany, *args, **kwargs ) return result async def callproc(self, *args, **kwargs): result = await _traced_cursor.traced_execution( - self.__wrapped__.callproc, *args, **kwargs + self, self.__wrapped__.callproc, *args, **kwargs ) return result - return AsyncTracedCursorProxy(cursor, *args, **kwargs) + return AsyncCursorTracerProxy(cursor, *args, **kwargs) diff --git a/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py b/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py index ad935cfdfd..78c342df9b 100644 --- a/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-aiopg/tests/test_aiopg_integration.py @@ -256,7 +256,7 @@ def test_span_succeeded(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) span = spans_list[0] - self.assertEqual(span.name, "Test query") + self.assertEqual(span.name, "Test") self.assertIs(span.kind, trace_api.SpanKind.CLIENT) self.assertEqual(span.attributes["component"], "testcomponent") diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py index 197f4ade44..67c25f9881 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py @@ -60,6 +60,7 @@ def trace_integration( connection_attributes: typing.Dict = None, tracer_provider: typing.Optional[TracerProvider] = None, capture_parameters: bool = False, + db_api_integration_factory=None, ): """Integrate with DB API library. https://www.python.org/dev/peps/pep-0249/ @@ -86,6 +87,7 @@ def trace_integration( version=__version__, tracer_provider=tracer_provider, capture_parameters=capture_parameters, + db_api_integration_factory=db_api_integration_factory, ) @@ -99,6 +101,7 @@ def wrap_connect( version: str = "", tracer_provider: typing.Optional[TracerProvider] = None, capture_parameters: bool = False, + db_api_integration_factory=None, ): """Integrate with DB API library. https://www.python.org/dev/peps/pep-0249/ @@ -115,6 +118,9 @@ def wrap_connect( capture_parameters: Configure if db.statement.parameters should be captured. """ + db_api_integration_factory = ( + db_api_integration_factory or DatabaseApiIntegration + ) # pylint: disable=unused-argument def wrap_connect_( @@ -123,7 +129,7 @@ def wrap_connect_( args: typing.Tuple[typing.Any, typing.Any], kwargs: typing.Dict[typing.Any, typing.Any], ): - db_integration = DatabaseApiIntegration( + db_integration = db_api_integration_factory( name, database_component, database_type=database_type, @@ -314,16 +320,19 @@ def __exit__(self, *args, **kwargs): return TracedConnectionProxy(connection, *args, **kwargs) -class TracedCursor: +class CursorTracer: def __init__(self, db_api_integration: DatabaseApiIntegration): self._db_api_integration = db_api_integration def _populate_span( - self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any] + self, + span: trace_api.Span, + cursor, + *args: typing.Tuple[typing.Any, typing.Any] ): if not span.is_recording(): return - statement = args[0] if args else "" + statement = self.get_statement(cursor, args) span.set_attribute( "component", self._db_api_integration.database_component ) @@ -342,24 +351,38 @@ def _populate_span( if self._db_api_integration.capture_parameters and len(args) > 1: span.set_attribute("db.statement.parameters", str(args[1])) + def get_operation_name(self, cursor, args): # pylint: disable=no-self-use + if args and isinstance(args[0], str): + return args[0].split()[0] + return "" + + def get_statement(self, cursor, args): # pylint: disable=no-self-use + if not args: + return "" + statement = args[0] + if isinstance(statement, bytes): + return statement.decode("utf8", "replace") + return statement + def traced_execution( self, + cursor, query_method: typing.Callable[..., typing.Any], *args: typing.Tuple[typing.Any, typing.Any], **kwargs: typing.Dict[typing.Any, typing.Any] ): - name = "" - if args: - name = args[0] - elif self._db_api_integration.database: - name = self._db_api_integration.database - else: - name = self._db_api_integration.name + name = self.get_operation_name(cursor, args) + if not name: + name = ( + self._db_api_integration.database + if self._db_api_integration.database + else self._db_api_integration.name + ) with self._db_api_integration.get_tracer().start_as_current_span( name, kind=SpanKind.CLIENT ) as span: - self._populate_span(span, *args) + self._populate_span(span, cursor, *args) try: result = query_method(*args, **kwargs) return result @@ -370,7 +393,7 @@ def traced_execution( def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs): - _traced_cursor = TracedCursor(db_api_integration) + _cursor_tracer = CursorTracer(db_api_integration) # pylint: disable=abstract-method class TracedCursorProxy(wrapt.ObjectProxy): @@ -380,18 +403,18 @@ def __init__(self, cursor, *args, **kwargs): wrapt.ObjectProxy.__init__(self, cursor) def execute(self, *args, **kwargs): - return _traced_cursor.traced_execution( - self.__wrapped__.execute, *args, **kwargs + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.execute, *args, **kwargs ) def executemany(self, *args, **kwargs): - return _traced_cursor.traced_execution( - self.__wrapped__.executemany, *args, **kwargs + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs ) def callproc(self, *args, **kwargs): - return _traced_cursor.traced_execution( - self.__wrapped__.callproc, *args, **kwargs + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs ) def __enter__(self): diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py b/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py index e69bf60c9d..c07f44c2b4 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py @@ -50,7 +50,7 @@ def test_span_succeeded(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) span = spans_list[0] - self.assertEqual(span.name, "Test query") + self.assertEqual(span.name, "Test") self.assertIs(span.kind, trace_api.SpanKind.CLIENT) self.assertEqual(span.attributes["component"], "testcomponent") @@ -65,6 +65,27 @@ def test_span_succeeded(self): span.status.status_code, trace_api.status.StatusCode.UNSET ) + def test_span_name(self): + db_integration = dbapi.DatabaseApiIntegration( + self.tracer, "testcomponent", "testtype", {} + ) + mock_connection = db_integration.wrapped_connection( + mock_connect, {}, {} + ) + cursor = mock_connection.cursor() + cursor.execute("Test query", ("param1Value", False)) + cursor.execute( + """multi + line + query""" + ) + cursor.execute("tab\tseparated query") + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 3) + self.assertEqual(spans_list[0].name, "Test") + self.assertEqual(spans_list[1].name, "multi") + self.assertEqual(spans_list[2].name, "tab") + def test_span_succeeded_with_capture_of_statement_parameters(self): connection_props = { "database": "testdatabase", @@ -93,7 +114,7 @@ def test_span_succeeded_with_capture_of_statement_parameters(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) span = spans_list[0] - self.assertEqual(span.name, "Test query") + self.assertEqual(span.name, "Test") self.assertIs(span.kind, trace_api.SpanKind.CLIENT) self.assertEqual(span.attributes["component"], "testcomponent") diff --git a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py index 4b8799402e..ef807963aa 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py @@ -39,12 +39,19 @@ --- """ +import typing + import psycopg2 +from psycopg2.extensions import ( + cursor as pg_cursor, # pylint: disable=no-name-in-module +) +from psycopg2.sql import Composed # pylint: disable=no-name-in-module from opentelemetry.instrumentation import dbapi from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.psycopg2.version import __version__ -from opentelemetry.trace import get_tracer + +_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory" class Psycopg2Instrumentor(BaseInstrumentor): @@ -62,7 +69,6 @@ def _instrument(self, **kwargs): """Integrate with PostgreSQL Psycopg library. Psycopg: http://initd.org/psycopg/ """ - tracer_provider = kwargs.get("tracer_provider") dbapi.wrap_connect( @@ -74,39 +80,101 @@ def _instrument(self, **kwargs): self._CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + db_api_integration_factory=DatabaseApiIntegration, ) def _uninstrument(self, **kwargs): """"Disable Psycopg2 instrumentation""" dbapi.unwrap_connect(psycopg2, "connect") - # pylint:disable=no-self-use - def instrument_connection(self, connection): - """Enable instrumentation in a Psycopg2 connection. - - Args: - connection: The connection to instrument. + # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql + def instrument_connection(self, connection): # pylint: disable=no-self-use + setattr( + connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory + ) + connection.cursor_factory = _new_cursor_factory() + return connection + + # TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql + def uninstrument_connection( + self, connection + ): # pylint: disable=no-self-use + connection.cursor_factory = getattr( + connection, _OTEL_CURSOR_FACTORY_KEY, None + ) + return connection + + +# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql +class DatabaseApiIntegration(dbapi.DatabaseApiIntegration): + def wrapped_connection( + self, + connect_method: typing.Callable[..., typing.Any], + args: typing.Tuple[typing.Any, typing.Any], + kwargs: typing.Dict[typing.Any, typing.Any], + ): + """Add object proxy to connection object.""" + base_cursor_factory = kwargs.pop("cursor_factory", None) + new_factory_kwargs = {"db_api": self} + if base_cursor_factory: + new_factory_kwargs["base_factory"] = base_cursor_factory + kwargs["cursor_factory"] = _new_cursor_factory(**new_factory_kwargs) + connection = connect_method(*args, **kwargs) + self.get_connection_attributes(connection) + return connection + + +class CursorTracer(dbapi.CursorTracer): + def get_operation_name(self, cursor, args): + if not args: + return "" + + statement = args[0] + if isinstance(statement, Composed): + statement = statement.as_string(cursor) + + if isinstance(statement, str): + return statement.split()[0] + + return "" + + def get_statement(self, cursor, args): + if not args: + return "" + + statement = args[0] + if isinstance(statement, Composed): + statement = statement.as_string(cursor) + return statement + + +def _new_cursor_factory(db_api=None, base_factory=None): + if not db_api: + db_api = DatabaseApiIntegration( + __name__, + Psycopg2Instrumentor._DATABASE_COMPONENT, + database_type=Psycopg2Instrumentor._DATABASE_TYPE, + connection_attributes=Psycopg2Instrumentor._CONNECTION_ATTRIBUTES, + version=__version__, + ) - Returns: - An instrumented connection. - """ - tracer = get_tracer(__name__, __version__) + base_factory = base_factory or pg_cursor + _cursor_tracer = CursorTracer(db_api) - return dbapi.instrument_connection( - tracer, - connection, - self._DATABASE_COMPONENT, - self._DATABASE_TYPE, - self._CONNECTION_ATTRIBUTES, - ) + class TracedCursorFactory(base_factory): + def execute(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().execute, *args, **kwargs + ) - def uninstrument_connection(self, connection): - """Disable instrumentation in a Psycopg2 connection. + def executemany(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().executemany, *args, **kwargs + ) - Args: - connection: The connection to uninstrument. + def callproc(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().callproc, *args, **kwargs + ) - Returns: - An uninstrumented connection. - """ - return dbapi.uninstrument_connection(connection) + return TracedCursorFactory diff --git a/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py index cb127c7a5e..e25fd7a934 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg2/tests/test_psycopg2_integration.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import types from unittest import mock import psycopg2 @@ -22,15 +23,69 @@ from opentelemetry.test.test_base import TestBase +class MockCursor: + + execute = mock.MagicMock(spec=types.MethodType) + execute.__name__ = "execute" + + executemany = mock.MagicMock(spec=types.MethodType) + executemany.__name__ = "executemany" + + callproc = mock.MagicMock(spec=types.MethodType) + callproc.__name__ = "callproc" + + rowcount = "SomeRowCount" + + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + return self + + +class MockConnection: + + commit = mock.MagicMock(spec=types.MethodType) + commit.__name__ = "commit" + + rollback = mock.MagicMock(spec=types.MethodType) + rollback.__name__ = "rollback" + + def __init__(self, *args, **kwargs): + self.cursor_factory = kwargs.pop("cursor_factory", None) + + def cursor(self): + if self.cursor_factory: + return self.cursor_factory(self) + return MockCursor() + + def get_dsn_parameters(self): # pylint: disable=no-self-use + return dict(dbname="test") + + class TestPostgresqlIntegration(TestBase): + def setUp(self): + self.cursor_mock = mock.patch( + "opentelemetry.instrumentation.psycopg2.pg_cursor", MockCursor + ) + self.connection_mock = mock.patch("psycopg2.connect", MockConnection) + + self.cursor_mock.start() + self.connection_mock.start() + def tearDown(self): super().tearDown() + self.memory_exporter.clear() + self.cursor_mock.stop() + self.connection_mock.stop() with self.disable_logging(): Psycopg2Instrumentor().uninstrument() - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_instrumentor(self, mock_connect): + def test_instrumentor(self): Psycopg2Instrumentor().instrument() cnx = psycopg2.connect(database="test") @@ -60,9 +115,8 @@ def test_instrumentor(self, mock_connect): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_not_recording(self, mock_connect): + def test_not_recording(self): mock_tracer = mock.Mock() mock_span = mock.Mock() mock_span.is_recording.return_value = False @@ -83,9 +137,8 @@ def test_not_recording(self, mock_connect): Psycopg2Instrumentor().uninstrument() - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_custom_tracer_provider(self, mock_connect): + def test_custom_tracer_provider(self): resource = resources.Resource.create({}) result = self.create_tracer_provider(resource=resource) tracer_provider, exporter = result @@ -103,9 +156,8 @@ def test_custom_tracer_provider(self, mock_connect): self.assertIs(span.resource, resource) - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_instrument_connection(self, mock_connect): + def test_instrument_connection(self): cnx = psycopg2.connect(database="test") query = "SELECT * FROM test" cursor = cnx.cursor() @@ -121,9 +173,8 @@ def test_instrument_connection(self, mock_connect): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) - @mock.patch("psycopg2.connect") # pylint: disable=unused-argument - def test_uninstrument_connection(self, mock_connect): + def test_uninstrument_connection(self): Psycopg2Instrumentor().instrument() cnx = psycopg2.connect(database="test") query = "SELECT * FROM test" diff --git a/instrumentation/opentelemetry-instrumentation-sqlite3/tests/test_sqlite3.py b/instrumentation/opentelemetry-instrumentation-sqlite3/tests/test_sqlite3.py index a4fc887061..6b8b0cb696 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlite3/tests/test_sqlite3.py +++ b/instrumentation/opentelemetry-instrumentation-sqlite3/tests/test_sqlite3.py @@ -60,7 +60,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id integer)" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -68,7 +68,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = [("1",), ("2",), ("3",)] self._cursor.executemany(stmt, data) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" diff --git a/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py b/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py index ec6eed3132..df818ab137 100644 --- a/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py +++ b/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py @@ -81,7 +81,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id INT)" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_connection_context_manager(self): """Should create a child span for execute with connection context""" @@ -90,7 +90,7 @@ def test_execute_with_connection_context_manager(self): with self._connection as conn: cursor = conn.cursor() cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_cursor_context_manager(self): """Should create a child span for execute with cursor context""" @@ -98,7 +98,7 @@ def test_execute_with_cursor_context_manager(self): with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -106,7 +106,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) self._cursor.executemany(stmt, data) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" diff --git a/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py b/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py index 030aecc66d..423151316a 100644 --- a/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py @@ -89,7 +89,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id integer)" with self._tracer.start_as_current_span("rootSpan"): async_call(self._cursor.execute(stmt)) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -98,7 +98,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) async_call(self._cursor.executemany(stmt, data)) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" @@ -167,7 +167,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id integer)" with self._tracer.start_as_current_span("rootSpan"): async_call(self._cursor.execute(stmt)) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -176,7 +176,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) async_call(self._cursor.executemany(stmt, data)) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" diff --git a/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py b/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py index 76116dfd28..53fe3cacfb 100644 --- a/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py @@ -15,6 +15,7 @@ import os import psycopg2 +from psycopg2 import sql from opentelemetry import trace as trace_api from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor @@ -81,7 +82,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id integer)" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_connection_context_manager(self): """Should create a child span for execute with connection context""" @@ -90,7 +91,7 @@ def test_execute_with_connection_context_manager(self): with self._connection as conn: cursor = conn.cursor() cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_cursor_context_manager(self): """Should create a child span for execute with cursor context""" @@ -98,7 +99,7 @@ def test_execute_with_cursor_context_manager(self): with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") self.assertTrue(cursor.closed) def test_executemany(self): @@ -107,7 +108,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) self._cursor.executemany(stmt, data) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc""" @@ -116,3 +117,30 @@ def test_callproc(self): ): self._cursor.callproc("test", ()) self.validate_spans("test") + + def test_register_types(self): + psycopg2.extras.register_default_jsonb( + conn_or_curs=self._cursor, loads=lambda x: x + ) + + def test_composed_queries(self): + stmt = "CREATE TABLE IF NOT EXISTS users (id integer, name varchar)" + with self._tracer.start_as_current_span("rootSpan"): + self._cursor.execute(stmt) + self.validate_spans("CREATE") + + self._cursor.execute( + sql.SQL("SELECT FROM {table} where {field}='{value}'").format( + table=sql.Identifier("users"), + field=sql.Identifier("name"), + value=sql.Identifier("abc"), + ) + ) + + spans = self.memory_exporter.get_finished_spans() + span = spans[2] + self.assertEqual(span.name, "SELECT") + self.assertEqual( + span.attributes["db.statement"], + 'SELECT FROM "users" where "name"=\'"abc"\'', + ) diff --git a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py index b8e4404805..278bfc4d42 100644 --- a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py +++ b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py @@ -78,7 +78,7 @@ def test_execute(self): stmt = "CREATE TABLE IF NOT EXISTS test (id INT)" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_execute_with_cursor_context_manager(self): """Should create a child span for execute with cursor context""" @@ -86,7 +86,7 @@ def test_execute_with_cursor_context_manager(self): with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute(stmt) - self.validate_spans(stmt) + self.validate_spans("CREATE") def test_executemany(self): """Should create a child span for executemany""" @@ -94,7 +94,7 @@ def test_executemany(self): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) self._cursor.executemany(stmt, data) - self.validate_spans(stmt) + self.validate_spans("INSERT") def test_callproc(self): """Should create a child span for callproc"""