Skip to content

Commit 039ff4f

Browse files
committed
feat: add optional span creation with OpenTelemetry
1 parent 1e178c5 commit 039ff4f

15 files changed

+864
-89
lines changed

docs/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ API Documentation
2323

2424
api-reference
2525
advanced-session-pool-topics
26+
opentelemetry-tracing
2627

2728
Changelog
2829
---------

docs/opentelemetry-tracing.rst

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
Tracing with OpenTelemetry
2+
==================================
3+
Python-spanner uses `OpenTelemetry <https://opentelemetry.io/>`_ to automatically generate traces that provide insight into calls to Cloud Spanner.
4+
For information on the benefits and utility of tracing, see the `Cloud Trace docs <https://cloud.google.com/trace/docs/overview>`_.
5+
6+
To take advantage of these traces, we first need to install opentelemetry:
7+
8+
.. code-block:: sh
9+
10+
pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation
11+
12+
We also need to tell OpenTelemetry which exporter to use. For example, to export python-spanner traces to `Cloud Tracing <https://cloud.google.com/trace>`_, add the following lines to your application:
13+
14+
.. code:: python
15+
16+
from opentelemetry import trace
17+
from opentelemetry.sdk.trace import TracerProvider
18+
from opentelemetry.trace.sampling import ProbabilitySampler
19+
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
20+
# BatchExportSpanProcessor exports spans to Cloud Trace
21+
# in a separate thread so that the main thread is not blocked
22+
from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
23+
24+
# create and export one trace every 1000 requests
25+
sampler = ProbabilitySampler(1/1000)
26+
# Uses the default tracer provider
27+
trace.set_tracer_provider(TracerProvider(sampler=sampler))
28+
trace.get_tracer_provider().add_span_processor(
29+
# initialize the cloud tracing exporter
30+
BatchExportSpanProcessor(CloudTraceSpanExporter())
31+
)
32+
33+
Generated spanner traces should now be available on `Cloud Trace <https://console.cloud.google.com/traces>`_.
34+
35+
Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request.
36+
For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs <https://opentelemetry-python.readthedocs.io/en/stable/>`_.
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Copyright 2016 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Manages OpenTelemetry trace creation and handling"""
16+
17+
from contextlib import contextmanager
18+
19+
from google.api_core.exceptions import GoogleAPICallError
20+
from google.cloud.spanner_v1.gapic import spanner_client
21+
22+
try:
23+
from opentelemetry import trace
24+
from opentelemetry.trace.status import Status, StatusCanonicalCode
25+
from opentelemetry.instrumentation.utils import http_status_to_canonical_code
26+
27+
HAS_OPENTELEMETRY_INSTALLED = True
28+
except (ImportError, ModuleNotFoundError):
29+
HAS_OPENTELEMETRY_INSTALLED = False
30+
31+
32+
@contextmanager
def trace_call(name, session, extra_attributes=None):
    """Run the wrapped block inside an OpenTelemetry client span.

    When the OpenTelemetry packages could not be imported, no span is
    created and ``None`` is yielded instead; callers that use the yielded
    value must therefore check it for ``None`` before calling span methods.

    :type name: str
    :param name: name given to the created span

    :param session: object whose ``_database.name`` is recorded as the
                    ``db.instance`` span attribute

    :type extra_attributes: dict
    :param extra_attributes: (Optional) additional span attributes; on a key
                             collision these override the base attributes

    :raises: :exc:`google.api_core.exceptions.GoogleAPICallError` re-raised
             unchanged from the wrapped block, after the span status has
             been set from the error's HTTP or gRPC status code (when the
             error carries one).
    """
    if HAS_OPENTELEMETRY_INSTALLED:
        tracer = trace.get_tracer(__name__)

        # Attributes common to every span created by this helper.
        span_attributes = {
            "db.type": "spanner",
            "db.url": spanner_client.SpannerClient.SERVICE_ADDRESS,
            "db.instance": session._database.name,
            "net.host.name": spanner_client.SpannerClient.SERVICE_ADDRESS,
        }
        # A falsy ``extra_attributes`` (None, empty dict) adds nothing.
        span_attributes.update(extra_attributes or {})

        with tracer.start_as_current_span(
            name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
        ) as span:
            try:
                yield span
            except GoogleAPICallError as api_error:
                # Prefer the HTTP status code; fall back to the gRPC one.
                if api_error.code is not None:
                    canonical_code = http_status_to_canonical_code(api_error.code)
                    span.set_status(Status(canonical_code))
                elif api_error.grpc_status_code is not None:
                    # OpenTelemetry's StatusCanonicalCode maps 1-1 with
                    # grpc status codes.
                    grpc_number = api_error.grpc_status_code.value[0]
                    span.set_status(Status(StatusCanonicalCode(grpc_number)))
                raise
    else:
        # Tracing unavailable: behave as an empty context manager.
        yield None

google/cloud/spanner_v1/batch.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from google.cloud.spanner_v1._helpers import _SessionWrapper
2323
from google.cloud.spanner_v1._helpers import _make_list_value_pbs
2424
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
25+
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
2526

2627
# pylint: enable=ungrouped-imports
2728

@@ -147,12 +148,14 @@ def commit(self):
147148
api = database.spanner_api
148149
metadata = _metadata_with_prefix(database.name)
149150
txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
150-
response = api.commit(
151-
self._session.name,
152-
mutations=self._mutations,
153-
single_use_transaction=txn_options,
154-
metadata=metadata,
155-
)
151+
trace_attributes = {"num_mutations": len(self._mutations)}
152+
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
153+
response = api.commit(
154+
self._session.name,
155+
mutations=self._mutations,
156+
single_use_transaction=txn_options,
157+
metadata=metadata,
158+
)
156159
self.committed = _pb_timestamp_to_datetime(response.commit_timestamp)
157160
return self.committed
158161

google/cloud/spanner_v1/session.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from google.cloud.spanner_v1.batch import Batch
2727
from google.cloud.spanner_v1.snapshot import Snapshot
2828
from google.cloud.spanner_v1.transaction import Transaction
29+
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
2930
import random
3031

3132
# pylint: enable=ungrouped-imports
@@ -114,7 +115,11 @@ def create(self):
114115
kw = {}
115116
if self._labels:
116117
kw = {"session": {"labels": self._labels}}
117-
session_pb = api.create_session(self._database.name, metadata=metadata, **kw)
118+
119+
with trace_call("CloudSpanner.CreateSession", self, self._labels):
120+
session_pb = api.create_session(
121+
self._database.name, metadata=metadata, **kw
122+
)
118123
self._session_id = session_pb.name.split("/")[-1]
119124

120125
def exists(self):
@@ -130,10 +135,14 @@ def exists(self):
130135
return False
131136
api = self._database.spanner_api
132137
metadata = _metadata_with_prefix(self._database.name)
133-
try:
134-
api.get_session(self.name, metadata=metadata)
135-
except NotFound:
136-
return False
138+
139+
with trace_call("CloudSpanner.GetSession", self) as span:
140+
try:
141+
api.get_session(self.name, metadata=metadata)
142+
span.set_attribute("session_found", True)
143+
except NotFound:
144+
span.set_attribute("session_found", False)
145+
return False
137146

138147
return True
139148

@@ -150,8 +159,8 @@ def delete(self):
150159
raise ValueError("Session ID not set by back-end")
151160
api = self._database.spanner_api
152161
metadata = _metadata_with_prefix(self._database.name)
153-
154-
api.delete_session(self.name, metadata=metadata)
162+
with trace_call("CloudSpanner.DeleteSession", self):
163+
api.delete_session(self.name, metadata=metadata)
155164

156165
def ping(self):
157166
"""Ping the session to keep it alive by executing "SELECT 1".

