Skip to content

Commit 2f68804

Browse files
author
sengjea
committed
fix asynchonous unary call traces
1 parent 5b125b1 commit 2f68804

File tree

3 files changed

+82
-71
lines changed

3 files changed

+82
-71
lines changed

Diff for: instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py

+60-71
Original file line numberDiff line numberDiff line change
@@ -33,27 +33,6 @@
3333
from opentelemetry.trace.status import Status, StatusCode
3434

3535

36-
class _GuardedSpan:
37-
def __init__(self, span):
38-
self.span = span
39-
self.generated_span = None
40-
self._engaged = True
41-
42-
def __enter__(self):
43-
self.generated_span = self.span.__enter__()
44-
return self
45-
46-
def __exit__(self, *args, **kwargs):
47-
if self._engaged:
48-
self.generated_span = None
49-
return self.span.__exit__(*args, **kwargs)
50-
return False
51-
52-
def release(self):
53-
self._engaged = False
54-
return self.span
55-
56-
5736
class _CarrierSetter(Setter):
5837
"""We use a custom setter in order to be able to lower case
5938
keys as is required by grpc.
@@ -68,7 +47,7 @@ def set(self, carrier: MutableMapping[str, str], key: str, value: str):
6847

6948
def _make_future_done_callback(span, rpc_info):
7049
def callback(response_future):
71-
with span:
50+
with trace.use_span(span, end_on_exit=True):
7251
code = response_future.code()
7352
if code != grpc.StatusCode.OK:
7453
rpc_info.error = code
@@ -94,17 +73,17 @@ def _start_span(self, method):
9473
SpanAttributes.RPC_SERVICE: service,
9574
}
9675

97-
return self._tracer.start_as_current_span(
98-
name=method, kind=trace.SpanKind.CLIENT, attributes=attributes
76+
return self._tracer.start_span(
77+
name=method, kind=trace.SpanKind.CLIENT, attributes=attributes,
9978
)
10079

10180
# pylint:disable=no-self-use
102-
def _trace_result(self, guarded_span, rpc_info, result):
81+
def _trace_result(self, span, rpc_info, result):
10382
# If the RPC is called asynchronously, release the guard and add a
10483
# callback so that the span can be finished once the future is done.
10584
if isinstance(result, grpc.Future):
10685
result.add_done_callback(
107-
_make_future_done_callback(guarded_span.release(), rpc_info)
86+
_make_future_done_callback(span, rpc_info)
10887
)
10988
return result
11089
response = result
@@ -115,41 +94,43 @@ def _trace_result(self, guarded_span, rpc_info, result):
11594
if isinstance(result, tuple):
11695
response = result[0]
11796
rpc_info.response = response
118-
97+
span.end()
11998
return result
12099

121-
def _start_guarded_span(self, *args, **kwargs):
122-
return _GuardedSpan(self._start_span(*args, **kwargs))
123-
124100
def intercept_unary(self, request, metadata, client_info, invoker):
125101
if not metadata:
126102
mutable_metadata = OrderedDict()
127103
else:
128104
mutable_metadata = OrderedDict(metadata)
129-
130-
with self._start_guarded_span(client_info.full_method) as guarded_span:
131-
inject(mutable_metadata, setter=_carrier_setter)
132-
metadata = tuple(mutable_metadata.items())
133-
134-
rpc_info = RpcInfo(
135-
full_method=client_info.full_method,
136-
metadata=metadata,
137-
timeout=client_info.timeout,
138-
request=request,
139-
)
140-
105+
span = self._start_span(client_info.full_method)
106+
with trace.use_span(span, record_exception=False, set_status_on_exception=False):
141107
try:
142-
result = invoker(request, metadata)
143-
except grpc.RpcError as err:
144-
guarded_span.generated_span.set_status(
145-
Status(StatusCode.ERROR)
108+
inject(mutable_metadata, setter=_carrier_setter)
109+
metadata = tuple(mutable_metadata.items())
110+
111+
rpc_info = RpcInfo(
112+
full_method=client_info.full_method,
113+
metadata=metadata,
114+
timeout=client_info.timeout,
115+
request=request,
146116
)
147-
guarded_span.generated_span.set_attribute(
148-
SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0]
149-
)
150-
raise err
151117

152-
return self._trace_result(guarded_span, rpc_info, result)
118+
result = invoker(request, metadata)
119+
except Exception as exc:
120+
if isinstance(exc, grpc.RpcError):
121+
span.set_attribute(
122+
SpanAttributes.RPC_GRPC_STATUS_CODE, exc.code().value[0]
123+
)
124+
span.set_status(
125+
Status(
126+
status_code=StatusCode.ERROR,
127+
description="{}: {}".format(type(exc).__name__, exc),
128+
)
129+
)
130+
span.record_exception(exc)
131+
span.end()
132+
raise exc
133+
return self._trace_result(span, rpc_info, result)
153134

154135
# For RPCs that stream responses, the result can be a generator. To record
155136
# the span across the generated responses and detect any errors, we wrap
@@ -162,7 +143,8 @@ def _intercept_server_stream(
162143
else:
163144
mutable_metadata = OrderedDict(metadata)
164145

165-
with self._start_span(client_info.full_method) as span:
146+
span = self._start_span(client_info.full_method)
147+
with trace.use_span(span, end_on_exit=True):
166148
inject(mutable_metadata, setter=_carrier_setter)
167149
metadata = tuple(mutable_metadata.items())
168150
rpc_info = RpcInfo(
@@ -199,27 +181,34 @@ def intercept_stream(
199181
else:
200182
mutable_metadata = OrderedDict(metadata)
201183

202-
with self._start_guarded_span(client_info.full_method) as guarded_span:
203-
inject(mutable_metadata, setter=_carrier_setter)
204-
metadata = tuple(mutable_metadata.items())
205-
rpc_info = RpcInfo(
206-
full_method=client_info.full_method,
207-
metadata=metadata,
208-
timeout=client_info.timeout,
209-
request=request_or_iterator,
210-
)
184+
span = self._start_span(client_info.full_method)
185+
with trace.use_span(span, record_exception=False, set_status_on_exception=False):
186+
try:
187+
inject(mutable_metadata, setter=_carrier_setter)
188+
metadata = tuple(mutable_metadata.items())
189+
rpc_info = RpcInfo(
190+
full_method=client_info.full_method,
191+
metadata=metadata,
192+
timeout=client_info.timeout,
193+
request=request_or_iterator,
194+
)
211195

212-
rpc_info.request = request_or_iterator
196+
rpc_info.request = request_or_iterator
213197

214-
try:
215198
result = invoker(request_or_iterator, metadata)
216-
except grpc.RpcError as err:
217-
guarded_span.generated_span.set_status(
218-
Status(StatusCode.ERROR)
199+
except Exception as exc:
200+
if isinstance(exc, grpc.RpcError):
201+
span.set_attribute(
202+
SpanAttributes.RPC_GRPC_STATUS_CODE, exc.code().value[0]
203+
)
204+
span.set_status(
205+
Status(
206+
status_code=StatusCode.ERROR,
207+
description="{}: {}".format(type(exc).__name__, exc),
208+
)
219209
)
220-
guarded_span.generated_span.set_attribute(
221-
SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0],
222-
)
223-
raise err
210+
span.record_exception(exc)
211+
span.end()
212+
raise exc
224213

225-
return self._trace_result(guarded_span, rpc_info, result)
214+
return self._trace_result(span, rpc_info, result)

Diff for: instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ def simple_method(stub, error=False):
2424
stub.SimpleMethod(request)
2525

2626

27+
def simple_method_future(stub, error=False):
28+
request = Request(
29+
client_id=CLIENT_ID, request_data="error" if error else "data"
30+
)
31+
return stub.SimpleMethod.future(request)
32+
33+
2734
def client_streaming_method(stub, error=False):
2835
# create a generator
2936
def request_messages():

Diff for: instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

+15
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
client_streaming_method,
3737
server_streaming_method,
3838
simple_method,
39+
simple_method_future,
3940
)
4041
from ._server import create_test_server
4142
from .protobuf.test_server_pb2 import Request
@@ -100,6 +101,20 @@ def tearDown(self):
100101
self.server.stop(None)
101102
self.channel.close()
102103

104+
def test_unary_unary_future(self):
105+
simple_method_future(self._stub).result()
106+
spans = self.memory_exporter.get_finished_spans()
107+
self.assertEqual(len(spans), 1)
108+
span = spans[0]
109+
110+
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
111+
self.assertIs(span.kind, trace.SpanKind.CLIENT)
112+
113+
# Check version and name in span's instrumentation info
114+
self.check_span_instrumentation_info(
115+
span, opentelemetry.instrumentation.grpc
116+
)
117+
103118
def test_unary_unary(self):
104119
simple_method(self._stub)
105120
spans = self.memory_exporter.get_finished_spans()

0 commit comments

Comments
 (0)