Skip to content

Commit 5fc26d3

Browse files
authored
gRPC streaming bugfix (#260)
1 parent 7bd1d90 commit 5fc26d3

File tree

3 files changed

+259
-10
lines changed

3 files changed

+259
-10
lines changed

CHANGELOG.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1616
([#312](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/312))
1717
- `opentelemetry-instrumentation-boto` updated to set span attributes instead of overriding the resource.
1818
([#310](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/310))
19+
- `opentelemetry-instrumentation-grpc` Fix issue tracking child spans in streaming responses
20+
([#260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/260))
21+
- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs
22+
([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269))
1923

2024
## [0.17b0](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.17b0) - 2021-01-20
2125

@@ -80,8 +84,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
8084
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
8185
- Update TraceState to adhere to specs
8286
([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276))
83-
- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs
84-
([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269))
8587

8688
### Removed
8789
- Remove Configuration

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py

+30
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,15 @@ def intercept_service(self, continuation, handler_call_details):
239239
def telemetry_wrapper(behavior, request_streaming, response_streaming):
240240
def telemetry_interceptor(request_or_iterator, context):
241241

242+
# handle streaming responses specially
243+
if response_streaming:
244+
return self._intercept_server_stream(
245+
behavior,
246+
handler_call_details,
247+
request_or_iterator,
248+
context,
249+
)
250+
242251
with self._set_remote_context(context):
243252
with self._start_span(
244253
handler_call_details, context
@@ -249,6 +258,7 @@ def telemetry_interceptor(request_or_iterator, context):
249258
# And now we run the actual RPC.
250259
try:
251260
return behavior(request_or_iterator, context)
261+
252262
except Exception as error:
253263
# Bare exceptions are likely to be gRPC aborts, which
254264
# we handle in our context wrapper.
@@ -263,3 +273,23 @@ def telemetry_interceptor(request_or_iterator, context):
263273
return _wrap_rpc_behavior(
264274
continuation(handler_call_details), telemetry_wrapper
265275
)
276+
277+
# Handle streaming responses separately - we have to do this
278+
# to return a *new* generator or various upstream things
279+
# get confused, or we'll lose the consistent trace
280+
def _intercept_server_stream(
281+
self, behavior, handler_call_details, request_or_iterator, context
282+
):
283+
284+
with self._set_remote_context(context):
285+
with self._start_span(handler_call_details, context) as span:
286+
context = _OpenTelemetryServicerContext(context, span)
287+
288+
try:
289+
yield from behavior(request_or_iterator, context)
290+
291+
except Exception as error:
292+
# pylint:disable=unidiomatic-typecheck
293+
if type(error) != Exception:
294+
span.record_exception(error)
295+
raise error

instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py

+225-8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@
3030
from opentelemetry.test.test_base import TestBase
3131
from opentelemetry.trace.status import StatusCode
3232

33+
from .protobuf.test_server_pb2 import Request, Response
34+
from .protobuf.test_server_pb2_grpc import (
35+
GRPCTestServerServicer,
36+
add_GRPCTestServerServicer_to_server,
37+
)
38+
3339

3440
class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
3541
def __init__(self, handler):
@@ -51,6 +57,23 @@ def service(self, handler_call_details):
5157
return UnaryUnaryMethodHandler(self._unary_unary_handler)
5258

5359

60+
class Servicer(GRPCTestServerServicer):
61+
"""Our test servicer"""
62+
63+
# pylint:disable=C0103
64+
def SimpleMethod(self, request, context):
65+
return Response(
66+
server_id=request.client_id, response_data=request.request_data,
67+
)
68+
69+
# pylint:disable=C0103
70+
def ServerStreamingMethod(self, request, context):
71+
for data in ("one", "two", "three"):
72+
yield Response(
73+
server_id=request.client_id, response_data=data,
74+
)
75+
76+
5477
class TestOpenTelemetryServerInterceptor(TestBase):
5578
def test_instrumentor(self):
5679
def handler(request, context):
@@ -134,25 +157,146 @@ def test_create_span(self):
134157
# Intercept gRPC calls...
135158
interceptor = server_interceptor()
136159

137-
# No-op RPC handler
138-
def handler(request, context):
139-
return b""
160+
server = grpc.server(
161+
futures.ThreadPoolExecutor(max_workers=1),
162+
options=(("grpc.so_reuseport", 0),),
163+
interceptors=[interceptor],
164+
)
165+
add_GRPCTestServerServicer_to_server(Servicer(), server)
166+
port = server.add_insecure_port("[::]:0")
167+
channel = grpc.insecure_channel("localhost:{:d}".format(port))
168+
169+
rpc_call = "/GRPCTestServer/SimpleMethod"
170+
request = Request(client_id=1, request_data="test")
171+
msg = request.SerializeToString()
172+
try:
173+
server.start()
174+
channel.unary_unary(rpc_call)(msg)
175+
finally:
176+
server.stop(None)
177+
178+
spans_list = self.memory_exporter.get_finished_spans()
179+
self.assertEqual(len(spans_list), 1)
180+
span = spans_list[0]
181+
182+
self.assertEqual(span.name, rpc_call)
183+
self.assertIs(span.kind, trace.SpanKind.SERVER)
184+
185+
# Check version and name in span's instrumentation info
186+
self.check_span_instrumentation_info(
187+
span, opentelemetry.instrumentation.grpc
188+
)
140189

190+
# Check attributes
191+
self.assert_span_has_attributes(
192+
span,
193+
{
194+
"net.peer.ip": "[::1]",
195+
"net.peer.name": "localhost",
196+
"rpc.method": "SimpleMethod",
197+
"rpc.service": "GRPCTestServer",
198+
"rpc.system": "grpc",
199+
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
200+
},
201+
)
202+
203+
def test_create_two_spans(self):
204+
"""Verify that the interceptor captures sub spans within the given
205+
trace"""
206+
207+
class TwoSpanServicer(GRPCTestServerServicer):
208+
# pylint:disable=C0103
209+
def SimpleMethod(self, request, context):
210+
211+
# create another span
212+
tracer = trace.get_tracer(__name__)
213+
with tracer.start_as_current_span("child") as child:
214+
child.add_event("child event")
215+
216+
return Response(
217+
server_id=request.client_id,
218+
response_data=request.request_data,
219+
)
220+
221+
# Intercept gRPC calls...
222+
interceptor = server_interceptor()
223+
224+
# setup the server
141225
server = grpc.server(
142226
futures.ThreadPoolExecutor(max_workers=1),
143227
options=(("grpc.so_reuseport", 0),),
144228
interceptors=[interceptor],
145229
)
230+
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
231+
port = server.add_insecure_port("[::]:0")
232+
channel = grpc.insecure_channel("localhost:{:d}".format(port))
146233

147-
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
234+
# setup the RPC
235+
rpc_call = "/GRPCTestServer/SimpleMethod"
236+
request = Request(client_id=1, request_data="test")
237+
msg = request.SerializeToString()
238+
try:
239+
server.start()
240+
channel.unary_unary(rpc_call)(msg)
241+
finally:
242+
server.stop(None)
243+
244+
spans_list = self.memory_exporter.get_finished_spans()
245+
self.assertEqual(len(spans_list), 2)
246+
child_span = spans_list[0]
247+
parent_span = spans_list[1]
248+
249+
self.assertEqual(parent_span.name, rpc_call)
250+
self.assertIs(parent_span.kind, trace.SpanKind.SERVER)
251+
252+
# Check version and name in span's instrumentation info
253+
self.check_span_instrumentation_info(
254+
parent_span, opentelemetry.instrumentation.grpc
255+
)
148256

257+
# Check attributes
258+
self.assert_span_has_attributes(
259+
parent_span,
260+
{
261+
"net.peer.ip": "[::1]",
262+
"net.peer.name": "localhost",
263+
"rpc.method": "SimpleMethod",
264+
"rpc.service": "GRPCTestServer",
265+
"rpc.system": "grpc",
266+
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
267+
},
268+
)
269+
270+
# Check the child span
271+
self.assertEqual(child_span.name, "child")
272+
self.assertEqual(
273+
parent_span.context.trace_id, child_span.context.trace_id
274+
)
275+
276+
def test_create_span_streaming(self):
277+
"""Check that the interceptor wraps calls with spans server-side, on a
278+
streaming call."""
279+
280+
# Intercept gRPC calls...
281+
interceptor = server_interceptor()
282+
283+
# setup the server
284+
server = grpc.server(
285+
futures.ThreadPoolExecutor(max_workers=1),
286+
options=(("grpc.so_reuseport", 0),),
287+
interceptors=[interceptor],
288+
)
289+
add_GRPCTestServerServicer_to_server(Servicer(), server)
149290
port = server.add_insecure_port("[::]:0")
150291
channel = grpc.insecure_channel("localhost:{:d}".format(port))
151292

152-
rpc_call = "TestServicer/handler"
293+
# setup the RPC
294+
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
295+
request = Request(client_id=1, request_data="test")
296+
msg = request.SerializeToString()
153297
try:
154298
server.start()
155-
channel.unary_unary(rpc_call)(b"")
299+
list(channel.unary_stream(rpc_call)(msg))
156300
finally:
157301
server.stop(None)
158302

@@ -174,13 +318,86 @@ def handler(request, context):
174318
{
175319
"net.peer.ip": "[::1]",
176320
"net.peer.name": "localhost",
177-
"rpc.method": "handler",
178-
"rpc.service": "TestServicer",
321+
"rpc.method": "ServerStreamingMethod",
322+
"rpc.service": "GRPCTestServer",
323+
"rpc.system": "grpc",
324+
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
325+
},
326+
)
327+
328+
def test_create_two_spans_streaming(self):
329+
"""Verify that the interceptor captures sub spans in a
330+
streaming call, within the given trace"""
331+
332+
class TwoSpanServicer(GRPCTestServerServicer):
333+
# pylint:disable=C0103
334+
def ServerStreamingMethod(self, request, context):
335+
336+
# create another span
337+
tracer = trace.get_tracer(__name__)
338+
with tracer.start_as_current_span("child") as child:
339+
child.add_event("child event")
340+
341+
for data in ("one", "two", "three"):
342+
yield Response(
343+
server_id=request.client_id, response_data=data,
344+
)
345+
346+
# Intercept gRPC calls...
347+
interceptor = server_interceptor()
348+
349+
# setup the server
350+
server = grpc.server(
351+
futures.ThreadPoolExecutor(max_workers=1),
352+
options=(("grpc.so_reuseport", 0),),
353+
interceptors=[interceptor],
354+
)
355+
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
356+
port = server.add_insecure_port("[::]:0")
357+
channel = grpc.insecure_channel("localhost:{:d}".format(port))
358+
359+
# setup the RPC
360+
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
361+
request = Request(client_id=1, request_data="test")
362+
msg = request.SerializeToString()
363+
try:
364+
server.start()
365+
list(channel.unary_stream(rpc_call)(msg))
366+
finally:
367+
server.stop(None)
368+
369+
spans_list = self.memory_exporter.get_finished_spans()
370+
self.assertEqual(len(spans_list), 2)
371+
child_span = spans_list[0]
372+
parent_span = spans_list[1]
373+
374+
self.assertEqual(parent_span.name, rpc_call)
375+
self.assertIs(parent_span.kind, trace.SpanKind.SERVER)
376+
377+
# Check version and name in span's instrumentation info
378+
self.check_span_instrumentation_info(
379+
parent_span, opentelemetry.instrumentation.grpc
380+
)
381+
382+
# Check attributes
383+
self.assert_span_has_attributes(
384+
parent_span,
385+
{
386+
"net.peer.ip": "[::1]",
387+
"net.peer.name": "localhost",
388+
"rpc.method": "ServerStreamingMethod",
389+
"rpc.service": "GRPCTestServer",
179390
"rpc.system": "grpc",
180391
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
181392
},
182393
)
183394

395+
# Check the child span
396+
self.assertEqual(child_span.name, "child")
397+
self.assertEqual(
398+
parent_span.context.trace_id, child_span.context.trace_id
399+
)
400+
184401
def test_span_lifetime(self):
185402
"""Check that the span is active for the duration of the call."""
186403

0 commit comments

Comments
 (0)