From 03e021b3a748af073a736d9424f5bf18b6b6fe0d Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Wed, 24 Aug 2022 15:53:52 +0100 Subject: [PATCH 01/15] Add support for service-side grpc.aio This adds support for grpc.aio server interceptors. The vast majority of the code is either re-used wholesale or duplicated with slight modifications from the existing standard interceptors. --- .../setup.cfg | 1 + .../instrumentation/grpc/__init__.py | 114 +++- .../instrumentation/grpc/_aio_server.py | 105 ++++ .../tests/test_aio_server_interceptor.py | 554 ++++++++++++++++++ 4 files changed, 773 insertions(+), 1 deletion(-) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 4d20e572bd..9f1d94ab52 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -58,3 +58,4 @@ where = src opentelemetry_instrumentor = grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer + grpc_aio_server = opentelemetry.instrumentation.grpc:GrpcAioInstrumentorServer diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 177bfe67b5..edfd3f640e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -108,7 +108,7 @@ def serve(): logging.basicConfig() serve() -You can also add the instrumentor manually, rather than using +You can also add the interceptor manually, rather than using :py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`: .. code-block:: python @@ -118,6 +118,64 @@ def serve(): server = grpc.server(futures.ThreadPoolExecutor(), interceptors = [server_interceptor()]) +Usage Aio Server +------------ +.. code-block:: python + + import logging + import asyncio + + import grpc + + from opentelemetry import trace + from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, + ) + + try: + from .gen import helloworld_pb2, helloworld_pb2_grpc + except ImportError: + from gen import helloworld_pb2, helloworld_pb2_grpc + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + grpc_server_instrumentor = GrpcAioInstrumentorServer() + grpc_server_instrumentor.instrument() + + class Greeter(helloworld_pb2_grpc.GreeterServicer): + async def SayHello(self, request, context): + return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) + + + async def serve(): + + server = grpc.aio.server() + + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + server.add_insecure_port("[::]:50051") + await server.start() + await server.wait_for_termination() + + + if __name__ == "__main__": + logging.basicConfig() + asyncio.run(serve()) + +You can also add the interceptor manually, rather than using +:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorServer`: + +.. code-block:: python + + from opentelemetry.instrumentation.grpc import aio_server_interceptor + + server = grpc.aio.server(interceptors = [aio_server_interceptor()]) + """ from typing import Collection @@ -174,6 +232,44 @@ def _uninstrument(self, **kwargs): grpc.server = self._original_func +class GrpcAioInstrumentorServer(BaseInstrumentor): + """ + Globally instrument the grpc.aio server. + + Usage:: + + grpc_aio_server_instrumentor = GrpcAioInstrumentorServer() + grpc_aio_server_instrumentor.instrument() + + """ + + # pylint:disable=attribute-defined-outside-init, redefined-outer-name + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs): + self._original_func = grpc.aio.server + tracer_provider = kwargs.get("tracer_provider") + + def server(*args, **kwargs): + if "interceptors" in kwargs: + # add our interceptor as the first + kwargs["interceptors"].insert( + 0, aio_server_interceptor(tracer_provider=tracer_provider) + ) + else: + kwargs["interceptors"] = [ + aio_server_interceptor(tracer_provider=tracer_provider) + ] + return self._original_func(*args, **kwargs) + + grpc.aio.server = server + + def _uninstrument(self, **kwargs): + grpc.aio.server = self._original_func + + class GrpcInstrumentorClient(BaseInstrumentor): """ Globally instrument the grpc client @@ -255,3 +351,19 @@ def server_interceptor(tracer_provider=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) return _server.OpenTelemetryServerInterceptor(tracer) + + +def aio_server_interceptor(tracer_provider=None): + """Create a gRPC aio server interceptor. + + Args: + tracer: The tracer to use to create server-side spans. + + Returns: + A service-side interceptor object. + """ + from . import _aio_server + + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + return _aio_server.OpenTelemetryAioServerInterceptor(tracer) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py new file mode 100644 index 0000000000..0909d623db --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -0,0 +1,105 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc.aio + +from ._server import ( + OpenTelemetryServerInterceptor, + _wrap_rpc_behavior, + _OpenTelemetryServicerContext, +) + + +class OpenTelemetryAioServerInterceptor( + grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor +): + """ + An AsyncIO gRPC server interceptor, to add OpenTelemetry. + Usage:: + tracer = some OpenTelemetry tracer + interceptors = [ + AsyncOpenTelemetryServerInterceptor(tracer), + ] + server = aio.server( + futures.ThreadPoolExecutor(max_workers=concurrency), + interceptors = (interceptors,)) + """ + + async def intercept_service(self, continuation, handler_call_details): + def telemetry_wrapper(behavior, request_streaming, response_streaming): + # handle streaming responses specially + if response_streaming: + return self._intercept_server_stream( + behavior, + handler_call_details, + ) + else: + return self._intercept_server_unary( + behavior, + handler_call_details, + ) + + next_handler = await continuation(handler_call_details) + + return _wrap_rpc_behavior(next_handler, telemetry_wrapper) + + def _intercept_server_unary(self, behavior, handler_call_details): + async def _unary_interceptor(request_or_iterator, context): + with self._set_remote_context(context): + with self._start_span( + handler_call_details, + context, + set_status_on_exception=False, + ) as span: + # wrap the context + context = _OpenTelemetryServicerContext(context, span) + + # And now we run the actual RPC. + try: + return await behavior(request_or_iterator, context) + + except Exception as error: + # Bare exceptions are likely to be gRPC aborts, which + # we handle in our context wrapper. + # Here, we're interested in uncaught exceptions. + # pylint:disable=unidiomatic-typecheck + if type(error) != Exception: + span.record_exception(error) + raise error + + return _unary_interceptor + + def _intercept_server_stream(self, behavior, handler_call_details): + async def _stream_interceptor(request_or_iterator, context): + with self._set_remote_context(context): + with self._start_span( + handler_call_details, + context, + set_status_on_exception=False, + ) as span: + context = _OpenTelemetryServicerContext(context, span) + + try: + async for response in behavior( + request_or_iterator, context + ): + yield response + + except Exception as error: + # pylint:disable=unidiomatic-typecheck + if type(error) != Exception: + span.record_exception(error) + raise error + + return _stream_interceptor diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py new file mode 100644 index 0000000000..0f6ecd8747 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -0,0 +1,554 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=unused-argument +# pylint:disable=no-self-use +import asyncio +import grpc +import grpc.aio +from concurrent.futures.thread import ThreadPoolExecutor + +from time import sleep +from opentelemetry.test.test_base import TestBase +from opentelemetry import trace +import opentelemetry.instrumentation.grpc +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.trace import StatusCode + +from .protobuf.test_server_pb2 import Request, Response +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) +from opentelemetry.instrumentation.grpc import ( + GrpcAioInstrumentorServer, + aio_server_interceptor, +) + + +class Servicer(GRPCTestServerServicer): + """Our test servicer""" + + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + # pylint:disable=C0103 + async def ServerStreamingMethod(self, request, context): + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, + response_data=data, + ) + + +def run_with_test_server(runnable, servicer=Servicer(), add_interceptor=True): + if add_interceptor: + interceptors = [aio_server_interceptor()] + server = grpc.aio.server(interceptors=interceptors) + else: + server = grpc.aio.server() + + add_GRPCTestServerServicer_to_server(servicer, server) + + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + async def do_request(): + await server.start() + resp = await runnable(channel) + await server.stop(1000) + return resp + + loop = asyncio.get_event_loop_policy().get_event_loop() + return loop.run_until_complete(do_request()) + + +class TestOpenTelemetryAioServerInterceptor(TestBase): + def test_instrumentor(self): + """Check that automatic instrumentation configures the interceptor""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + grpc_aio_server_instrumentor = GrpcAioInstrumentorServer() + grpc_aio_server_instrumentor.instrument() + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + run_with_test_server(request, add_interceptor=False) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + grpc_aio_server_instrumentor.uninstrument() + + + def test_uninstrument(self): + """Check that uninstrument removes the interceptor""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + grpc_aio_server_instrumentor = GrpcAioInstrumentorServer() + grpc_aio_server_instrumentor.instrument() + grpc_aio_server_instrumentor.uninstrument() + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + run_with_test_server(request, add_interceptor=False) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + + def test_create_span(self): + """Check that the interceptor wraps calls with spans server-side.""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + run_with_test_server(request) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_create_two_spans(self): + """Verify that the interceptor captures sub spans within the given + trace""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + class TwoSpanServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + + # create another span + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("child") as child: + child.add_event("child event") + + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + run_with_test_server(request, servicer=TwoSpanServicer()) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + child_span = spans_list[0] + parent_span = spans_list[1] + + self.assertEqual(parent_span.name, rpc_call) + self.assertIs(parent_span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + parent_span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assertSpanHasAttributes( + parent_span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + # Check the child span + self.assertEqual(child_span.name, "child") + self.assertEqual( + parent_span.context.trace_id, child_span.context.trace_id + ) + + def test_create_span_streaming(self): + """Check that the interceptor wraps calls with spans server-side, on a + streaming call.""" + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + async for response in channel.unary_stream(rpc_call)(msg): + print(response) + + run_with_test_server(request) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_create_two_spans_streaming(self): + """Verify that the interceptor captures sub spans within the given + trace""" + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + + class TwoSpanServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def ServerStreamingMethod(self, request, context): + # create another span + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("child") as child: + child.add_event("child event") + + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, + response_data=data, + ) + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + async for response in channel.unary_stream(rpc_call)(msg): + print(response) + + run_with_test_server(request, servicer=TwoSpanServicer()) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + child_span = spans_list[0] + parent_span = spans_list[1] + + self.assertEqual(parent_span.name, rpc_call) + self.assertIs(parent_span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + parent_span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assertSpanHasAttributes( + parent_span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + # Check the child span + self.assertEqual(child_span.name, "child") + self.assertEqual( + parent_span.context.trace_id, child_span.context.trace_id + ) + + def test_span_lifetime(self): + """Verify that the interceptor captures sub spans within the given + trace""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + class SpanLifetimeServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + self.span = trace.get_current_span() + + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + lifetime_servicer = SpanLifetimeServicer() + active_span_before_call = trace.get_current_span() + + run_with_test_server(request, servicer=lifetime_servicer) + + active_span_in_handler = lifetime_servicer.span + active_span_after_call = trace.get_current_span() + + self.assertEqual(active_span_before_call, trace.INVALID_SPAN) + self.assertEqual(active_span_after_call, trace.INVALID_SPAN) + self.assertIsInstance(active_span_in_handler, trace_sdk.Span) + self.assertIsNone(active_span_in_handler.parent) + + def test_sequential_server_spans(self): + """Check that sequential RPCs get separate server spans.""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + async def sequential_requests(channel): + await request(channel) + await request(channel) + + run_with_test_server(sequential_requests) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + + span1, span2 = spans_list + # Spans should belong to separate traces + self.assertNotEqual(span1.context.span_id, span2.context.span_id) + self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) + + for span in (span1, span2): + # each should be a root span + self.assertIsNone(span2.parent) + + # check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_concurrent_server_spans(self): + """Check that concurrent RPC calls don't interfere with each other. + + This is the same check as test_sequential_server_spans except that the + RPCs are concurrent. Two handlers are invoked at the same time on two + separate threads. Each one should see a different active span and + context. + """ + rpc_call = "/GRPCTestServer/SimpleMethod" + latch = get_latch(2) + + class LatchedServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + await latch() + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + async def concurrent_requests(channel): + await asyncio.gather(request(channel), request(channel)) + + run_with_test_server(concurrent_requests, servicer=LatchedServicer()) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + + span1, span2 = spans_list + # Spans should belong to separate traces + self.assertNotEqual(span1.context.span_id, span2.context.span_id) + self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) + + for span in (span1, span2): + # each should be a root span + self.assertIsNone(span2.parent) + + # check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + def test_abort(self): + """Check that we can catch an abort properly""" + rpc_call = "/GRPCTestServer/SimpleMethod" + failure_message = "failure message" + + class AbortServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + await context.abort( + grpc.StatusCode.FAILED_PRECONDITION, failure_message + ) + + testcase = self + + async def request(channel): + request = Request(client_id=1, request_data=failure_message) + msg = request.SerializeToString() + + with testcase.assertRaises(Exception): + await channel.unary_unary(rpc_call)(msg) + + run_with_test_server(request, servicer=AbortServicer()) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + child_span = spans_list[0] + + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{grpc.StatusCode.FAILED_PRECONDITION}:{failure_message}", + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ + 0 + ], + }, + ) + + +def get_latch(num): + """Get a countdown latch function for use in n threads.""" + cv = asyncio.Condition() + count = 0 + + async def countdown_latch(): + """Block until n-1 other threads have called.""" + nonlocal count + async with cv: + count += 1 + cv.notify() + + async with cv: + while count < num: + await cv.wait() + + return countdown_latch From ff4403cbdb79255354d9889493613cce317835da Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Tue, 30 Aug 2022 16:41:34 +0100 Subject: [PATCH 02/15] Add grpc.aio client-side support This adds support for instrumenting grpc.aio channels with spans and telemetry. The instrumentation needed to work differently that the standard grpc channel support but is functionally the same. --- .../setup.cfg | 1 + .../instrumentation/grpc/__init__.py | 126 ++++++ .../instrumentation/grpc/_aio_client.py | 219 +++++++++++ .../tests/_aio_client.py | 56 +++ .../tests/test_aio_client_interceptor.py | 361 ++++++++++++++++++ .../tests/test_aio_server_interceptor.py | 65 ++-- tox.ini | 1 + 7 files changed, 798 insertions(+), 31 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/_aio_client.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 9f1d94ab52..2f5731df08 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -58,4 +58,5 @@ where = src opentelemetry_instrumentor = grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer + grpc_aio_client = opentelemetry.instrumentation.grpc:GrpcAioInstrumentorClient grpc_aio_server = opentelemetry.instrumentation.grpc:GrpcAioInstrumentorServer diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index edfd3f640e..189cc45f9e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -118,6 +118,59 @@ def serve(): server = grpc.server(futures.ThreadPoolExecutor(), interceptors = [server_interceptor()]) +Usage Aio Client +------------ +.. code-block:: python + + import logging + import asyncio + + import grpc + + from opentelemetry import trace + from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, + ) + + try: + from .gen import helloworld_pb2, helloworld_pb2_grpc + except ImportError: + from gen import helloworld_pb2, helloworld_pb2_grpc + + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + grpc_client_instrumentor = GrpcAioInstrumentorClient() + grpc_client_instrumentor.instrument() + + async def run(): + with grpc.aio.insecure_channel("localhost:50051") as channel: + + stub = helloworld_pb2_grpc.GreeterStub(channel) + response = await stub.SayHello(helloworld_pb2.HelloRequest(name="YOU")) + + print("Greeter client received: " + response.message) + + + if __name__ == "__main__": + logging.basicConfig() + asyncio.run(run()) + +You can also add the interceptor manually, rather than using +:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorClient`: + +.. code-block:: python + + from opentelemetry.instrumentation.grpc import aio_client_interceptors + + channel = grpc.aio.insecure_channel("localhost:12345", interceptors=aio_client_interceptors()) + + Usage Aio Server ------------ .. code-block:: python @@ -321,6 +374,58 @@ def wrapper_fn(self, original_func, instance, args, kwargs): ) +class GrpcAioInstrumentorClient(BaseInstrumentor): + """ + Globally instrument the grpc.aio client. + + Usage:: + + grpc_aio_client_instrumentor = GrpcAioInstrumentorClient() + grpc_aio_client_instrumentor.instrument() + + """ + + # pylint:disable=attribute-defined-outside-init, redefined-outer-name + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _add_interceptors(self, tracer_provider, kwargs): + if "interceptors" in kwargs and kwargs["interceptors"]: + kwargs["interceptors"] = ( + aio_client_interceptors(tracer_provider=tracer_provider) + + kwargs["interceptors"] + ) + else: + kwargs["interceptors"] = aio_client_interceptors( + tracer_provider=tracer_provider + ) + + return kwargs + + def _instrument(self, **kwargs): + self._original_insecure = grpc.aio.insecure_channel + self._original_secure = grpc.aio.secure_channel + tracer_provider = kwargs.get("tracer_provider") + + def insecure(*args, **kwargs): + kwargs = self._add_interceptors(tracer_provider, kwargs) + + return self._original_insecure(*args, **kwargs) + + def secure(*args, **kwargs): + kwargs = self._add_interceptors(tracer_provider, kwargs) + + return self._original_secure(*args, **kwargs) + + grpc.aio.insecure_channel = insecure + grpc.aio.secure_channel = secure + + def _uninstrument(self, **kwargs): + grpc.aio.insecure_channel = self._original_insecure + grpc.aio.secure_channel = self._original_secure + + def client_interceptor(tracer_provider=None): """Create a gRPC client channel interceptor. @@ -353,6 +458,27 @@ def server_interceptor(tracer_provider=None): return _server.OpenTelemetryServerInterceptor(tracer) +def aio_client_interceptors(tracer_provider=None): + """Create a gRPC client channel interceptor. + + Args: + tracer: The tracer to use to create client-side spans. + + Returns: + An invocation-side interceptor object. + """ + from . import _aio_client + + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + + return [ + _aio_client.UnaryUnaryAioClientInterceptor(tracer), + _aio_client.UnaryStreamAioClientInterceptor(tracer), + _aio_client.StreamUnaryAioClientInterceptor(tracer), + _aio_client.StreamStreamAioClientInterceptor(tracer), + ] + + def aio_server_interceptor(tracer_provider=None): """Create a gRPC aio server interceptor. diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py new file mode 100644 index 0000000000..df5ed15c8a --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -0,0 +1,219 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import OrderedDict +import functools + +import grpc +from grpc.aio import ClientCallDetails + +from opentelemetry import context, trace +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.propagate import inject +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.instrumentation.grpc.version import __version__ + +from opentelemetry.trace.status import Status, StatusCode + +from opentelemetry.instrumentation.grpc._client import ( + OpenTelemetryClientInterceptor, + _carrier_setter, +) + + +def _unary_done_callback(span, code, details): + def callback(call): + try: + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, + code.value[0], + ) + if code != grpc.StatusCode.OK: + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=details, + ) + ) + finally: + span.end() + + return callback + + +class _BaseAioClientInterceptor(OpenTelemetryClientInterceptor): + @staticmethod + def propagate_trace_in_details(client_call_details): + method = client_call_details.method.decode("utf-8") + metadata = client_call_details.metadata + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + inject(mutable_metadata, setter=_carrier_setter) + metadata = tuple(mutable_metadata.items()) + + return ClientCallDetails( + client_call_details.method, + client_call_details.timeout, + metadata, + client_call_details.credentials, + client_call_details.wait_for_ready, + ) + + @staticmethod + def add_error_details_to_span(span, exc): + if isinstance(exc, grpc.RpcError): + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, + exc.code().value[0], + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + span.record_exception(exc) + + async def _wrap_unary_response(self, continuation, span): + try: + call = await continuation() + + # code and details are both coroutines that need to be await-ed, + # the callbacks added with add_done_callback do not allow async + # code so we need to get the code and details here then pass them + # to the callback. + code = await call.code() + details = await call.details() + + call.add_done_callback(_unary_done_callback(span, code, details)) + + return call + except grpc.aio.AioRpcError as exc: + self.add_error_details_to_span(span, exc) + raise exc + + async def _wrap_stream_response(self, span, call): + try: + async for response in call: + yield response + except Exception as exc: + self.add_error_details_to_span(span, exc) + raise exc + finally: + span.end() + + +class UnaryUnaryAioClientInterceptor( + grpc.aio.UnaryUnaryClientInterceptor, + _BaseAioClientInterceptor, +): + async def intercept_unary_unary( + self, continuation, client_call_details, request + ): + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await continuation(client_call_details, request) + + method = client_call_details.method.decode("utf-8") + with self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + continuation_with_args = functools.partial( + continuation, new_details, request + ) + return await self._wrap_unary_response( + continuation_with_args, span + ) + + +class UnaryStreamAioClientInterceptor( + grpc.aio.UnaryStreamClientInterceptor, + _BaseAioClientInterceptor, +): + async def intercept_unary_stream( + self, continuation, client_call_details, request + ): + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await continuation(client_call_details, request) + + method = client_call_details.method.decode("utf-8") + with self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + resp = await continuation(new_details, request) + + return self._wrap_stream_response(span, resp) + + +class StreamUnaryAioClientInterceptor( + grpc.aio.StreamUnaryClientInterceptor, + _BaseAioClientInterceptor, +): + async def intercept_stream_unary( + self, continuation, client_call_details, request_iterator + ): + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await continuation(client_call_details, request_iterator) + + method = client_call_details.method.decode("utf-8") + with self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + continuation_with_args = functools.partial( + continuation, new_details, request_iterator + ) + return await self._wrap_unary_response( + continuation_with_args, span + ) + + +class StreamStreamAioClientInterceptor( + grpc.aio.StreamStreamClientInterceptor, + _BaseAioClientInterceptor, +): + async def intercept_stream_stream( + self, continuation, client_call_details, request_iterator + ): + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await continuation(client_call_details, request_iterator) + + method = client_call_details.method.decode("utf-8") + with self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + resp = await continuation(new_details, request_iterator) + + return self._wrap_stream_response(span, resp) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_aio_client.py new file mode 100644 index 0000000000..9658df1587 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_aio_client.py @@ -0,0 +1,56 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .protobuf.test_server_pb2 import Request + +CLIENT_ID = 1 + + +async def simple_method(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + return await stub.SimpleMethod(request) + + +async def client_streaming_method(stub, error=False): + # create a generator + def request_messages(): + for _ in range(5): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + yield request + + return await stub.ClientStreamingMethod(request_messages()) + + +def server_streaming_method(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + + return stub.ServerStreamingMethod(request) + + +def bidirectional_streaming_method(stub, error=False): + # create a generator + def request_messages(): + for _ in range(5): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + yield request + + return stub.BidirectionalStreamingMethod(request_messages()) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py new file mode 100644 index 0000000000..ea956057bb --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py @@ -0,0 +1,361 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import pytest +from unittest import IsolatedAsyncioTestCase + +import asyncio +import grpc +from grpc.aio import ClientCallDetails + +import opentelemetry.instrumentation.grpc +from opentelemetry import context, trace +from opentelemetry.instrumentation.grpc import ( + aio_client_interceptors, + GrpcAioInstrumentorClient, +) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.propagate import get_global_textmap, set_global_textmap +from opentelemetry.semconv.trace import SpanAttributes + +from opentelemetry.test.mock_textmap import MockTextMapPropagator +from opentelemetry.test.test_base import TestBase + +from tests.protobuf import ( # pylint: disable=no-name-in-module + test_server_pb2_grpc, + test_server_pb2, +) +from .protobuf.test_server_pb2 import Request + +from ._aio_client import ( + simple_method, + server_streaming_method, + client_streaming_method, + bidirectional_streaming_method, +) +from ._server import create_test_server + +from opentelemetry.instrumentation.grpc._aio_client import ( + UnaryUnaryAioClientInterceptor, +) + + +class RecordingInterceptor(grpc.aio.UnaryUnaryClientInterceptor): + recorded_details = None + + async def intercept_unary_unary( + self, continuation, client_call_details, request + ): + self.recorded_details = client_call_details + return await continuation(client_call_details, request) + + +@pytest.mark.asyncio +class TestAioClientInterceptor(TestBase, IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + self.server = create_test_server(25565) + self.server.start() + + interceptors = aio_client_interceptors() + self._channel = grpc.aio.insecure_channel( + "localhost:25565", interceptors=interceptors + ) + + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self._channel) + + def tearDown(self): + super().tearDown() + self.server.stop(1000) + + async def asyncTearDown(self): + await self._channel.close() + + async def test_instrument(self): + instrumentor = GrpcAioInstrumentorClient() + + try: + instrumentor.instrument() + + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + response = await simple_method(stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + finally: + instrumentor.uninstrument() + + async def test_uninstrument(self): + instrumentor = GrpcAioInstrumentorClient() + + instrumentor.instrument() + instrumentor.uninstrument() + + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + response = await simple_method(stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + async def test_unary_unary(self): + response = await simple_method(self._stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + async def test_unary_stream(self): + async for response in server_streaming_method(self._stub): + self.assertEqual(response.response_data, "data") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + async def test_stream_unary(self): + response = await client_streaming_method(self._stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ClientStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "ClientStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + async def test_stream_stream(self): + async for response in bidirectional_streaming_method(self._stub): + self.assertEqual(response.response_data, "data") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "BidirectionalStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + + async def test_error_simple(self): + with self.assertRaises(grpc.RpcError): + await simple_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + async def test_error_unary_stream(self): + with self.assertRaises(grpc.RpcError): + async for _ in server_streaming_method(self._stub, error=True): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + async def test_error_stream_unary(self): + with self.assertRaises(grpc.RpcError): + await client_streaming_method(self._stub, error=True) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + async def test_error_stream_stream(self): + with self.assertRaises(grpc.RpcError): + async for _ in bidirectional_streaming_method( + self._stub, error=True + ): + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIs( + span.status.status_code, + trace.StatusCode.ERROR, + ) + + async def test_client_interceptor_trace_context_propagation(self): + """ensure that client interceptor correctly inject trace context into all outgoing requests.""" + + previous_propagator = get_global_textmap() + + try: + set_global_textmap(MockTextMapPropagator()) + + interceptor = UnaryUnaryAioClientInterceptor(trace.NoOpTracer()) + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + + channel = grpc.aio.insecure_channel( + "localhost:25565", interceptors=interceptors + ) + + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + await simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 2 + assert metadata[0][0] == "mock-traceid" + assert metadata[0][1] == "0" + assert metadata[1][0] == "mock-spanid" + assert metadata[1][1] == "0" + finally: + set_global_textmap(previous_propagator) + + async def test_unary_unary_with_suppress_key(self): + token = context.attach( + context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + ) + try: + response = await simple_method(self._stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + finally: + context.detach(token) + + async def test_unary_stream_with_suppress_key(self): + token = context.attach( + context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + ) + try: + async for response in server_streaming_method(self._stub): + self.assertEqual(response.response_data, "data") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + finally: + context.detach(token) + + async def test_stream_unary_with_suppress_key(self): + token = context.attach( + context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + ) + try: + response = await client_streaming_method(self._stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + finally: + context.detach(token) + + async def test_stream_unary_with_suppress_key(self): + token = context.attach( + context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) + ) + try: + async for response in bidirectional_streaming_method(self._stub): + self.assertEqual(response.response_data, "data") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + finally: + context.detach(token) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py index 0f6ecd8747..48cc6105c8 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -14,6 +14,9 @@ # pylint:disable=unused-argument # pylint:disable=no-self-use +import pytest +from unittest import IsolatedAsyncioTestCase + import asyncio import grpc import grpc.aio @@ -57,7 +60,9 @@ async def ServerStreamingMethod(self, request, context): ) -def run_with_test_server(runnable, servicer=Servicer(), add_interceptor=True): +async def run_with_test_server( + runnable, servicer=Servicer(), add_interceptor=True +): if add_interceptor: interceptors = [aio_server_interceptor()] server = grpc.aio.server(interceptors=interceptors) @@ -69,18 +74,16 @@ def run_with_test_server(runnable, servicer=Servicer(), add_interceptor=True): port = server.add_insecure_port("[::]:0") channel = grpc.aio.insecure_channel(f"localhost:{port:d}") - async def do_request(): - await server.start() - resp = await runnable(channel) - await server.stop(1000) - return resp + await server.start() + resp = await runnable(channel) + await server.stop(1000) - loop = asyncio.get_event_loop_policy().get_event_loop() - return loop.run_until_complete(do_request()) + return resp -class TestOpenTelemetryAioServerInterceptor(TestBase): - def test_instrumentor(self): +@pytest.mark.asyncio +class TestOpenTelemetryAioServerInterceptor(TestBase, IsolatedAsyncioTestCase): + async def test_instrumentor(self): """Check that automatic instrumentation configures the interceptor""" rpc_call = "/GRPCTestServer/SimpleMethod" @@ -92,7 +95,7 @@ async def request(channel): msg = request.SerializeToString() return await channel.unary_unary(rpc_call)(msg) - run_with_test_server(request, add_interceptor=False) + await run_with_test_server(request, add_interceptor=False) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) @@ -123,8 +126,7 @@ async def request(channel): grpc_aio_server_instrumentor.uninstrument() - - def test_uninstrument(self): + async def test_uninstrument(self): """Check that uninstrument removes the interceptor""" rpc_call = "/GRPCTestServer/SimpleMethod" @@ -137,13 +139,12 @@ async def request(channel): msg = request.SerializeToString() return await channel.unary_unary(rpc_call)(msg) - run_with_test_server(request, add_interceptor=False) + await run_with_test_server(request, add_interceptor=False) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 0) - - def test_create_span(self): + async def test_create_span(self): """Check that the interceptor wraps calls with spans server-side.""" rpc_call = "/GRPCTestServer/SimpleMethod" @@ -152,7 +153,7 @@ async def request(channel): msg = request.SerializeToString() return await channel.unary_unary(rpc_call)(msg) - run_with_test_server(request) + await run_with_test_server(request) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) @@ -181,7 +182,7 @@ async def request(channel): }, ) - def test_create_two_spans(self): + async def test_create_two_spans(self): """Verify that the interceptor captures sub spans within the given trace""" rpc_call = "/GRPCTestServer/SimpleMethod" @@ -205,7 +206,7 @@ async def request(channel): msg = request.SerializeToString() return await channel.unary_unary(rpc_call)(msg) - run_with_test_server(request, servicer=TwoSpanServicer()) + await run_with_test_server(request, servicer=TwoSpanServicer()) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 2) @@ -241,7 +242,7 @@ async def request(channel): parent_span.context.trace_id, child_span.context.trace_id ) - def test_create_span_streaming(self): + async def test_create_span_streaming(self): """Check that the interceptor wraps calls with spans server-side, on a streaming call.""" rpc_call = "/GRPCTestServer/ServerStreamingMethod" @@ -252,7 +253,7 @@ async def request(channel): async for response in channel.unary_stream(rpc_call)(msg): print(response) - run_with_test_server(request) + await run_with_test_server(request) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) @@ -281,7 +282,7 @@ async def request(channel): }, ) - def test_create_two_spans_streaming(self): + async def test_create_two_spans_streaming(self): """Verify that the interceptor captures sub spans within the given trace""" rpc_call = "/GRPCTestServer/ServerStreamingMethod" @@ -306,7 +307,7 @@ async def request(channel): async for response in channel.unary_stream(rpc_call)(msg): print(response) - run_with_test_server(request, servicer=TwoSpanServicer()) + await run_with_test_server(request, servicer=TwoSpanServicer()) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 2) @@ -342,7 +343,7 @@ async def request(channel): parent_span.context.trace_id, child_span.context.trace_id ) - def test_span_lifetime(self): + async def test_span_lifetime(self): """Verify that the interceptor captures sub spans within the given trace""" rpc_call = "/GRPCTestServer/SimpleMethod" @@ -365,7 +366,7 @@ async def request(channel): lifetime_servicer = SpanLifetimeServicer() active_span_before_call = trace.get_current_span() - run_with_test_server(request, servicer=lifetime_servicer) + await run_with_test_server(request, servicer=lifetime_servicer) active_span_in_handler = lifetime_servicer.span active_span_after_call = trace.get_current_span() @@ -375,7 +376,7 @@ async def request(channel): self.assertIsInstance(active_span_in_handler, trace_sdk.Span) self.assertIsNone(active_span_in_handler.parent) - def test_sequential_server_spans(self): + async def test_sequential_server_spans(self): """Check that sequential RPCs get separate server spans.""" rpc_call = "/GRPCTestServer/SimpleMethod" @@ -388,7 +389,7 @@ async def sequential_requests(channel): await request(channel) await request(channel) - run_with_test_server(sequential_requests) + await run_with_test_server(sequential_requests) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 2) @@ -417,7 +418,7 @@ async def sequential_requests(channel): }, ) - def test_concurrent_server_spans(self): + async def test_concurrent_server_spans(self): """Check that concurrent RPC calls don't interfere with each other. This is the same check as test_sequential_server_spans except that the @@ -445,7 +446,9 @@ async def request(channel): async def concurrent_requests(channel): await asyncio.gather(request(channel), request(channel)) - run_with_test_server(concurrent_requests, servicer=LatchedServicer()) + await run_with_test_server( + concurrent_requests, servicer=LatchedServicer() + ) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 2) @@ -474,7 +477,7 @@ async def concurrent_requests(channel): }, ) - def test_abort(self): + async def test_abort(self): """Check that we can catch an abort properly""" rpc_call = "/GRPCTestServer/SimpleMethod" failure_message = "failure message" @@ -495,7 +498,7 @@ async def request(channel): with testcase.assertRaises(Exception): await channel.unary_unary(rpc_call)(msg) - run_with_test_server(request, servicer=AbortServicer()) + await run_with_test_server(request, servicer=AbortServicer()) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) diff --git a/tox.ini b/tox.ini index a4cea4a49a..699d732614 100644 --- a/tox.ini +++ b/tox.ini @@ -229,6 +229,7 @@ deps = falcon1: falcon ==1.4.1 falcon2: falcon >=2.0.0,<3.0.0 falcon3: falcon >=3.0.0,<4.0.0 + grpc: pytest-asyncio sqlalchemy11: sqlalchemy>=1.1,<1.2 sqlalchemy14: aiosqlite sqlalchemy14: sqlalchemy~=1.4 From a9552186e8f11220798de1672f424bf5328d7105 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Fri, 2 Sep 2022 16:41:32 +0100 Subject: [PATCH 03/15] Fix assorted CI issues This fixes assorted issues highlighted by CI, like unused imports, import ordering, "malformed" docstrings, etc --- .../instrumentation/grpc/__init__.py | 4 +- .../instrumentation/grpc/_aio_client.py | 16 +++----- .../instrumentation/grpc/_aio_server.py | 18 ++++----- .../tests/test_aio_client_interceptor.py | 31 ++++++--------- .../tests/test_aio_server_interceptor.py | 38 +++++++++---------- 5 files changed, 47 insertions(+), 60 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 189cc45f9e..3e52ddb753 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -119,7 +119,7 @@ def serve(): interceptors = [server_interceptor()]) Usage Aio Client ------------- +---------------- .. code-block:: python import logging @@ -172,7 +172,7 @@ async def run(): Usage Aio Server ------------- +---------------- .. code-block:: python import logging diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index df5ed15c8a..6d8e0b4af5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -12,24 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import OrderedDict import functools +from collections import OrderedDict import grpc from grpc.aio import ClientCallDetails -from opentelemetry import context, trace -from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.propagate import inject -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.instrumentation.grpc.version import __version__ - -from opentelemetry.trace.status import Status, StatusCode - +from opentelemetry import context from opentelemetry.instrumentation.grpc._client import ( OpenTelemetryClientInterceptor, _carrier_setter, ) +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.propagate import inject +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace.status import Status, StatusCode def _unary_done_callback(span, code, details): @@ -55,7 +52,6 @@ def callback(call): class _BaseAioClientInterceptor(OpenTelemetryClientInterceptor): @staticmethod def propagate_trace_in_details(client_call_details): - method = client_call_details.method.decode("utf-8") metadata = client_call_details.metadata if not metadata: mutable_metadata = OrderedDict() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index 0909d623db..10e4c8c62f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -16,8 +16,8 @@ from ._server import ( OpenTelemetryServerInterceptor, - _wrap_rpc_behavior, _OpenTelemetryServicerContext, + _wrap_rpc_behavior, ) @@ -40,21 +40,21 @@ async def intercept_service(self, continuation, handler_call_details): def telemetry_wrapper(behavior, request_streaming, response_streaming): # handle streaming responses specially if response_streaming: - return self._intercept_server_stream( - behavior, - handler_call_details, - ) - else: - return self._intercept_server_unary( + return self._intercept_aio_server_stream( behavior, handler_call_details, ) + return self._intercept_aio_server_unary( + behavior, + handler_call_details, + ) + next_handler = await continuation(handler_call_details) return _wrap_rpc_behavior(next_handler, telemetry_wrapper) - def _intercept_server_unary(self, behavior, handler_call_details): + def _intercept_aio_server_unary(self, behavior, handler_call_details): async def _unary_interceptor(request_or_iterator, context): with self._set_remote_context(context): with self._start_span( @@ -80,7 +80,7 @@ async def _unary_interceptor(request_or_iterator, context): return _unary_interceptor - def _intercept_server_stream(self, behavior, handler_call_details): + def _intercept_aio_server_stream(self, behavior, handler_call_details): async def _stream_interceptor(request_or_iterator, context): with self._set_remote_context(context): with self._start_span( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py index ea956057bb..ecac8d6b00 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py @@ -11,44 +11,34 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import pytest from unittest import IsolatedAsyncioTestCase -import asyncio import grpc -from grpc.aio import ClientCallDetails +import pytest import opentelemetry.instrumentation.grpc from opentelemetry import context, trace from opentelemetry.instrumentation.grpc import ( - aio_client_interceptors, GrpcAioInstrumentorClient, + aio_client_interceptors, +) +from opentelemetry.instrumentation.grpc._aio_client import ( + UnaryUnaryAioClientInterceptor, ) -from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.propagate import get_global_textmap, set_global_textmap from opentelemetry.semconv.trace import SpanAttributes - from opentelemetry.test.mock_textmap import MockTextMapPropagator from opentelemetry.test.test_base import TestBase -from tests.protobuf import ( # pylint: disable=no-name-in-module - test_server_pb2_grpc, - test_server_pb2, -) -from .protobuf.test_server_pb2 import Request - from ._aio_client import ( - simple_method, - server_streaming_method, - client_streaming_method, bidirectional_streaming_method, + client_streaming_method, + server_streaming_method, + simple_method, ) from ._server import create_test_server - -from opentelemetry.instrumentation.grpc._aio_client import ( - UnaryUnaryAioClientInterceptor, -) +from .protobuf import test_server_pb2_grpc # pylint: disable=no-name-in-module class RecordingInterceptor(grpc.aio.UnaryUnaryClientInterceptor): @@ -280,6 +270,7 @@ async def test_error_stream_stream(self): trace.StatusCode.ERROR, ) + # pylint:disable=no-self-use async def test_client_interceptor_trace_context_propagation(self): """ensure that client interceptor correctly inject trace context into all outgoing requests.""" @@ -347,7 +338,7 @@ async def test_stream_unary_with_suppress_key(self): finally: context.detach(token) - async def test_stream_unary_with_suppress_key(self): + async def test_stream_stream_with_suppress_key(self): token = context.attach( context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True) ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py index 48cc6105c8..4b6d5dd0a5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -11,23 +11,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -# pylint:disable=unused-argument -# pylint:disable=no-self-use -import pytest +import asyncio from unittest import IsolatedAsyncioTestCase -import asyncio import grpc import grpc.aio -from concurrent.futures.thread import ThreadPoolExecutor +import pytest -from time import sleep -from opentelemetry.test.test_base import TestBase -from opentelemetry import trace import opentelemetry.instrumentation.grpc -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry import trace +from opentelemetry.instrumentation.grpc import ( + GrpcAioInstrumentorServer, + aio_server_interceptor, +) from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase from opentelemetry.trace import StatusCode from .protobuf.test_server_pb2 import Request, Response @@ -35,10 +34,9 @@ GRPCTestServerServicer, add_GRPCTestServerServicer_to_server, ) -from opentelemetry.instrumentation.grpc import ( - GrpcAioInstrumentorServer, - aio_server_interceptor, -) + +# pylint:disable=unused-argument +# pylint:disable=no-self-use class Servicer(GRPCTestServerServicer): @@ -351,6 +349,7 @@ async def test_span_lifetime(self): class SpanLifetimeServicer(GRPCTestServerServicer): # pylint:disable=C0103 async def SimpleMethod(self, request, context): + # pylint:disable=attribute-defined-outside-init self.span = trace.get_current_span() return Response( @@ -394,7 +393,9 @@ async def sequential_requests(channel): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 2) - span1, span2 = spans_list + span1 = spans_list[0] + span2 = spans_list[1] + # Spans should belong to separate traces self.assertNotEqual(span1.context.span_id, span2.context.span_id) self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) @@ -453,7 +454,9 @@ async def concurrent_requests(channel): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 2) - span1, span2 = spans_list + span1 = spans_list[0] + span2 = spans_list[1] + # Spans should belong to separate traces self.assertNotEqual(span1.context.span_id, span2.context.span_id) self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) @@ -501,9 +504,6 @@ async def request(channel): await run_with_test_server(request, servicer=AbortServicer()) spans_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans_list), 1) - child_span = spans_list[0] - self.assertEqual(len(spans_list), 1) span = spans_list[0] From 74c857c731a639997339010a378c08a8aeb2df37 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Fri, 2 Sep 2022 16:44:42 +0100 Subject: [PATCH 04/15] Add changelog entry --- CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a2b306157..35213c733d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - restoring metrics in django framework ([#1208](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1208)) - `opentelemetry-instrumentation-aiohttp-client` Fix producing additional spans with each newly created ClientSession -- ([#1246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1246)) + ([#1246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1246)) + +### Added +- `opentelemetry-instrumentation-grpc` Add grpc.aio support + ([#1245](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1245)) ## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0-0.33b0) - 2022-08-08 From 780200115115c99192e728901313be07f8f23d36 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Wed, 21 Sep 2022 10:44:55 +0100 Subject: [PATCH 05/15] Fix conflict properly I missed a bit from previous merge conflict resolution.. --- .../src/opentelemetry/instrumentation/grpc/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 21bd9681a1..d55e99fff1 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -553,7 +553,6 @@ def server_interceptor(tracer_provider=None, filter_=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) -<<<<<<< HEAD return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_) From 9cc91b4f131d3f71746dab79bb64b00c459ec757 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Wed, 21 Sep 2022 21:38:22 +0100 Subject: [PATCH 06/15] Skip grpc.aio tests in Python 3.7 unittest.IsolatedAsyncioTestCase was introduced in Python 3.8. It's use simplifies the grpc.aio tests. Without it, the amount of test code increases significantly, with most of the additional code handling the asyncio set up. --- .../tests/test_aio_client_interceptor.py | 16 +++++++++++++++- .../tests/test_aio_server_interceptor.py | 17 ++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py index ecac8d6b00..6ca5ce92d5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor.py @@ -11,7 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from unittest import IsolatedAsyncioTestCase +try: + from unittest import IsolatedAsyncioTestCase +except ImportError: + # unittest.IsolatedAsyncioTestCase was introduced in Python 3.8. It's use + # simplifies the following tests. Without it, the amount of test code + # increases significantly, with most of the additional code handling + # the asyncio set up. + from unittest import TestCase + + class IsolatedAsyncioTestCase(TestCase): + def run(self, result=None): + self.skipTest( + "This test requires Python 3.8 for unittest.IsolatedAsyncioTestCase" + ) + import grpc import pytest diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py index 4b6d5dd0a5..013c23944c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -12,7 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from unittest import IsolatedAsyncioTestCase + +try: + from unittest import IsolatedAsyncioTestCase +except ImportError: + # unittest.IsolatedAsyncioTestCase was introduced in Python 3.8. It's use + # simplifies the following tests. Without it, the amount of test code + # increases significantly, with most of the additional code handling + # the asyncio set up. + from unittest import TestCase + + class IsolatedAsyncioTestCase(TestCase): + def run(self, result=None): + self.skipTest( + "This test requires Python 3.8 for unittest.IsolatedAsyncioTestCase" + ) + import grpc import grpc.aio From 6c1d0a7912c9f5c028ccd714bf149c49d478d7a7 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Fri, 7 Oct 2022 20:37:31 +0100 Subject: [PATCH 07/15] Update filters to handle grpc.aio.ClientCallDetails This is the precursor for using the filter mechanisms with the aio interceptors. There's currently a bug in grpc python that means the the ClientCallDetails.method field is populated with bytes instead of a string. This code handles both cases so that it's forward-compatible with a future fixed grpc version. --- .../instrumentation/grpc/filters/__init__.py | 13 +++++++- .../tests/test_filters.py | 33 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py index 905bb8d696..728d850148 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py @@ -18,7 +18,10 @@ import grpc TCallDetails = TypeVar( - "TCallDetails", grpc.HandlerCallDetails, grpc.ClientCallDetails + "TCallDetails", + grpc.HandlerCallDetails, + grpc.ClientCallDetails, + grpc.aio.ClientCallDetails, ) Condition = Callable[[TCallDetails], bool] @@ -27,6 +30,14 @@ def _full_method(metadata): name = "" if isinstance(metadata, grpc.HandlerCallDetails): name = metadata.method + elif isinstance(metadata, grpc.aio.ClientCallDetails): + name = metadata.method + # name _should_ be a string here but due to a bug in grpc, it is + # populated with a bytes object. Handle both cases such that we + # are forward-compatible with a fixed version of grpc + # More info: https://github.com/grpc/grpc/issues/31092 + if isinstance(name, bytes): + name = name.decode() # NOTE: replace here if there's better way to match cases to handle # grpcext._interceptor._UnaryClientInfo/_StreamClientInfo elif hasattr(metadata, "full_method"): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_filters.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_filters.py index f7d69074ac..81cc689edd 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_filters.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_filters.py @@ -59,6 +59,39 @@ class _StreamClientInfo( invocation_metadata=[("tracer", "foo"), ("caller", "bar")], ), ), + ( + True, + "SimpleMethod", + grpc.aio.ClientCallDetails( + method="SimpleMethod", + timeout=3000, + metadata=None, + credentials=None, + wait_for_ready=None, + ), + ), + ( + True, + "SimpleMethod", + grpc.aio.ClientCallDetails( + method=b"SimpleMethod", + timeout=3000, + metadata=None, + credentials=None, + wait_for_ready=None, + ), + ), + ( + False, + "SimpleMethod", + grpc.aio.ClientCallDetails( + method="NotSimpleMethod", + timeout=3000, + metadata=None, + credentials=None, + wait_for_ready=None, + ), + ), ( False, "SimpleMethod", From cf01d06cb280f69abd5f07ed9ae013585c153ec4 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Fri, 7 Oct 2022 20:41:17 +0100 Subject: [PATCH 08/15] Update grpc.aio client interceptor to handle grpc bug ClientCallDetails.method _should_ be a string here but due to a bug in grpc, it is populated with a bytes object. Handle both cases such that we are forward-compatible with a fixed version of grpc More info: https://github.com/grpc/grpc/issues/31092 --- .../instrumentation/grpc/_aio_client.py | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index 6d8e0b4af5..de7272e757 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -84,6 +84,21 @@ def add_error_details_to_span(span, exc): ) span.record_exception(exc) + def _start_interceptor_span(self, method): + # method _should_ be a string here but due to a bug in grpc, it is + # populated with a bytes object. Handle both cases such that we + # are forward-compatible with a fixed version of grpc + # More info: https://github.com/grpc/grpc/issues/31092 + if isinstance(method, bytes): + method = method.decode() + + return self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False + ) + async def _wrap_unary_response(self, continuation, span): try: call = await continuation() @@ -123,12 +138,8 @@ async def intercept_unary_unary( if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await continuation(client_call_details, request) - method = client_call_details.method.decode("utf-8") - with self._start_span( - method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, + with self._start_interceptor_span( + client_call_details.method, ) as span: new_details = self.propagate_trace_in_details(client_call_details) @@ -150,12 +161,8 @@ async def intercept_unary_stream( if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await continuation(client_call_details, request) - method = client_call_details.method.decode("utf-8") - with self._start_span( - method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, + with self._start_interceptor_span( + client_call_details.method, ) as span: new_details = self.propagate_trace_in_details(client_call_details) @@ -174,12 +181,8 @@ async def intercept_stream_unary( if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await continuation(client_call_details, request_iterator) - method = client_call_details.method.decode("utf-8") - with self._start_span( - method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, + with self._start_interceptor_span( + client_call_details.method, ) as span: new_details = self.propagate_trace_in_details(client_call_details) @@ -201,12 +204,8 @@ async def intercept_stream_stream( if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await continuation(client_call_details, request_iterator) - method = client_call_details.method.decode("utf-8") - with self._start_span( - method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, + with self._start_interceptor_span( + client_call_details.method, ) as span: new_details = self.propagate_trace_in_details(client_call_details) From 600ca83a9f8d6e886bfb8ce9ee24fa4a4fb7103b Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Mon, 10 Oct 2022 20:10:13 +0100 Subject: [PATCH 09/15] Add filter support to grpc.aio client interceptor --- .../instrumentation/grpc/__init__.py | 35 ++-- .../instrumentation/grpc/_aio_client.py | 17 +- .../test_aio_client_interceptor_filter.py | 174 ++++++++++++++++++ 3 files changed, 209 insertions(+), 17 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index d55e99fff1..94b33da9e7 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -477,18 +477,31 @@ class GrpcAioInstrumentorClient(BaseInstrumentor): # pylint:disable=attribute-defined-outside-init, redefined-outer-name + def __init__(self, filter_=None): + excluded_service_filter = _excluded_service_filter() + if excluded_service_filter is not None: + if filter_ is None: + filter_ = excluded_service_filter + else: + filter_ = any_of(filter_, excluded_service_filter) + self._filter = filter_ + def instrumentation_dependencies(self) -> Collection[str]: return _instruments def _add_interceptors(self, tracer_provider, kwargs): if "interceptors" in kwargs and kwargs["interceptors"]: kwargs["interceptors"] = ( - aio_client_interceptors(tracer_provider=tracer_provider) + aio_client_interceptors( + tracer_provider=tracer_provider, + filter_=self._filter + ) + kwargs["interceptors"] ) else: kwargs["interceptors"] = aio_client_interceptors( - tracer_provider=tracer_provider + tracer_provider=tracer_provider, + filter_=self._filter ) return kwargs @@ -556,7 +569,7 @@ def server_interceptor(tracer_provider=None, filter_=None): return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_) -def aio_client_interceptors(tracer_provider=None): +def aio_client_interceptors(tracer_provider=None, filter_=None): """Create a gRPC client channel interceptor. Args: @@ -570,10 +583,10 @@ def aio_client_interceptors(tracer_provider=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) return [ - _aio_client.UnaryUnaryAioClientInterceptor(tracer), - _aio_client.UnaryStreamAioClientInterceptor(tracer), - _aio_client.StreamUnaryAioClientInterceptor(tracer), - _aio_client.StreamStreamAioClientInterceptor(tracer), + _aio_client.UnaryUnaryAioClientInterceptor(tracer, filter_=filter_), + _aio_client.UnaryStreamAioClientInterceptor(tracer, filter_=filter_), + _aio_client.StreamUnaryAioClientInterceptor(tracer, filter_=filter_), + _aio_client.StreamStreamAioClientInterceptor(tracer, filter_=filter_), ] @@ -594,9 +607,7 @@ def aio_server_interceptor(tracer_provider=None): def _excluded_service_filter() -> Union[Callable[[object], bool], None]: - services = _parse_services( - os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "") - ) + services = _parse_services(os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "")) if len(services) == 0: return None filters = (service_name(srv) for srv in services) @@ -605,9 +616,7 @@ def _excluded_service_filter() -> Union[Callable[[object], bool], None]: def _parse_services(excluded_services: str) -> List[str]: if excluded_services != "": - excluded_service_list = [ - s.strip() for s in excluded_services.split(",") - ] + excluded_service_list = [s.strip() for s in excluded_services.split(",")] else: excluded_service_list = [] return excluded_service_list diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index de7272e757..47d16b4ed0 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -128,6 +128,15 @@ async def _wrap_stream_response(self, span, call): span.end() + def tracing_skipped(self, client_call_details): + return context.get_value( + _SUPPRESS_INSTRUMENTATION_KEY + ) or not self.rpc_matches_filters(client_call_details) + + def rpc_matches_filters(self, client_call_details): + return self._filter is None or self._filter(client_call_details) + + class UnaryUnaryAioClientInterceptor( grpc.aio.UnaryUnaryClientInterceptor, _BaseAioClientInterceptor, @@ -135,7 +144,7 @@ class UnaryUnaryAioClientInterceptor( async def intercept_unary_unary( self, continuation, client_call_details, request ): - if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request) with self._start_interceptor_span( @@ -158,7 +167,7 @@ class UnaryStreamAioClientInterceptor( async def intercept_unary_stream( self, continuation, client_call_details, request ): - if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request) with self._start_interceptor_span( @@ -178,7 +187,7 @@ class StreamUnaryAioClientInterceptor( async def intercept_stream_unary( self, continuation, client_call_details, request_iterator ): - if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request_iterator) with self._start_interceptor_span( @@ -201,7 +210,7 @@ class StreamStreamAioClientInterceptor( async def intercept_stream_stream( self, continuation, client_call_details, request_iterator ): - if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request_iterator) with self._start_interceptor_span( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py new file mode 100644 index 0000000000..3ac6cce28c --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py @@ -0,0 +1,174 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +try: + from unittest import IsolatedAsyncioTestCase +except ImportError: + # unittest.IsolatedAsyncioTestCase was introduced in Python 3.8. It's use + # simplifies the following tests. Without it, the amount of test code + # increases significantly, with most of the additional code handling + # the asyncio set up. + from unittest import TestCase + + class IsolatedAsyncioTestCase(TestCase): + def run(self, result=None): + self.skipTest( + "This test requires Python 3.8 for unittest.IsolatedAsyncioTestCase" + ) + +from unittest import mock + +import os +import grpc +import pytest + +import opentelemetry.instrumentation.grpc +from opentelemetry import context, trace +from opentelemetry.instrumentation.grpc import ( + GrpcAioInstrumentorClient, + aio_client_interceptors, + filters, +) +from opentelemetry.instrumentation.grpc._aio_client import ( + UnaryUnaryAioClientInterceptor, +) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase + +from ._aio_client import ( + bidirectional_streaming_method, + client_streaming_method, + server_streaming_method, + simple_method, +) +from ._server import create_test_server +from .protobuf import test_server_pb2_grpc # pylint: disable=no-name-in-module + + +@pytest.mark.asyncio +class TestAioClientInterceptorFiltered(TestBase, IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + self.server = create_test_server(25565) + self.server.start() + + interceptors = aio_client_interceptors( + filter_=filters.method_name("NotSimpleMethod") + ) + self._channel = grpc.aio.insecure_channel( + "localhost:25565", interceptors=interceptors + ) + + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self._channel) + + def tearDown(self): + super().tearDown() + self.server.stop(1000) + + async def asyncTearDown(self): + await self._channel.close() + + async def test_instrument_filtered(self): + instrumentor = GrpcAioInstrumentorClient( + filter_=filters.method_name("NotSimpleMethod") + ) + + try: + instrumentor.instrument() + + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + response = await simple_method(stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + finally: + instrumentor.uninstrument() + + async def test_instrument_filtered_env(self): + with mock.patch.dict( + os.environ, + { + "OTEL_PYTHON_GRPC_EXCLUDED_SERVICES": "GRPCMockServer,GRPCTestServer" + }, + ): + instrumentor = GrpcAioInstrumentorClient() + + try: + instrumentor.instrument() + + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + response = await simple_method(stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + finally: + instrumentor.uninstrument() + + async def test_instrument_filtered_env_and_option(self): + with mock.patch.dict( + os.environ, + { + "OTEL_PYTHON_GRPC_EXCLUDED_SERVICES": "GRPCMockServer" + }, + ): + instrumentor = GrpcAioInstrumentorClient( + filter_=filters.service_prefix("GRPCTestServer") + ) + + try: + instrumentor.instrument() + + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + response = await simple_method(stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + finally: + instrumentor.uninstrument() + + async def test_unary_unary_filtered(self): + response = await simple_method(self._stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + async def test_unary_stream_filtered(self): + async for response in server_streaming_method(self._stub): + self.assertEqual(response.response_data, "data") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + async def test_stream_unary_filtered(self): + response = await client_streaming_method(self._stub) + assert response.response_data == "data" + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) + + async def test_stream_stream_filtered(self): + async for response in bidirectional_streaming_method(self._stub): + self.assertEqual(response.response_data, "data") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 0) From 66227743c198582961d84c5a3bbf49f4347e234d Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Mon, 10 Oct 2022 20:49:14 +0100 Subject: [PATCH 10/15] Add basic server filtering test --- .../instrumentation/grpc/__init__.py | 17 +++- .../instrumentation/grpc/_aio_server.py | 3 + .../tests/test_aio_server_interceptor.py | 62 +++++++------- .../test_aio_server_interceptor_filter.py | 82 +++++++++++++++++++ 4 files changed, 130 insertions(+), 34 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 94b33da9e7..b2bcabd0b6 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -368,6 +368,15 @@ class GrpcAioInstrumentorServer(BaseInstrumentor): # pylint:disable=attribute-defined-outside-init, redefined-outer-name + def __init__(self, filter_=None): + excluded_service_filter = _excluded_service_filter() + if excluded_service_filter is not None: + if filter_ is None: + filter_ = excluded_service_filter + else: + filter_ = any_of(filter_, excluded_service_filter) + self._filter = filter_ + def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -379,11 +388,11 @@ def server(*args, **kwargs): if "interceptors" in kwargs: # add our interceptor as the first kwargs["interceptors"].insert( - 0, aio_server_interceptor(tracer_provider=tracer_provider) + 0, aio_server_interceptor(tracer_provider=tracer_provider, filter_=self._filter) ) else: kwargs["interceptors"] = [ - aio_server_interceptor(tracer_provider=tracer_provider) + aio_server_interceptor(tracer_provider=tracer_provider, filter_=self._filter) ] return self._original_func(*args, **kwargs) @@ -590,7 +599,7 @@ def aio_client_interceptors(tracer_provider=None, filter_=None): ] -def aio_server_interceptor(tracer_provider=None): +def aio_server_interceptor(tracer_provider=None, filter_=None): """Create a gRPC aio server interceptor. Args: @@ -603,7 +612,7 @@ def aio_server_interceptor(tracer_provider=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _aio_server.OpenTelemetryAioServerInterceptor(tracer) + return _aio_server.OpenTelemetryAioServerInterceptor(tracer, filter_=filter_) def _excluded_service_filter() -> Union[Callable[[object], bool], None]: diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index 10e4c8c62f..4835fb1226 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -37,6 +37,9 @@ class OpenTelemetryAioServerInterceptor( """ async def intercept_service(self, continuation, handler_call_details): + if self._filter is not None and not self._filter(handler_call_details): + return continuation(handler_call_details) + def telemetry_wrapper(behavior, request_streaming, response_streaming): # handle streaming responses specially if response_streaming: diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py index 013c23944c..a4075fe727 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -101,43 +101,45 @@ async def test_instrumentor(self): rpc_call = "/GRPCTestServer/SimpleMethod" grpc_aio_server_instrumentor = GrpcAioInstrumentorServer() - grpc_aio_server_instrumentor.instrument() + try: + grpc_aio_server_instrumentor.instrument() - async def request(channel): - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() - return await channel.unary_unary(rpc_call)(msg) + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) - await run_with_test_server(request, add_interceptor=False) + await run_with_test_server(request, add_interceptor=False) - spans_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans_list), 1) - span = spans_list[0] + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] - self.assertEqual(span.name, rpc_call) - self.assertIs(span.kind, trace.SpanKind.SERVER) + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) - # Check version and name in span's instrumentation info - self.assertEqualSpanInstrumentationInfo( - span, opentelemetry.instrumentation.grpc - ) + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) - # Check attributes - self.assertSpanHasAttributes( - span, - { - SpanAttributes.NET_PEER_IP: "[::1]", - SpanAttributes.NET_PEER_NAME: "localhost", - SpanAttributes.RPC_METHOD: "SimpleMethod", - SpanAttributes.RPC_SERVICE: "GRPCTestServer", - SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], - }, - ) + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) - grpc_aio_server_instrumentor.uninstrument() + finally: + grpc_aio_server_instrumentor.uninstrument() async def test_uninstrument(self): """Check that uninstrument removes the interceptor""" diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py new file mode 100644 index 0000000000..3dca431ef2 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py @@ -0,0 +1,82 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio + +try: + from unittest import IsolatedAsyncioTestCase +except ImportError: + # unittest.IsolatedAsyncioTestCase was introduced in Python 3.8. It's use + # simplifies the following tests. Without it, the amount of test code + # increases significantly, with most of the additional code handling + # the asyncio set up. + from unittest import TestCase + + class IsolatedAsyncioTestCase(TestCase): + def run(self, result=None): + self.skipTest( + "This test requires Python 3.8 for unittest.IsolatedAsyncioTestCase" + ) + + +import grpc +import grpc.aio +import pytest + +import opentelemetry.instrumentation.grpc +from opentelemetry import trace +from opentelemetry.instrumentation.grpc import ( + GrpcAioInstrumentorServer, + aio_server_interceptor, + filters, +) +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import StatusCode + +from .protobuf.test_server_pb2 import Request, Response +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) + +from .test_aio_server_interceptor import Servicer, run_with_test_server + +# pylint:disable=unused-argument +# pylint:disable=no-self-use + +@pytest.mark.asyncio +class TestOpenTelemetryAioServerInterceptor(TestBase, IsolatedAsyncioTestCase): + async def test_instrumentor(self): + """Check that automatic instrumentation configures the interceptor""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + grpc_aio_server_instrumentor = GrpcAioInstrumentorServer( + filter_=filters.method_name("NotSimpleMethod") + ) + try: + grpc_aio_server_instrumentor.instrument() + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + await run_with_test_server(request, add_interceptor=False) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + finally: + grpc_aio_server_instrumentor.uninstrument() From db4f86e945ba068021c578d4718480f8ff14f453 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Mon, 10 Oct 2022 20:50:25 +0100 Subject: [PATCH 11/15] Format code with black --- .../instrumentation/grpc/__init__.py | 27 ++++++++++++------- .../instrumentation/grpc/_aio_client.py | 3 +-- .../test_aio_client_interceptor_filter.py | 5 ++-- .../test_aio_server_interceptor_filter.py | 1 + 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index b2bcabd0b6..25010e147b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -388,11 +388,16 @@ def server(*args, **kwargs): if "interceptors" in kwargs: # add our interceptor as the first kwargs["interceptors"].insert( - 0, aio_server_interceptor(tracer_provider=tracer_provider, filter_=self._filter) + 0, + aio_server_interceptor( + tracer_provider=tracer_provider, filter_=self._filter + ), ) else: kwargs["interceptors"] = [ - aio_server_interceptor(tracer_provider=tracer_provider, filter_=self._filter) + aio_server_interceptor( + tracer_provider=tracer_provider, filter_=self._filter + ) ] return self._original_func(*args, **kwargs) @@ -502,15 +507,13 @@ def _add_interceptors(self, tracer_provider, kwargs): if "interceptors" in kwargs and kwargs["interceptors"]: kwargs["interceptors"] = ( aio_client_interceptors( - tracer_provider=tracer_provider, - filter_=self._filter + tracer_provider=tracer_provider, filter_=self._filter ) + kwargs["interceptors"] ) else: kwargs["interceptors"] = aio_client_interceptors( - tracer_provider=tracer_provider, - filter_=self._filter + tracer_provider=tracer_provider, filter_=self._filter ) return kwargs @@ -612,11 +615,15 @@ def aio_server_interceptor(tracer_provider=None, filter_=None): tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _aio_server.OpenTelemetryAioServerInterceptor(tracer, filter_=filter_) + return _aio_server.OpenTelemetryAioServerInterceptor( + tracer, filter_=filter_ + ) def _excluded_service_filter() -> Union[Callable[[object], bool], None]: - services = _parse_services(os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "")) + services = _parse_services( + os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "") + ) if len(services) == 0: return None filters = (service_name(srv) for srv in services) @@ -625,7 +632,9 @@ def _excluded_service_filter() -> Union[Callable[[object], bool], None]: def _parse_services(excluded_services: str) -> List[str]: if excluded_services != "": - excluded_service_list = [s.strip() for s in excluded_services.split(",")] + excluded_service_list = [ + s.strip() for s in excluded_services.split(",") + ] else: excluded_service_list = [] return excluded_service_list diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index 47d16b4ed0..c7630bfe9f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -96,7 +96,7 @@ def _start_interceptor_span(self, method): method, end_on_exit=False, record_exception=False, - set_status_on_exception=False + set_status_on_exception=False, ) async def _wrap_unary_response(self, continuation, span): @@ -127,7 +127,6 @@ async def _wrap_stream_response(self, span, call): finally: span.end() - def tracing_skipped(self, client_call_details): return context.get_value( _SUPPRESS_INSTRUMENTATION_KEY diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py index 3ac6cce28c..ac1928aeea 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py @@ -26,6 +26,7 @@ def run(self, result=None): "This test requires Python 3.8 for unittest.IsolatedAsyncioTestCase" ) + from unittest import mock import os @@ -123,9 +124,7 @@ async def test_instrument_filtered_env(self): async def test_instrument_filtered_env_and_option(self): with mock.patch.dict( os.environ, - { - "OTEL_PYTHON_GRPC_EXCLUDED_SERVICES": "GRPCMockServer" - }, + {"OTEL_PYTHON_GRPC_EXCLUDED_SERVICES": "GRPCMockServer"}, ): instrumentor = GrpcAioInstrumentorClient( filter_=filters.service_prefix("GRPCTestServer") diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py index 3dca431ef2..5966c1e107 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py @@ -56,6 +56,7 @@ def run(self, result=None): # pylint:disable=unused-argument # pylint:disable=no-self-use + @pytest.mark.asyncio class TestOpenTelemetryAioServerInterceptor(TestBase, IsolatedAsyncioTestCase): async def test_instrumentor(self): From 5de1a4b3bac4fa3df8318c22e9a8bc147235b8d8 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Mon, 10 Oct 2022 22:44:49 +0100 Subject: [PATCH 12/15] Re-order imports with isort This _should_ allow CI to pass now.. --- .../tests/test_aio_client_interceptor_filter.py | 2 +- .../tests/test_aio_server_interceptor_filter.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py index ac1928aeea..00f865d7dd 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py @@ -27,9 +27,9 @@ def run(self, result=None): ) +import os from unittest import mock -import os import grpc import pytest diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py index 5966c1e107..3ab883e10b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py @@ -50,7 +50,6 @@ def run(self, result=None): GRPCTestServerServicer, add_GRPCTestServerServicer_to_server, ) - from .test_aio_server_interceptor import Servicer, run_with_test_server # pylint:disable=unused-argument From 48248229cc13d5ee34685fabc30f6bb05f2b7dd4 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Wed, 12 Oct 2022 14:46:55 +0000 Subject: [PATCH 13/15] Add additional tests for aio server filtering --- .../instrumentation/grpc/_aio_server.py | 2 +- .../instrumentation/grpc/filters/__init__.py | 7 ++ .../test_aio_server_interceptor_filter.py | 68 +++++++++++++++++-- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index 4835fb1226..d64dcf000b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -38,7 +38,7 @@ class OpenTelemetryAioServerInterceptor( async def intercept_service(self, continuation, handler_call_details): if self._filter is not None and not self._filter(handler_call_details): - return continuation(handler_call_details) + return await continuation(handler_call_details) def telemetry_wrapper(behavior, request_streaming, response_streaming): # handle streaming responses specially diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py index 728d850148..8100a2d17f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/filters/__init__.py @@ -42,6 +42,13 @@ def _full_method(metadata): # grpcext._interceptor._UnaryClientInfo/_StreamClientInfo elif hasattr(metadata, "full_method"): name = metadata.full_method + # NOTE: this is to handle the grpc.aio Server case. The type interface + # indicates that metadata should be a grpc.HandlerCallDetails and be + # matched prior to this but it is in fact an internal C-extension level + # object. + elif hasattr(metadata, "method"): + name = metadata.method + return name diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py index 3ab883e10b..e26b3f6cfe 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import asyncio - try: from unittest import IsolatedAsyncioTestCase except ImportError: @@ -28,12 +26,10 @@ def run(self, result=None): "This test requires Python 3.8 for unittest.IsolatedAsyncioTestCase" ) - import grpc import grpc.aio import pytest -import opentelemetry.instrumentation.grpc from opentelemetry import trace from opentelemetry.instrumentation.grpc import ( GrpcAioInstrumentorServer, @@ -50,12 +46,33 @@ def run(self, result=None): GRPCTestServerServicer, add_GRPCTestServerServicer_to_server, ) -from .test_aio_server_interceptor import Servicer, run_with_test_server +from .test_aio_server_interceptor import Servicer # pylint:disable=unused-argument # pylint:disable=no-self-use +async def run_with_test_server( + runnable, filter_=None, servicer=Servicer(), add_interceptor=True +): + if add_interceptor: + interceptors = [aio_server_interceptor(filter_=filter_)] + server = grpc.aio.server(interceptors=interceptors) + else: + server = grpc.aio.server() + + add_GRPCTestServerServicer_to_server(servicer, server) + + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + resp = await runnable(channel) + await server.stop(1000) + + return resp + + @pytest.mark.asyncio class TestOpenTelemetryAioServerInterceptor(TestBase, IsolatedAsyncioTestCase): async def test_instrumentor(self): @@ -80,3 +97,44 @@ async def request(channel): finally: grpc_aio_server_instrumentor.uninstrument() + + async def test_create_span(self): + """ + Check that the interceptor wraps calls with spans server-side when filter + passed and RPC matches the filter. + """ + rpc_call = "/GRPCTestServer/SimpleMethod" + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + await run_with_test_server( + request, + filter_=filters.method_name("SimpleMethod"), + ) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + async def test_create_span_filtered(self): + """Check that the interceptor wraps calls with spans server-side.""" + rpc_call = "/GRPCTestServer/SimpleMethod" + + async def request(channel): + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + return await channel.unary_unary(rpc_call)(msg) + + await run_with_test_server( + request, + filter_=filters.method_name("NotSimpleMethod"), + ) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) From 94ba2f7f631f727c4bd73db18228b202e55369c5 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Thu, 13 Oct 2022 12:44:40 +0000 Subject: [PATCH 14/15] Reformat code to pass linting --- .../tests/test_aio_client_interceptor_filter.py | 6 ------ .../tests/test_aio_server_interceptor_filter.py | 11 +++-------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py index 00f865d7dd..b8c408c6cf 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_filter.py @@ -33,17 +33,11 @@ def run(self, result=None): import grpc import pytest -import opentelemetry.instrumentation.grpc -from opentelemetry import context, trace from opentelemetry.instrumentation.grpc import ( GrpcAioInstrumentorClient, aio_client_interceptors, filters, ) -from opentelemetry.instrumentation.grpc._aio_client import ( - UnaryUnaryAioClientInterceptor, -) -from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase from ._aio_client import ( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py index e26b3f6cfe..837d9c7618 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_filter.py @@ -26,6 +26,7 @@ def run(self, result=None): "This test requires Python 3.8 for unittest.IsolatedAsyncioTestCase" ) + import grpc import grpc.aio import pytest @@ -36,16 +37,10 @@ def run(self, result=None): aio_server_interceptor, filters, ) -from opentelemetry.sdk import trace as trace_sdk -from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase -from opentelemetry.trace import StatusCode -from .protobuf.test_server_pb2 import Request, Response -from .protobuf.test_server_pb2_grpc import ( - GRPCTestServerServicer, - add_GRPCTestServerServicer_to_server, -) +from .protobuf.test_server_pb2 import Request +from .protobuf.test_server_pb2_grpc import add_GRPCTestServerServicer_to_server from .test_aio_server_interceptor import Servicer # pylint:disable=unused-argument From fe3cb189537adbd2be9e77f2210f7cf0dbe484bf Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Mon, 31 Oct 2022 15:25:10 +0000 Subject: [PATCH 15/15] Update CHANGELOG.md Add note about adding grpc.aio support --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cbadf6ae16..1fab9ff23d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1413](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1413)) - `opentelemetry-instrumentation-pyramid` Add support for regular expression matching and sanitization of HTTP headers. ([#1414](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1414)) +- `opentelemetry-instrumentation-grpc` Add support for grpc.aio Clients and Servers + ([#1245](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1245)) ### Fixed