Skip to content

Commit 4336dc7

Browse files
Fix arity of context.abort for AIO RPCs (#2066)
1 parent b29682b commit 4336dc7

File tree

3 files changed

+136
-11
lines changed

3 files changed

+136
-11
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
([#1987](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1987))
1414
- `opentelemetry-instrumentation-httpx` Fix mixing async and non async hooks
1515
([#1920](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1920))
16+
- `opentelemetry-instrument-grpc` Fix arity of context.abort for AIO RPCs
17+
([#2066](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2066))
1618

1719
### Fixed
1820

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py

+58-8
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,63 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import grpc
1516
import grpc.aio
16-
17-
from ._server import (
18-
OpenTelemetryServerInterceptor,
19-
_OpenTelemetryServicerContext,
20-
_wrap_rpc_behavior,
21-
)
17+
import wrapt
18+
19+
from opentelemetry.semconv.trace import SpanAttributes
20+
from opentelemetry.trace.status import Status, StatusCode
21+
22+
from ._server import OpenTelemetryServerInterceptor, _wrap_rpc_behavior
23+
24+
25+
# pylint:disable=abstract-method
26+
class _OpenTelemetryAioServicerContext(wrapt.ObjectProxy):
27+
def __init__(self, servicer_context, active_span):
28+
super().__init__(servicer_context)
29+
self._self_active_span = active_span
30+
self._self_code = grpc.StatusCode.OK
31+
self._self_details = None
32+
33+
async def abort(self, code, details="", trailing_metadata=tuple()):
34+
self._self_code = code
35+
self._self_details = details
36+
self._self_active_span.set_attribute(
37+
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
38+
)
39+
self._self_active_span.set_status(
40+
Status(
41+
status_code=StatusCode.ERROR,
42+
description=f"{code}:{details}",
43+
)
44+
)
45+
return await self.__wrapped__.abort(code, details, trailing_metadata)
46+
47+
def set_code(self, code):
48+
self._self_code = code
49+
details = self._self_details or code.value[1]
50+
self._self_active_span.set_attribute(
51+
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
52+
)
53+
if code != grpc.StatusCode.OK:
54+
self._self_active_span.set_status(
55+
Status(
56+
status_code=StatusCode.ERROR,
57+
description=f"{code}:{details}",
58+
)
59+
)
60+
return self.__wrapped__.set_code(code)
61+
62+
def set_details(self, details):
63+
self._self_details = details
64+
if self._self_code != grpc.StatusCode.OK:
65+
self._self_active_span.set_status(
66+
Status(
67+
status_code=StatusCode.ERROR,
68+
description=f"{self._self_code}:{details}",
69+
)
70+
)
71+
return self.__wrapped__.set_details(details)
2272

2373

2474
class OpenTelemetryAioServerInterceptor(
@@ -66,7 +116,7 @@ async def _unary_interceptor(request_or_iterator, context):
66116
set_status_on_exception=False,
67117
) as span:
68118
# wrap the context
69-
context = _OpenTelemetryServicerContext(context, span)
119+
context = _OpenTelemetryAioServicerContext(context, span)
70120

71121
# And now we run the actual RPC.
72122
try:
@@ -91,7 +141,7 @@ async def _stream_interceptor(request_or_iterator, context):
91141
context,
92142
set_status_on_exception=False,
93143
) as span:
94-
context = _OpenTelemetryServicerContext(context, span)
144+
context = _OpenTelemetryAioServicerContext(context, span)
95145

96146
try:
97147
async for response in behavior(

instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py

+76-3
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,11 @@ async def run_with_test_server(
8888
channel = grpc.aio.insecure_channel(f"localhost:{port:d}")
8989

9090
await server.start()
91-
resp = await runnable(channel)
92-
await server.stop(1000)
91+
92+
try:
93+
resp = await runnable(channel)
94+
finally:
95+
await server.stop(1000)
9396

9497
return resp
9598

@@ -514,9 +517,79 @@ async def request(channel):
514517
request = Request(client_id=1, request_data=failure_message)
515518
msg = request.SerializeToString()
516519

517-
with testcase.assertRaises(Exception):
520+
with testcase.assertRaises(grpc.RpcError) as cm:
521+
await channel.unary_unary(rpc_call)(msg)
522+
523+
self.assertEqual(
524+
cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION
525+
)
526+
self.assertEqual(cm.exception.details(), failure_message)
527+
528+
await run_with_test_server(request, servicer=AbortServicer())
529+
530+
spans_list = self.memory_exporter.get_finished_spans()
531+
self.assertEqual(len(spans_list), 1)
532+
span = spans_list[0]
533+
534+
self.assertEqual(span.name, rpc_call)
535+
self.assertIs(span.kind, trace.SpanKind.SERVER)
536+
537+
# Check version and name in span's instrumentation info
538+
self.assertEqualSpanInstrumentationInfo(
539+
span, opentelemetry.instrumentation.grpc
540+
)
541+
542+
# make sure this span errored, with the right status and detail
543+
self.assertEqual(span.status.status_code, StatusCode.ERROR)
544+
self.assertEqual(
545+
span.status.description,
546+
f"{grpc.StatusCode.FAILED_PRECONDITION}:{failure_message}",
547+
)
548+
549+
# Check attributes
550+
self.assertSpanHasAttributes(
551+
span,
552+
{
553+
SpanAttributes.NET_PEER_IP: "[::1]",
554+
SpanAttributes.NET_PEER_NAME: "localhost",
555+
SpanAttributes.RPC_METHOD: "SimpleMethod",
556+
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
557+
SpanAttributes.RPC_SYSTEM: "grpc",
558+
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[
559+
0
560+
],
561+
},
562+
)
563+
564+
async def test_abort_with_trailing_metadata(self):
565+
"""Check that we can catch an abort properly when trailing_metadata provided"""
566+
rpc_call = "/GRPCTestServer/SimpleMethod"
567+
failure_message = "failure message"
568+
569+
class AbortServicer(GRPCTestServerServicer):
570+
# pylint:disable=C0103
571+
async def SimpleMethod(self, request, context):
572+
metadata = (("meta", "data"),)
573+
await context.abort(
574+
grpc.StatusCode.FAILED_PRECONDITION,
575+
failure_message,
576+
trailing_metadata=metadata,
577+
)
578+
579+
testcase = self
580+
581+
async def request(channel):
582+
request = Request(client_id=1, request_data=failure_message)
583+
msg = request.SerializeToString()
584+
585+
with testcase.assertRaises(grpc.RpcError) as cm:
518586
await channel.unary_unary(rpc_call)(msg)
519587

588+
self.assertEqual(
589+
cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION
590+
)
591+
self.assertEqual(cm.exception.details(), failure_message)
592+
520593
await run_with_test_server(request, servicer=AbortServicer())
521594

522595
spans_list = self.memory_exporter.get_finished_spans()

0 commit comments

Comments
 (0)