Skip to content

Commit 4069c37

Browse files
cnnradamslarkee
andauthored
feat: add OpenTelemetry tracing to spanner calls (#107)
* feat: add optional span creation with OpenTelemetry * bring back support for python2.7 * address comments * fix 2.7 tests * nit fixes * db.statement join with ; * Update docs/opentelemetry-tracing.rst Co-authored-by: larkee <[email protected]>
1 parent 0cade34 commit 4069c37

15 files changed

+910
-92
lines changed

docs/index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ API Documentation
2323

2424
api-reference
2525
advanced-session-pool-topics
26+
opentelemetry-tracing
2627

2728
Changelog
2829
---------

docs/opentelemetry-tracing.rst

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
Tracing with OpenTelemetry
2+
==================================
3+
This library uses `OpenTelemetry <https://opentelemetry.io/>`_ to automatically generate traces that provide insight into calls to Cloud Spanner.
4+
For information on the benefits and utility of tracing, see the `Cloud Trace docs <https://cloud.google.com/trace/docs/overview>`_.
5+
6+
To take advantage of these traces, we first need to install OpenTelemetry:
7+
8+
.. code-block:: sh
9+
10+
pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation
11+
12+
We also need to tell OpenTelemetry which exporter to use. For example, to export python-spanner traces to `Cloud Tracing <https://cloud.google.com/trace>`_, add the following lines to your application:
13+
14+
.. code:: python
15+
16+
from opentelemetry import trace
17+
from opentelemetry.sdk.trace import TracerProvider
18+
from opentelemetry.trace.sampling import ProbabilitySampler
19+
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
20+
# BatchExportSpanProcessor exports spans to Cloud Trace
21+
# in a separate thread so that the main thread is not blocked
22+
from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
23+
24+
# Create and export one trace every 1000 requests
25+
sampler = ProbabilitySampler(1/1000)
26+
# Use the default tracer provider
27+
trace.set_tracer_provider(TracerProvider(sampler=sampler))
28+
trace.get_tracer_provider().add_span_processor(
29+
# Initialize the cloud tracing exporter
30+
BatchExportSpanProcessor(CloudTraceSpanExporter())
31+
)
32+
33+
Generated spanner traces should now be available on `Cloud Trace <https://console.cloud.google.com/traces>`_.
34+
35+
Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request.
36+
For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs <https://opentelemetry-python.readthedocs.io/en/stable/>`_.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Copyright 2020 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Manages OpenTelemetry trace creation and handling"""
16+
17+
from contextlib import contextmanager
18+
19+
from google.api_core.exceptions import GoogleAPICallError
20+
from google.cloud.spanner_v1.gapic import spanner_client
21+
22+
try:
23+
from opentelemetry import trace
24+
from opentelemetry.trace.status import Status, StatusCanonicalCode
25+
from opentelemetry.instrumentation.utils import http_status_to_canonical_code
26+
27+
HAS_OPENTELEMETRY_INSTALLED = True
28+
except ImportError:
29+
HAS_OPENTELEMETRY_INSTALLED = False
30+
31+
32+
@contextmanager
def trace_call(name, session, extra_attributes=None):
    """Run the enclosed block inside an OpenTelemetry client span.

    When the optional OpenTelemetry imports failed, or ``session`` is
    falsy, no span is created and the context manager yields ``None``;
    callers must therefore check the yielded value before using it.

    :param name: name given to the created span.
    :param session: object whose ``_database.name`` is recorded as the
        ``db.instance`` span attribute.
    :param extra_attributes: (Optional) mapping merged over the base span
        attributes; its entries win on key collisions.

    :raises GoogleAPICallError: re-raised unchanged after the span status
        has been set from the error's HTTP or gRPC code.
    """
    if not (HAS_OPENTELEMETRY_INSTALLED and session):
        # Tracing is unavailable or there is nothing to attribute the span to.
        yield None
        return

    tracer = trace.get_tracer(__name__)

    # Base attributes attached to every span created here.
    service_address = spanner_client.SpannerClient.SERVICE_ADDRESS
    span_attributes = {
        "db.type": "spanner",
        "db.url": service_address,
        "db.instance": session._database.name,
        "net.host.name": service_address,
    }
    if extra_attributes:
        span_attributes.update(extra_attributes)

    with tracer.start_as_current_span(
        name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
    ) as span:
        try:
            yield span
        except GoogleAPICallError as error:
            http_code = error.code
            if http_code is not None:
                span.set_status(Status(http_status_to_canonical_code(http_code)))
            else:
                grpc_code = error.grpc_status_code
                if grpc_code is not None:
                    # StatusCanonicalCode values map 1-1 onto gRPC status codes.
                    span.set_status(Status(StatusCanonicalCode(grpc_code.value[0])))
            raise

google/cloud/spanner_v1/batch.py

+9-6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from google.cloud.spanner_v1._helpers import _SessionWrapper
2323
from google.cloud.spanner_v1._helpers import _make_list_value_pbs
2424
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
25+
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
2526

2627
# pylint: enable=ungrouped-imports
2728

@@ -147,12 +148,14 @@ def commit(self):
147148
api = database.spanner_api
148149
metadata = _metadata_with_prefix(database.name)
149150
txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
150-
response = api.commit(
151-
self._session.name,
152-
mutations=self._mutations,
153-
single_use_transaction=txn_options,
154-
metadata=metadata,
155-
)
151+
trace_attributes = {"num_mutations": len(self._mutations)}
152+
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
153+
response = api.commit(
154+
self._session.name,
155+
mutations=self._mutations,
156+
single_use_transaction=txn_options,
157+
metadata=metadata,
158+
)
156159
self.committed = _pb_timestamp_to_datetime(response.commit_timestamp)
157160
return self.committed
158161

google/cloud/spanner_v1/session.py

+18-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from google.cloud.spanner_v1.batch import Batch
2727
from google.cloud.spanner_v1.snapshot import Snapshot
2828
from google.cloud.spanner_v1.transaction import Transaction
29+
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
2930
import random
3031

3132
# pylint: enable=ungrouped-imports
@@ -114,7 +115,11 @@ def create(self):
114115
kw = {}
115116
if self._labels:
116117
kw = {"session": {"labels": self._labels}}
117-
session_pb = api.create_session(self._database.name, metadata=metadata, **kw)
118+
119+
with trace_call("CloudSpanner.CreateSession", self, self._labels):
120+
session_pb = api.create_session(
121+
self._database.name, metadata=metadata, **kw
122+
)
118123
self._session_id = session_pb.name.split("/")[-1]
119124

120125
def exists(self):
@@ -130,10 +135,16 @@ def exists(self):
130135
return False
131136
api = self._database.spanner_api
132137
metadata = _metadata_with_prefix(self._database.name)
133-
try:
134-
api.get_session(self.name, metadata=metadata)
135-
except NotFound:
136-
return False
138+
139+
with trace_call("CloudSpanner.GetSession", self) as span:
140+
try:
141+
api.get_session(self.name, metadata=metadata)
142+
if span:
143+
span.set_attribute("session_found", True)
144+
except NotFound:
145+
if span:
146+
span.set_attribute("session_found", False)
147+
return False
137148

138149
return True
139150

@@ -150,8 +161,8 @@ def delete(self):
150161
raise ValueError("Session ID not set by back-end")
151162
api = self._database.spanner_api
152163
metadata = _metadata_with_prefix(self._database.name)
153-
154-
api.delete_session(self.name, metadata=metadata)
164+
with trace_call("CloudSpanner.DeleteSession", self):
165+
api.delete_session(self.name, metadata=metadata)
155166

156167
def ping(self):
157168
"""Ping the session to keep it alive by executing "SELECT 1".

google/cloud/spanner_v1/snapshot.py

+50-27
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,19 @@
3030
from google.cloud.spanner_v1._helpers import _SessionWrapper
3131
from google.cloud.spanner_v1.streamed import StreamedResultSet
3232
from google.cloud.spanner_v1.types import PartitionOptions
33+
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
3334

3435

35-
def _restart_on_unavailable(restart):
36+
def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None):
3637
"""Restart iteration after :exc:`.ServiceUnavailable`.
3738
3839
:type restart: callable
3940
:param restart: curried function returning iterator
4041
"""
4142
resume_token = b""
4243
item_buffer = []
43-
iterator = restart()
44+
with trace_call(trace_name, session, attributes):
45+
iterator = restart()
4446
while True:
4547
try:
4648
for item in iterator:
@@ -50,7 +52,8 @@ def _restart_on_unavailable(restart):
5052
break
5153
except ServiceUnavailable:
5254
del item_buffer[:]
53-
iterator = restart(resume_token=resume_token)
55+
with trace_call(trace_name, session, attributes):
56+
iterator = restart(resume_token=resume_token)
5457
continue
5558

5659
if len(item_buffer) == 0:
@@ -143,7 +146,10 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
143146
metadata=metadata,
144147
)
145148

146-
iterator = _restart_on_unavailable(restart)
149+
trace_attributes = {"table_id": table, "columns": columns}
150+
iterator = _restart_on_unavailable(
151+
restart, "CloudSpanner.ReadOnlyTransaction", self._session, trace_attributes
152+
)
147153

148154
self._read_request_count += 1
149155

@@ -243,7 +249,13 @@ def execute_sql(
243249
timeout=timeout,
244250
)
245251

246-
iterator = _restart_on_unavailable(restart)
252+
trace_attributes = {"db.statement": sql}
253+
iterator = _restart_on_unavailable(
254+
restart,
255+
"CloudSpanner.ReadWriteTransaction",
256+
self._session,
257+
trace_attributes,
258+
)
247259

248260
self._read_request_count += 1
249261
self._execute_sql_count += 1
@@ -309,16 +321,20 @@ def partition_read(
309321
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
310322
)
311323

312-
response = api.partition_read(
313-
session=self._session.name,
314-
table=table,
315-
columns=columns,
316-
key_set=keyset._to_pb(),
317-
transaction=transaction,
318-
index=index,
319-
partition_options=partition_options,
320-
metadata=metadata,
321-
)
324+
trace_attributes = {"table_id": table, "columns": columns}
325+
with trace_call(
326+
"CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
327+
):
328+
response = api.partition_read(
329+
session=self._session.name,
330+
table=table,
331+
columns=columns,
332+
key_set=keyset._to_pb(),
333+
transaction=transaction,
334+
index=index,
335+
partition_options=partition_options,
336+
metadata=metadata,
337+
)
322338

323339
return [partition.partition_token for partition in response.partitions]
324340

@@ -385,15 +401,21 @@ def partition_query(
385401
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
386402
)
387403

388-
response = api.partition_query(
389-
session=self._session.name,
390-
sql=sql,
391-
transaction=transaction,
392-
params=params_pb,
393-
param_types=param_types,
394-
partition_options=partition_options,
395-
metadata=metadata,
396-
)
404+
trace_attributes = {"db.statement": sql}
405+
with trace_call(
406+
"CloudSpanner.PartitionReadWriteTransaction",
407+
self._session,
408+
trace_attributes,
409+
):
410+
response = api.partition_query(
411+
session=self._session.name,
412+
sql=sql,
413+
transaction=transaction,
414+
params=params_pb,
415+
param_types=param_types,
416+
partition_options=partition_options,
417+
metadata=metadata,
418+
)
397419

398420
return [partition.partition_token for partition in response.partitions]
399421

@@ -515,8 +537,9 @@ def begin(self):
515537
api = database.spanner_api
516538
metadata = _metadata_with_prefix(database.name)
517539
txn_selector = self._make_txn_selector()
518-
response = api.begin_transaction(
519-
self._session.name, txn_selector.begin, metadata=metadata
520-
)
540+
with trace_call("CloudSpanner.BeginTransaction", self._session):
541+
response = api.begin_transaction(
542+
self._session.name, txn_selector.begin, metadata=metadata
543+
)
521544
self._transaction_id = response.id
522545
return self._transaction_id

0 commit comments

Comments
 (0)