From b3234e86286b9ca6a1324edab88239843c9cb0b2 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 15 Nov 2020 12:14:57 +0530 Subject: [PATCH 1/8] Update asyncpg to follow semantic conventions --- .../instrumentation/asyncpg/__init__.py | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py index 2f4ecaf3af..1e14bb2dd4 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py @@ -49,11 +49,24 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict: - span_attributes = {"db.type": "sql"} + span_attributes = {"db.system": "postgresql"} params = getattr(connection, "_params", None) - span_attributes["db.instance"] = getattr(params, "database", None) - span_attributes["db.user"] = getattr(params, "user", None) + dbname = getattr(params, "database", None) + if dbname: + span_attributes["db.name"] = dbname + user = getattr(params, "user", None) + if user: + span_attributes["db.user"] = user + + addr = getattr(connection, "_addr", None) + if isinstance(addr, tuple): + span_attributes["net.peer.name"] = addr[0] + span_attributes["net.peer.ip"] = addr[1] + span_attributes["net.transport"] = "IP.TCP" + elif isinstance(addr, str): + span_attributes["net.peer.name"] = addr + span_attributes["net.transport"] = "Unix" if query is not None: span_attributes["db.statement"] = query @@ -105,16 +118,21 @@ async def _do_execute(self, func, instance, args, kwargs): tracer = getattr(asyncpg, _APPLIED) exception = None - - with tracer.start_as_current_span( - "postgresql", kind=SpanKind.CLIENT - ) as span: + span_attributes = _hydrate_span_from_args( + instance, + args[0], + args[1:] if self.capture_parameters else None, + ) + name = "" + if args[0]: + name = args[0] + elif span_attributes.get("db.name"): + name = span_attributes["db.name"] + else: + name = "postgresql" # Does it ever happen? + + with tracer.start_as_current_span(name, kind=SpanKind.CLIENT) as span: if span.is_recording(): - span_attributes = _hydrate_span_from_args( - instance, - args[0], - args[1:] if self.capture_parameters else None, - ) for attribute, value in span_attributes.items(): span.set_attribute(attribute, value) From 326269ed0cc59e2c4cb107f1ea0f029229f6744b Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 15 Nov 2020 12:15:12 +0530 Subject: [PATCH 2/8] Update docker tests --- .../tests/asyncpg/test_asyncpg_functional.py | 182 ++++++------------ 1 file changed, 59 insertions(+), 123 deletions(-) diff --git a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py index 2e37efe224..151f88169c 100644 --- a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -7,11 +7,11 @@ from opentelemetry.test.test_base import TestBase from opentelemetry.trace.status import StatusCode -POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost") -POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432")) -POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests") -POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword") -POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser") +POSTGRES_HOST = os.getenv("POSTGRESQL_HOST", "localhost") +POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT", "5432")) +POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "opentelemetry-tests") +POSTGRES_PASSWORD = os.getenv("POSTGRESQL_PASSWORD", "testpassword") +POSTGRES_USER = os.getenv("POSTGRESQL_USER", "testuser") def async_call(coro): @@ -41,34 +41,27 @@ def setUpClass(cls): def tearDownClass(cls): AsyncPGInstrumentor().uninstrument() + def check_span(self, span): + self.assertEqual(span.attributes["db.system"], "postgresql") + self.assertEqual(span.attributes["db.name"], POSTGRES_DB_NAME) + self.assertEqual(span.attributes["db.user"], POSTGRES_USER) + self.assertEqual(span.attributes["net.peer.name"], POSTGRES_HOST) + self.assertEqual(span.attributes["net.peer.ip"], POSTGRES_PORT) + def test_instrumented_execute_method_without_arguments(self, *_, **__): async_call(self._connection.execute("SELECT 42;")) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) self.assertIs(StatusCode.UNSET, spans[0].status.status_code) - self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.user": POSTGRES_USER, - "db.instance": POSTGRES_DB_NAME, - "db.statement": "SELECT 42;", - }, - ) + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;") def test_instrumented_fetch_method_without_arguments(self, *_, **__): async_call(self._connection.fetch("SELECT 42;")) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) - self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.user": POSTGRES_USER, - "db.instance": POSTGRES_DB_NAME, - "db.statement": "SELECT 42;", - }, - ) + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;") def test_instrumented_transaction_method(self, *_, **__): async def _transaction_execute(): @@ -79,35 +72,16 @@ async def _transaction_execute(): spans = self.memory_exporter.get_finished_spans() self.assertEqual(3, len(spans)) - self.assertEqual( - { - "db.instance": POSTGRES_DB_NAME, - "db.user": POSTGRES_USER, - "db.type": "sql", - "db.statement": "BEGIN;", - }, - spans[0].attributes, - ) + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "BEGIN;") self.assertIs(StatusCode.UNSET, spans[0].status.status_code) - self.assertEqual( - { - "db.instance": POSTGRES_DB_NAME, - "db.user": POSTGRES_USER, - "db.type": "sql", - "db.statement": "SELECT 42;", - }, - spans[1].attributes, - ) + + self.check_span(spans[1]) + self.assertEqual(spans[1].attributes["db.statement"], "SELECT 42;") self.assertIs(StatusCode.UNSET, spans[1].status.status_code) - self.assertEqual( - { - "db.instance": POSTGRES_DB_NAME, - "db.user": POSTGRES_USER, - "db.type": "sql", - "db.statement": "COMMIT;", - }, - spans[2].attributes, - ) + + self.check_span(spans[2]) + self.assertEqual(spans[2].attributes["db.statement"], "COMMIT;") self.assertIs(StatusCode.UNSET, spans[2].status.status_code) def test_instrumented_failed_transaction_method(self, *_, **__): @@ -120,37 +94,19 @@ async def _transaction_execute(): spans = self.memory_exporter.get_finished_spans() self.assertEqual(3, len(spans)) - self.assertEqual( - { - "db.instance": POSTGRES_DB_NAME, - "db.user": POSTGRES_USER, - "db.type": "sql", - "db.statement": "BEGIN;", - }, - spans[0].attributes, - ) + + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "BEGIN;") self.assertIs(StatusCode.UNSET, spans[0].status.status_code) + + self.check_span(spans[1]) self.assertEqual( - { - "db.instance": POSTGRES_DB_NAME, - "db.user": POSTGRES_USER, - "db.type": "sql", - "db.statement": "SELECT 42::uuid;", - }, - spans[1].attributes, - ) - self.assertEqual( - StatusCode.ERROR, spans[1].status.status_code, - ) - self.assertEqual( - { - "db.instance": POSTGRES_DB_NAME, - "db.user": POSTGRES_USER, - "db.type": "sql", - "db.statement": "ROLLBACK;", - }, - spans[2].attributes, + spans[1].attributes["db.statement"], "SELECT 42::uuid;" ) + self.assertEqual(StatusCode.ERROR, spans[1].status.status_code) + + self.check_span(spans[2]) + self.assertEqual(spans[2].attributes["db.statement"], "ROLLBACK;") self.assertIs(StatusCode.UNSET, spans[2].status.status_code) def test_instrumented_method_doesnt_capture_parameters(self, *_, **__): @@ -158,19 +114,8 @@ def test_instrumented_method_doesnt_capture_parameters(self, *_, **__): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) self.assertIs(StatusCode.UNSET, spans[0].status.status_code) - self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.user": POSTGRES_USER, - # This shouldn't be set because we don't capture parameters by - # default - # - # "db.statement.parameters": "('1',)", - "db.instance": POSTGRES_DB_NAME, - "db.statement": "SELECT $1;", - }, - ) + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;") class TestFunctionalAsyncPG_CaptureParameters(TestBase): @@ -197,50 +142,45 @@ def setUpClass(cls): def tearDownClass(cls): AsyncPGInstrumentor().uninstrument() + def check_span(self, span): + self.assertEqual(span.attributes["db.system"], "postgresql") + self.assertEqual(span.attributes["db.name"], POSTGRES_DB_NAME) + self.assertEqual(span.attributes["db.user"], POSTGRES_USER) + self.assertEqual(span.attributes["net.peer.name"], POSTGRES_HOST) + self.assertEqual(span.attributes["net.peer.ip"], POSTGRES_PORT) + def test_instrumented_execute_method_with_arguments(self, *_, **__): async_call(self._connection.execute("SELECT $1;", "1")) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) self.assertIs(StatusCode.UNSET, spans[0].status.status_code) + + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;") self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.user": POSTGRES_USER, - "db.statement.parameters": "('1',)", - "db.instance": POSTGRES_DB_NAME, - "db.statement": "SELECT $1;", - }, + spans[0].attributes["db.statement.parameters"], "('1',)" ) def test_instrumented_fetch_method_with_arguments(self, *_, **__): async_call(self._connection.fetch("SELECT $1;", "1")) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) + + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;") self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.user": POSTGRES_USER, - "db.statement.parameters": "('1',)", - "db.instance": POSTGRES_DB_NAME, - "db.statement": "SELECT $1;", - }, + spans[0].attributes["db.statement.parameters"], "('1',)" ) def test_instrumented_executemany_method_with_arguments(self, *_, **__): async_call(self._connection.executemany("SELECT $1;", [["1"], ["2"]])) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) + + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;") self.assertEqual( - { - "db.type": "sql", - "db.statement": "SELECT $1;", - "db.statement.parameters": "([['1'], ['2']],)", - "db.user": POSTGRES_USER, - "db.instance": POSTGRES_DB_NAME, - }, - spans[0].attributes, + spans[0].attributes["db.statement.parameters"], "([['1'], ['2']],)" ) def test_instrumented_execute_interface_error_method(self, *_, **__): @@ -248,13 +188,9 @@ def test_instrumented_execute_interface_error_method(self, *_, **__): async_call(self._connection.execute("SELECT 42;", 1, 2, 3)) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) + + self.check_span(spans[0]) + self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;") self.assertEqual( - spans[0].attributes, - { - "db.type": "sql", - "db.instance": POSTGRES_DB_NAME, - "db.user": POSTGRES_USER, - "db.statement.parameters": "(1, 2, 3)", - "db.statement": "SELECT 42;", - }, + spans[0].attributes["db.statement.parameters"], "(1, 2, 3)" ) From e6503ce9303694455fa2cd440dc56f55d496f8b9 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 15 Nov 2020 12:28:52 +0530 Subject: [PATCH 3/8] Update asyncpg docker tests --- .../tests/asyncpg/test_asyncpg_functional.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py index 151f88169c..19364d7dfc 100644 --- a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -54,6 +54,7 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__): self.assertEqual(len(spans), 1) self.assertIs(StatusCode.UNSET, spans[0].status.status_code) self.check_span(spans[0]) + self.assertEqual(spans[0].name, "SELECT 42;") self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;") def test_instrumented_fetch_method_without_arguments(self, *_, **__): @@ -156,6 +157,7 @@ def test_instrumented_execute_method_with_arguments(self, *_, **__): self.assertIs(StatusCode.UNSET, spans[0].status.status_code) self.check_span(spans[0]) + self.assertEqual(spans[0].name, "SELECT $1;") self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;") self.assertEqual( spans[0].attributes["db.statement.parameters"], "('1',)" From 4816796de05ef8d7de532f414aea3c78d62f60d0 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 15 Nov 2020 12:53:23 +0530 Subject: [PATCH 4/8] Lint fixes --- .../src/opentelemetry/instrumentation/asyncpg/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py index 1e14bb2dd4..2ed452ebd7 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py @@ -119,9 +119,7 @@ async def _do_execute(self, func, instance, args, kwargs): exception = None span_attributes = _hydrate_span_from_args( - instance, - args[0], - args[1:] if self.capture_parameters else None, + instance, args[0], args[1:] if self.capture_parameters else None ) name = "" if args[0]: From 0b8deb46d8b7f5606f427317d767513749cafca8 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 17 Nov 2020 19:01:45 +0530 Subject: [PATCH 5/8] Add CHANGELOG entry --- .../opentelemetry-instrumentation-asyncpg/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-asyncpg/CHANGELOG.md index 56f261c9b5..779e9f63b6 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Update asyncpg instrumentation to follow semantic conventions + ([#188](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/188)) + ## Version 0.12b0 Released 2020-08-14 From dc98748c50c40326bb4a8b457004c7c8e082e08c Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 17 Nov 2020 21:12:19 +0530 Subject: [PATCH 6/8] Review change Don't make function call unnecessarily when span is not recording --- .../instrumentation/asyncpg/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py index 2ed452ebd7..f54d4ac8ca 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py @@ -118,19 +118,22 @@ async def _do_execute(self, func, instance, args, kwargs): tracer = getattr(asyncpg, _APPLIED) exception = None - span_attributes = _hydrate_span_from_args( - instance, args[0], args[1:] if self.capture_parameters else None - ) + params = getattr(instance, "_params", None) name = "" if args[0]: name = args[0] - elif span_attributes.get("db.name"): - name = span_attributes["db.name"] + elif params and params.get("database"): + name = params.get("database") else: name = "postgresql" # Does it ever happen? with tracer.start_as_current_span(name, kind=SpanKind.CLIENT) as span: if span.is_recording(): + span_attributes = _hydrate_span_from_args( + instance, + args[0], + args[1:] if self.capture_parameters else None, + ) for attribute, value in span_attributes.items(): span.set_attribute(attribute, value) From 5a7615bbf0983f06f8e95ffcba60efa12ef55326 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 24 Nov 2020 03:29:10 +0000 Subject: [PATCH 7/8] Update instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py Co-authored-by: (Eliseo) Nathaniel Ruiz Nowell --- .../opentelemetry/instrumentation/asyncpg/__init__.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py index f54d4ac8ca..198fe199de 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py @@ -118,14 +118,8 @@ async def _do_execute(self, func, instance, args, kwargs): tracer = getattr(asyncpg, _APPLIED) exception = None - params = getattr(instance, "_params", None) - name = "" - if args[0]: - name = args[0] - elif params and params.get("database"): - name = params.get("database") - else: - name = "postgresql" # Does it ever happen? + params = getattr(instance, "_params", {}) + name = args[0] if args[0] else params.get("database", "postgresql") with tracer.start_as_current_span(name, kind=SpanKind.CLIENT) as span: if span.is_recording(): From a46744976beb940ab90b5ed867de13c0ecfd496a Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 24 Nov 2020 11:32:33 +0530 Subject: [PATCH 8/8] Add docstring and comments --- .../src/opentelemetry/instrumentation/asyncpg/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py index 198fe199de..e78224a9ef 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py @@ -49,8 +49,12 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict: + """Get network and database attributes from connection.""" span_attributes = {"db.system": "postgresql"} + # connection contains _params attribute which is a namedtuple ConnectionParameters. + # https://github.com/MagicStack/asyncpg/blob/master/asyncpg/connection.py#L68 + params = getattr(connection, "_params", None) dbname = getattr(params, "database", None) if dbname: @@ -59,6 +63,8 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict: if user: span_attributes["db.user"] = user + # connection contains _addr attribute which is either a host/port tuple, or unix socket string + # https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html addr = getattr(connection, "_addr", None) if isinstance(addr, tuple): span_attributes["net.peer.name"] = addr[0]