google/cloud/spanner_v1/snapshot.py

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,22 @@
3030
from google.cloud.spanner_v1._helpers import _SessionWrapper
3131
from google.cloud.spanner_v1.streamed import StreamedResultSet
3232
from google.cloud.spanner_v1.types import PartitionOptions
33+
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
3334

3435

35-
def _restart_on_unavailable(restart):
36+
def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None):
3637
"""Restart iteration after :exc:`.ServiceUnavailable`.
3738
3839
:type restart: callable
3940
:param restart: curried function returning iterator
4041
"""
4142
resume_token = b""
4243
item_buffer = []
43-
iterator = restart()
44+
if trace_name and session:
45+
with trace_call(trace_name, session, attributes):
46+
iterator = restart()
47+
else:
48+
iterator = restart()
4449
while True:
4550
try:
4651
for item in iterator:
@@ -50,7 +55,11 @@ def _restart_on_unavailable(restart):
5055
break
5156
except ServiceUnavailable:
5257
del item_buffer[:]
53-
iterator = restart(resume_token=resume_token)
58+
if trace_name and session:
59+
with trace_call(trace_name, session, attributes):
60+
iterator = restart(resume_token=resume_token)
61+
else:
62+
iterator = restart(resume_token=resume_token)
5463
continue
5564

5665
if len(item_buffer) == 0:
@@ -143,7 +152,10 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
143152
metadata=metadata,
144153
)
145154

146-
iterator = _restart_on_unavailable(restart)
155+
trace_attributes = {"table_id": table, "columns": columns}
156+
iterator = _restart_on_unavailable(
157+
restart, "CloudSpanner.ReadOnlyTransaction", self._session, trace_attributes
158+
)
147159

148160
self._read_request_count += 1
149161

@@ -243,7 +255,13 @@ def execute_sql(
243255
timeout=timeout,
244256
)
245257

246-
iterator = _restart_on_unavailable(restart)
258+
trace_attributes = {"db.statement": sql}
259+
iterator = _restart_on_unavailable(
260+
restart,
261+
"CloudSpanner.ReadWriteTransaction",
262+
self._session,
263+
trace_attributes,
264+
)
247265

248266
self._read_request_count += 1
249267
self._execute_sql_count += 1
@@ -309,16 +327,20 @@ def partition_read(
309327
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
310328
)
311329

312-
response = api.partition_read(
313-
session=self._session.name,
314-
table=table,
315-
columns=columns,
316-
key_set=keyset._to_pb(),
317-
transaction=transaction,
318-
index=index,
319-
partition_options=partition_options,
320-
metadata=metadata,
321-
)
330+
trace_attributes = {"table_id": table, "columns": columns}
331+
with trace_call(
332+
"CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
333+
):
334+
response = api.partition_read(
335+
session=self._session.name,
336+
table=table,
337+
columns=columns,
338+
key_set=keyset._to_pb(),
339+
transaction=transaction,
340+
index=index,
341+
partition_options=partition_options,
342+
metadata=metadata,
343+
)
322344

323345
return [partition.partition_token for partition in response.partitions]
324346

@@ -385,15 +407,21 @@ def partition_query(
385407
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
386408
)
387409

388-
response = api.partition_query(
389-
session=self._session.name,
390-
sql=sql,
391-
transaction=transaction,
392-
params=params_pb,
393-
param_types=param_types,
394-
partition_options=partition_options,
395-
metadata=metadata,
396-
)
410+
trace_attributes = {"db.statement": sql}
411+
with trace_call(
412+
"CloudSpanner.PartitionReadWriteTransaction",
413+
self._session,
414+
trace_attributes,
415+
):
416+
response = api.partition_query(
417+
session=self._session.name,
418+
sql=sql,
419+
transaction=transaction,
420+
params=params_pb,
421+
param_types=param_types,
422+
partition_options=partition_options,
423+
metadata=metadata,
424+
)
397425

398426
return [partition.partition_token for partition in response.partitions]
399427

@@ -515,8 +543,9 @@ def begin(self):
515543
api = database.spanner_api
516544
metadata = _metadata_with_prefix(database.name)
517545
txn_selector = self._make_txn_selector()
518-
response = api.begin_transaction(
519-
self._session.name, txn_selector.begin, metadata=metadata
520-
)
546+
with trace_call("CloudSpanner.BeginTransaction", self._session):
547+
response = api.begin_transaction(
548+
self._session.name, txn_selector.begin, metadata=metadata
549+
)
521550
self._transaction_id = response.id
522551
return self._transaction_id

0 commit comments

Comments
 (0)