From e9546addfb40af4c58eb0a2421164c1a2eccb674 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Thu, 24 Sep 2020 23:55:30 -0400 Subject: [PATCH 01/17] Initial rewrite of the grpc interceptor --- .../instrumentation/grpc/__init__.py | 37 +-- .../instrumentation/grpc/_server.py | 245 ++++++++++-------- .../instrumentation/grpc/grpcext/__init__.py | 203 ++++----------- .../grpc/grpcext/_interceptor.py | 177 ------------- 4 files changed, 202 insertions(+), 460 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 58d2ebbb1ee..a62132c5319 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -77,8 +77,7 @@ def run(): import grpc from opentelemetry import trace - from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer, server_interceptor - from opentelemetry.instrumentation.grpc.grpcext import intercept_server + from opentelemetry.instrumentation.grpc import server_interceptor from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, @@ -94,8 +93,6 @@ def run(): trace.get_tracer_provider().add_span_processor( SimpleExportSpanProcessor(ConsoleSpanExporter()) ) - grpc_server_instrumentor = GrpcInstrumentorServer() - grpc_server_instrumentor.instrument() class Greeter(helloworld_pb2_grpc.GreeterServicer): @@ -105,8 +102,8 @@ def SayHello(self, request, context): def serve(): - server = grpc.server(futures.ThreadPoolExecutor()) - server = intercept_server(server, server_interceptor()) + server = grpc.server(futures.ThreadPoolExecutor(), + interceptors = [server_interceptor()]) helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) server.add_insecure_port("[::]:50051") @@ -118,17 +115,14 @@ def serve(): logging.basicConfig() serve() """ -from contextlib import contextmanager from functools import partial import grpc from wrapt import wrap_function_wrapper as _wrap from opentelemetry import trace -from opentelemetry.instrumentation.grpc.grpcext import ( - intercept_channel, - intercept_server, -) +from opentelemetry.instrumentation.grpc._server import OpenTelemetryServerInterceptor +from opentelemetry.instrumentation.grpc.grpcext import intercept_channel from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap @@ -138,17 +132,12 @@ def serve(): # pylint:disable=unused-argument # isort:skip - -class GrpcInstrumentorServer(BaseInstrumentor): - def _instrument(self, **kwargs): - _wrap("grpc", "server", self.wrapper_fn) - - def _uninstrument(self, **kwargs): - unwrap(grpc, "server") - - def wrapper_fn(self, original_func, instance, args, kwargs): - server = original_func(*args, **kwargs) - return intercept_server(server, server_interceptor()) +__all__ = [ + "GrpcInstrumentorClient", + "OpenTelemetryServerInterceptor", + "client_interceptor", + "server_interceptor", +] class GrpcInstrumentorClient(BaseInstrumentor): @@ -218,8 +207,6 @@ def server_interceptor(tracer_provider=None): Returns: A service-side interceptor object. """ - from . import _server - tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _server.OpenTelemetryServerInterceptor(tracer) + return OpenTelemetryServerInterceptor(tracer) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index cb0e997d367..f4f2607b6d2 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -17,10 +17,8 @@ # pylint:disable=no-member # pylint:disable=signature-differs -"""Implementation of the service-side open-telemetry interceptor. - -This library borrows heavily from the OpenTracing gRPC integration: -https://github.com/opentracing-contrib/python-grpc +""" +Implementation of the service-side open-telemetry interceptor. """ from contextlib import contextmanager @@ -30,9 +28,33 @@ from opentelemetry import propagators, trace from opentelemetry.context import attach, detach - -from . import grpcext -from ._utilities import RpcInfo +from opentelemetry.trace.status import Status, StatusCanonicalCode + + +# wrap an RPC call +# see https://github.com/grpc/grpc/issues/18191 +def _wrap_rpc_behavior(handler, fn): + if handler is None: + return None + + if handler.request_streaming and handler.response_streaming: + behavior_fn = handler.stream_stream + handler_factory = grpc.stream_stream_rpc_method_handler + elif handler.request_streaming and not handler.response_streaming: + behavior_fn = handler.stream_unary + handler_factory = grpc.stream_unary_rpc_method_handler + elif not handler.request_streaming and handler.response_streaming: + behavior_fn = handler.unary_stream + handler_factory = grpc.unary_stream_rpc_method_handler + else: + behavior_fn = handler.unary_unary + handler_factory = grpc.unary_unary_rpc_method_handler + + return handler_factory( + fn(behavior_fn, handler.request_streaming, handler.response_streaming), + request_deserializer=handler.request_deserializer, + response_serializer=handler.response_serializer, + ) # pylint:disable=abstract-method @@ -42,7 +64,7 @@ def __init__(self, servicer_context, active_span): self._active_span = active_span self.code = grpc.StatusCode.OK self.details = None - super(_OpenTelemetryServicerContext, self).__init__() + super().__init__() def is_active(self, *args, **kwargs): return self._servicer_context.is_active(*args, **kwargs) @@ -56,20 +78,26 @@ def cancel(self, *args, **kwargs): def add_callback(self, *args, **kwargs): return self._servicer_context.add_callback(*args, **kwargs) + def disable_next_message_compression(self): + return self._service_context.disable_next_message_compression() + def invocation_metadata(self, *args, **kwargs): return self._servicer_context.invocation_metadata(*args, **kwargs) - def peer(self, *args, **kwargs): - return self._servicer_context.peer(*args, **kwargs) + def peer(self): + return self._servicer_context.peer() + + def peer_identities(self): + return self._servicer_context.peer_identities() - def peer_identities(self, *args, **kwargs): - return self._servicer_context.peer_identities(*args, **kwargs) + def peer_identity_key(self): + return self._servicer_context.peer_identity_key() - def peer_identity_key(self, *args, **kwargs): - return self._servicer_context.peer_identity_key(*args, **kwargs) + def auth_context(self): + return self._servicer_context.auth_context() - def auth_context(self, *args, **kwargs): - return self._servicer_context.auth_context(*args, **kwargs) + def set_compression(self, compression): + return self._servicer_context.set_compression(compression) def send_initial_metadata(self, *args, **kwargs): return self._servicer_context.send_initial_metadata(*args, **kwargs) @@ -77,47 +105,58 @@ def send_initial_metadata(self, *args, **kwargs): def set_trailing_metadata(self, *args, **kwargs): return self._servicer_context.set_trailing_metadata(*args, **kwargs) - def abort(self, *args, **kwargs): - if not hasattr(self._servicer_context, "abort"): - raise RuntimeError( - "abort() is not supported with the installed version of grpcio" - ) - return self._servicer_context.abort(*args, **kwargs) - - def abort_with_status(self, *args, **kwargs): - if not hasattr(self._servicer_context, "abort_with_status"): - raise RuntimeError( - "abort_with_status() is not supported with the installed " - "version of grpcio" - ) - return self._servicer_context.abort_with_status(*args, **kwargs) + def abort(self, code, details): + self._active_span.set_attribute("grpc.status_code", code.name) + self._active_span.set_attribute("grpc.details", details) + self._active_span.set_status( + Status(canonical_code=StatusCanonicalCode(code.value[0]), description=details) + ) + return self._servicer_context.abort(code, details) + + def abort_with_status(self, status): + return self._servicer_context.abort_with_status(status) def set_code(self, code): self.code = code + # use details if we already have it, otherwise the status description + details = self.details or code.value[1] + self._active_span.set_attribute("grpc.status_code", code.name) + self._active_span.set_status( + Status(canonical_code=StatusCanonicalCode(code.value[0]), description=details) + ) return self._servicer_context.set_code(code) def set_details(self, details): self.details = details + self._active_span.set_attribute("grpc.details", details) + self._active_span.set_status( + Status(canonical_code=StatusCanonicalCode(self.code.value[0]), description=details) + ) return self._servicer_context.set_details(details) -# On the service-side, errors can be signaled either by exceptions or by -# calling `set_code` on the `servicer_context`. This function checks for the -# latter and updates the span accordingly. -# pylint:disable=unused-argument -def _check_error_code(span, servicer_context, rpc_info): - if servicer_context.code != grpc.StatusCode.OK: - rpc_info.error = servicer_context.code +class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): + """ + A gRPC server interceptor, to add OpenTelemetry. + + Usage:: + + tracer = some OpenTelemetry tracer + interceptors = [ + OpenTelemetryServerInterceptor(tracer), + ] + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=concurrency), + interceptors = interceptors) + + """ -class OpenTelemetryServerInterceptor( - grpcext.UnaryServerInterceptor, grpcext.StreamServerInterceptor -): def __init__(self, tracer): self._tracer = tracer @contextmanager - # pylint:disable=no-self-use def _set_remote_context(self, servicer_context): metadata = servicer_context.invocation_metadata() if metadata: @@ -136,74 +175,58 @@ def get_from_grpc_metadata(metadata, key) -> List[str]: else: yield - def _start_span(self, method): - span = self._tracer.start_as_current_span( - name=method, kind=trace.SpanKind.SERVER + def _start_span(self, handler_call_details, context): + + attributes = { + "grpc.method": handler_call_details.method, + } + + metadata = dict(context.invocation_metadata()) + if 'user-agent' in metadata: + attributes["grpc.user_agent"] = metadata['user-agent'] + + # Split up the peer to keep with how other telemetry sources + # do it. This looks like ipv6:[::1]:57284 or ipv4:127.0.0.1:57284. + peer = context.peer().split(':', 1)[1] + host, port = peer.rsplit(':', 1) + + # other telemetry sources convert this, so we will too + if host == "[::1]" or host == "127.0.0.1": + host = "localhost" + + attributes.update({ + "net.peer.name": host, + "net.peer.port": port, + }) + + return self._tracer.start_as_current_span( + name=handler_call_details.method, + kind=trace.SpanKind.SERVER, + attributes=attributes, + ) + + def intercept_service(self, continuation, handler_call_details): + def telemetry_wrapper(behavior, request_streaming, response_streaming): + def telemetry_interceptor(request_or_iterator, context): + + with self._set_remote_context(context): + with self._start_span(handler_call_details, context) as span: + # wrap the context + context = _OpenTelemetryServicerContext(context, span) + + # And now we run the actual RPC. + try: + return behavior(request_or_iterator, context) + except Exception as e: + # Bare exceptions are likely to be gRPC aborts, which + # we handle in our context wrapper. + # Here, we're interested in uncaught exceptions. + if type(e) != Exception: + span.set_attribute("error", repr(e)) + raise e + + return telemetry_interceptor + + return _wrap_rpc_behavior( + continuation(handler_call_details), telemetry_wrapper ) - return span - - def intercept_unary(self, request, servicer_context, server_info, handler): - - with self._set_remote_context(servicer_context): - with self._start_span(server_info.full_method) as span: - rpc_info = RpcInfo( - full_method=server_info.full_method, - metadata=servicer_context.invocation_metadata(), - timeout=servicer_context.time_remaining(), - request=request, - ) - servicer_context = _OpenTelemetryServicerContext( - servicer_context, span - ) - response = handler(request, servicer_context) - - _check_error_code(span, servicer_context, rpc_info) - - rpc_info.response = response - - return response - - # For RPCs that stream responses, the result can be a generator. To record - # the span across the generated responses and detect any errors, we wrap - # the result in a new generator that yields the response values. - def _intercept_server_stream( - self, request_or_iterator, servicer_context, server_info, handler - ): - with self._set_remote_context(servicer_context): - with self._start_span(server_info.full_method) as span: - rpc_info = RpcInfo( - full_method=server_info.full_method, - metadata=servicer_context.invocation_metadata(), - timeout=servicer_context.time_remaining(), - ) - if not server_info.is_client_stream: - rpc_info.request = request_or_iterator - servicer_context = _OpenTelemetryServicerContext( - servicer_context, span - ) - result = handler(request_or_iterator, servicer_context) - for response in result: - yield response - _check_error_code(span, servicer_context, rpc_info) - - def intercept_stream( - self, request_or_iterator, servicer_context, server_info, handler - ): - if server_info.is_server_stream: - return self._intercept_server_stream( - request_or_iterator, servicer_context, server_info, handler - ) - with self._set_remote_context(servicer_context): - with self._start_span(server_info.full_method) as span: - rpc_info = RpcInfo( - full_method=server_info.full_method, - metadata=servicer_context.invocation_metadata(), - timeout=servicer_context.time_remaining(), - ) - servicer_context = _OpenTelemetryServicerContext( - servicer_context, span - ) - response = handler(request_or_iterator, servicer_context) - _check_error_code(span, servicer_context, rpc_info) - rpc_info.response = response - return response diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py index fe83467a70a..d5e2549bab8 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py @@ -21,32 +21,32 @@ class UnaryClientInfo(abc.ABC): """Consists of various information about a unary RPC on the - invocation-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ + invocation-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ class StreamClientInfo(abc.ABC): """Consists of various information about a stream RPC on the - invocation-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - is_client_stream: Indicates whether the RPC is client-streaming. - is_server_stream: Indicates whether the RPC is server-streaming. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ + invocation-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + is_client_stream: Indicates whether the RPC is client-streaming. + is_server_stream: Indicates whether the RPC is server-streaming. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ class UnaryClientInterceptor(abc.ABC): @@ -56,18 +56,18 @@ class UnaryClientInterceptor(abc.ABC): def intercept_unary(self, request, metadata, client_info, invoker): """Intercepts unary-unary RPCs on the invocation-side. - Args: - request: The request value for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - client_info: A UnaryClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling invoker(request, metadata). - """ + Args: + request: The request value for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + client_info: A UnaryClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling invoker(request, metadata). + """ raise NotImplementedError() @@ -80,137 +80,46 @@ def intercept_stream( ): """Intercepts stream RPCs on the invocation-side. - Args: - request_or_iterator: The request value for the RPC if - `client_info.is_client_stream` is `false`; otherwise, an iterator of - request values. - metadata: Optional :term:`metadata` to be transmitted to the service-side - of the RPC. - client_info: A StreamClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling invoker(metadata). - """ + Args: + request_or_iterator: The request value for the RPC if + `client_info.is_client_stream` is `false`; otherwise, an iterator of + request values. + metadata: Optional :term:`metadata` to be transmitted to the service-side + of the RPC. + client_info: A StreamClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling invoker(metadata). + """ raise NotImplementedError() def intercept_channel(channel, *interceptors): """Creates an intercepted channel. - Args: - channel: A Channel. - interceptors: Zero or more UnaryClientInterceptors or - StreamClientInterceptors - - Returns: - A Channel. - - Raises: - TypeError: If an interceptor derives from neither UnaryClientInterceptor - nor StreamClientInterceptor. - """ - from . import _interceptor - - return _interceptor.intercept_channel(channel, *interceptors) - - -class UnaryServerInfo(abc.ABC): - """Consists of various information about a unary RPC on the service-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - """ - - -class StreamServerInfo(abc.ABC): - """Consists of various information about a stream RPC on the service-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - is_client_stream: Indicates whether the RPC is client-streaming. - is_server_stream: Indicates whether the RPC is server-streaming. - """ - - -class UnaryServerInterceptor(abc.ABC): - """Affords intercepting unary-unary RPCs on the service-side.""" - - @abc.abstractmethod - def intercept_unary(self, request, servicer_context, server_info, handler): - """Intercepts unary-unary RPCs on the service-side. - Args: - request: The request value for the RPC. - servicer_context: A ServicerContext. - server_info: A UnaryServerInfo containing various information about - the RPC. - handler: The handler to complete the RPC on the server. It is the - interceptor's responsibility to call it. + channel: A Channel. + interceptors: Zero or more UnaryClientInterceptors or + StreamClientInterceptors Returns: - The result from calling handler(request, servicer_context). - """ - raise NotImplementedError() - - -class StreamServerInterceptor(abc.ABC): - """Affords intercepting stream RPCs on the service-side.""" + A Channel. - @abc.abstractmethod - def intercept_stream( - self, request_or_iterator, servicer_context, server_info, handler - ): - """Intercepts stream RPCs on the service-side. - - Args: - request_or_iterator: The request value for the RPC if - `server_info.is_client_stream` is `False`; otherwise, an iterator of - request values. - servicer_context: A ServicerContext. - server_info: A StreamServerInfo containing various information about - the RPC. - handler: The handler to complete the RPC on the server. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling handler(servicer_context). + Raises: + TypeError: If an interceptor derives from neither UnaryClientInterceptor + nor StreamClientInterceptor. """ - raise NotImplementedError() - - -def intercept_server(server, *interceptors): - """Creates an intercepted server. - - Args: - server: A Server. - interceptors: Zero or more UnaryServerInterceptors or - StreamServerInterceptors - - Returns: - A Server. - - Raises: - TypeError: If an interceptor derives from neither UnaryServerInterceptor - nor StreamServerInterceptor. - """ from . import _interceptor - return _interceptor.intercept_server(server, *interceptors) + return _interceptor.intercept_channel(channel, *interceptors) __all__ = ( "UnaryClientInterceptor", "StreamClientInfo", "StreamClientInterceptor", - "UnaryServerInfo", - "StreamServerInfo", - "UnaryServerInterceptor", - "StreamServerInterceptor", "intercept_channel", - "intercept_server", ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py index 74861913b92..b9f74fff805 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py @@ -252,180 +252,3 @@ def intercept_channel(channel, *interceptors): ) result = _InterceptorChannel(result, interceptor) return result - - -class _UnaryServerInfo( - collections.namedtuple("_UnaryServerInfo", ("full_method",)) -): - pass - - -class _StreamServerInfo( - collections.namedtuple( - "_StreamServerInfo", - ("full_method", "is_client_stream", "is_server_stream"), - ) -): - pass - - -class _InterceptorRpcMethodHandler(grpc.RpcMethodHandler): - def __init__(self, rpc_method_handler, method, interceptor): - self._rpc_method_handler = rpc_method_handler - self._method = method - self._interceptor = interceptor - - @property - def request_streaming(self): - return self._rpc_method_handler.request_streaming - - @property - def response_streaming(self): - return self._rpc_method_handler.response_streaming - - @property - def request_deserializer(self): - return self._rpc_method_handler.request_deserializer - - @property - def response_serializer(self): - return self._rpc_method_handler.response_serializer - - @property - def unary_unary(self): - if not isinstance(self._interceptor, grpcext.UnaryServerInterceptor): - return self._rpc_method_handler.unary_unary - - def adaptation(request, servicer_context): - def handler(request, servicer_context): - return self._rpc_method_handler.unary_unary( - request, servicer_context - ) - - return self._interceptor.intercept_unary( - request, - servicer_context, - _UnaryServerInfo(self._method), - handler, - ) - - return adaptation - - @property - def unary_stream(self): - if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): - return self._rpc_method_handler.unary_stream - - def adaptation(request, servicer_context): - def handler(request, servicer_context): - return self._rpc_method_handler.unary_stream( - request, servicer_context - ) - - return self._interceptor.intercept_stream( - request, - servicer_context, - _StreamServerInfo(self._method, False, True), - handler, - ) - - return adaptation - - @property - def stream_unary(self): - if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): - return self._rpc_method_handler.stream_unary - - def adaptation(request_iterator, servicer_context): - def handler(request_iterator, servicer_context): - return self._rpc_method_handler.stream_unary( - request_iterator, servicer_context - ) - - return self._interceptor.intercept_stream( - request_iterator, - servicer_context, - _StreamServerInfo(self._method, True, False), - handler, - ) - - return adaptation - - @property - def stream_stream(self): - if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): - return self._rpc_method_handler.stream_stream - - def adaptation(request_iterator, servicer_context): - def handler(request_iterator, servicer_context): - return self._rpc_method_handler.stream_stream( - request_iterator, servicer_context - ) - - return self._interceptor.intercept_stream( - request_iterator, - servicer_context, - _StreamServerInfo(self._method, True, True), - handler, - ) - - return adaptation - - -class _InterceptorGenericRpcHandler(grpc.GenericRpcHandler): - def __init__(self, generic_rpc_handler, interceptor): - self.generic_rpc_handler = generic_rpc_handler - self._interceptor = interceptor - - def service(self, handler_call_details): - result = self.generic_rpc_handler.service(handler_call_details) - if result: - result = _InterceptorRpcMethodHandler( - result, handler_call_details.method, self._interceptor - ) - return result - - -class _InterceptorServer(grpc.Server): - def __init__(self, server, interceptor): - self._server = server - self._interceptor = interceptor - - def add_generic_rpc_handlers(self, generic_rpc_handlers): - generic_rpc_handlers = [ - _InterceptorGenericRpcHandler( - generic_rpc_handler, self._interceptor - ) - for generic_rpc_handler in generic_rpc_handlers - ] - return self._server.add_generic_rpc_handlers(generic_rpc_handlers) - - def add_insecure_port(self, *args, **kwargs): - return self._server.add_insecure_port(*args, **kwargs) - - def add_secure_port(self, *args, **kwargs): - return self._server.add_secure_port(*args, **kwargs) - - def start(self, *args, **kwargs): - return self._server.start(*args, **kwargs) - - def stop(self, *args, **kwargs): - return self._server.stop(*args, **kwargs) - - def wait_for_termination(self, *args, **kwargs): - return self._server.wait_for_termination(*args, **kwargs) - - -def intercept_server(server, *interceptors): - result = server - for interceptor in interceptors: - if not isinstance( - interceptor, grpcext.UnaryServerInterceptor - ) and not isinstance(interceptor, grpcext.StreamServerInterceptor): - raise TypeError( - "interceptor must be either a " - "grpcext.UnaryServerInterceptor or a " - "grpcext.StreamServerInterceptor" - ) - result = _InterceptorServer(result, interceptor) - return result From 383458c7b59063ccb948c30262a89203750e6679 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Fri, 25 Sep 2020 01:23:23 -0400 Subject: [PATCH 02/17] Go back a bit and use the instrumentor I really dislike this pattern, changing other libraries, but it's the way that most of these things work, so I'll stick with it. One can still use the Interceptor directly if desired. Also reworked the wrapped server so it's no longer a wrapper, but simply adds on our interceptor. This allows adding any custom interceptors as well. A caveat here is that if you need to specifically order interceptors, you'll need to do it manually rather than using the instrumentor-enable thing. --- .../instrumentation/grpc/__init__.py | 56 +++++++++++++++---- .../instrumentation/grpc/_server.py | 34 +++++++---- 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index a62132c5319..b4f7112deb5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -77,7 +77,7 @@ def run(): import grpc from opentelemetry import trace - from opentelemetry.instrumentation.grpc import server_interceptor + from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer, server_interceptor from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, @@ -94,6 +94,8 @@ def run(): SimpleExportSpanProcessor(ConsoleSpanExporter()) ) + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() class Greeter(helloworld_pb2_grpc.GreeterServicer): def SayHello(self, request, context): @@ -102,8 +104,7 @@ def SayHello(self, request, context): def serve(): - server = grpc.server(futures.ThreadPoolExecutor(), - interceptors = [server_interceptor()]) + server = grpc.server(futures.ThreadPoolExecutor()) helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) server.add_insecure_port("[::]:50051") @@ -114,6 +115,17 @@ def serve(): if __name__ == "__main__": logging.basicConfig() serve() + +You can also add the instrumentor manually, rather than using +:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`: + +.. code-block:: python + + from opentelemetry.instrumentation.grpc import server_interceptor + + server = grpc.server(futures.ThreadPoolExecutor(), + interceptors = [server_interceptor()]) + """ from functools import partial @@ -121,7 +133,6 @@ def serve(): from wrapt import wrap_function_wrapper as _wrap from opentelemetry import trace -from opentelemetry.instrumentation.grpc._server import OpenTelemetryServerInterceptor from opentelemetry.instrumentation.grpc.grpcext import intercept_channel from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -132,12 +143,33 @@ def serve(): # pylint:disable=unused-argument # isort:skip -__all__ = [ - "GrpcInstrumentorClient", - "OpenTelemetryServerInterceptor", - "client_interceptor", - "server_interceptor", -] + +class GrpcInstrumentorServer(BaseInstrumentor): + """ + Globally instrument the grpc server. + + Usage:: + + grpc_server_instrumentor = GrpcInstrumentorServer() + grpc_server_instrumentor.instrument() + + """ + def _instrument(self, **kwargs): + self._original_func = grpc.server + + def server(*args, **kwargs): + if 'interceptors' in kwargs: + # add our interceptor as the first + kwargs['interceptors'].insert(0, server_interceptor()) + else: + kwargs['interceptors'] = [server_interceptor()] + return self._original_func(*args, **kwargs) + + grpc.server = server + + + def _uninstrument(self, **kwargs): + unwrap(grpc, "server") class GrpcInstrumentorClient(BaseInstrumentor): @@ -207,6 +239,8 @@ def server_interceptor(tracer_provider=None): Returns: A service-side interceptor object. """ + from . import _server + tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return OpenTelemetryServerInterceptor(tracer) + return _server.OpenTelemetryServerInterceptor(tracer) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index f4f2607b6d2..c5442205d8e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -109,7 +109,10 @@ def abort(self, code, details): self._active_span.set_attribute("grpc.status_code", code.name) self._active_span.set_attribute("grpc.details", details) self._active_span.set_status( - Status(canonical_code=StatusCanonicalCode(code.value[0]), description=details) + Status( + canonical_code=StatusCanonicalCode(code.value[0]), + description=details, + ) ) return self._servicer_context.abort(code, details) @@ -122,7 +125,10 @@ def set_code(self, code): details = self.details or code.value[1] self._active_span.set_attribute("grpc.status_code", code.name) self._active_span.set_status( - Status(canonical_code=StatusCanonicalCode(code.value[0]), description=details) + Status( + canonical_code=StatusCanonicalCode(code.value[0]), + description=details, + ) ) return self._servicer_context.set_code(code) @@ -130,7 +136,10 @@ def set_details(self, details): self.details = details self._active_span.set_attribute("grpc.details", details) self._active_span.set_status( - Status(canonical_code=StatusCanonicalCode(self.code.value[0]), description=details) + Status( + canonical_code=StatusCanonicalCode(self.code.value[0]), + description=details, + ) ) return self._servicer_context.set_details(details) @@ -182,22 +191,21 @@ def _start_span(self, handler_call_details, context): } metadata = dict(context.invocation_metadata()) - if 'user-agent' in metadata: - attributes["grpc.user_agent"] = metadata['user-agent'] + if "user-agent" in metadata: + attributes["grpc.user_agent"] = metadata["user-agent"] # Split up the peer to keep with how other telemetry sources # do it. This looks like ipv6:[::1]:57284 or ipv4:127.0.0.1:57284. - peer = context.peer().split(':', 1)[1] - host, port = peer.rsplit(':', 1) + peer = context.peer().split(":", 1)[1] + host, port = peer.rsplit(":", 1) # other telemetry sources convert this, so we will too if host == "[::1]" or host == "127.0.0.1": host = "localhost" - attributes.update({ - "net.peer.name": host, - "net.peer.port": port, - }) + attributes.update( + {"net.peer.name": host, "net.peer.port": port,} + ) return self._tracer.start_as_current_span( name=handler_call_details.method, @@ -210,7 +218,9 @@ def telemetry_wrapper(behavior, request_streaming, response_streaming): def telemetry_interceptor(request_or_iterator, context): with self._set_remote_context(context): - with self._start_span(handler_call_details, context) as span: + with self._start_span( + handler_call_details, context + ) as span: # wrap the context context = _OpenTelemetryServicerContext(context, span) From 4bfa1db220b35b422a0ef507b23dcee308061ad8 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Fri, 25 Sep 2020 01:29:21 -0400 Subject: [PATCH 03/17] black --- .../src/opentelemetry/instrumentation/grpc/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index b4f7112deb5..d67d6c417e0 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -154,20 +154,20 @@ class GrpcInstrumentorServer(BaseInstrumentor): grpc_server_instrumentor.instrument() """ + def _instrument(self, **kwargs): self._original_func = grpc.server def server(*args, **kwargs): - if 'interceptors' in kwargs: + if "interceptors" in kwargs: # add our interceptor as the first - kwargs['interceptors'].insert(0, server_interceptor()) + kwargs["interceptors"].insert(0, server_interceptor()) else: - kwargs['interceptors'] = [server_interceptor()] + kwargs["interceptors"] = [server_interceptor()] return self._original_func(*args, **kwargs) grpc.server = server - def _uninstrument(self, **kwargs): unwrap(grpc, "server") From bb062f9c58ac8546f74fd465a7156ba7b2616e53 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Fri, 25 Sep 2020 01:34:39 -0400 Subject: [PATCH 04/17] Black screwed this one up, not me. --- .../src/opentelemetry/instrumentation/grpc/_server.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index c5442205d8e..0d2fd221233 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -203,9 +203,7 @@ def _start_span(self, handler_call_details, context): if host == "[::1]" or host == "127.0.0.1": host = "localhost" - attributes.update( - {"net.peer.name": host, "net.peer.port": port,} - ) + attributes.update({"net.peer.name": host, "net.peer.port": port}) return self._tracer.start_as_current_span( name=handler_call_details.method, From a8aa2af3e537af2be218a059320f35aac3cd56ca Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Sun, 27 Sep 2020 18:36:05 -0400 Subject: [PATCH 05/17] Appease the linters --- .../grpc/hello_world_server.py | 6 ++--- .../grpc/route_guide_server.py | 7 +++--- .../setup.cfg | 1 - .../instrumentation/grpc/__init__.py | 2 ++ .../instrumentation/grpc/_server.py | 23 +++++++++++-------- .../tests/test_server_interceptor.py | 12 ++++------ 6 files changed, 28 insertions(+), 23 deletions(-) diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_server.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_server.py index ae4562e5817..e43d38d2842 100755 --- a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_server.py +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_server.py @@ -43,7 +43,6 @@ from opentelemetry import trace from opentelemetry.instrumentation.grpc import server_interceptor -from opentelemetry.instrumentation.grpc.grpcext import intercept_server from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, @@ -73,8 +72,9 @@ def SayHello(self, request, context): def serve(): - server = grpc.server(futures.ThreadPoolExecutor()) - server = intercept_server(server, server_interceptor()) + server = grpc.server( + futures.ThreadPoolExecutor(), interceptors=[server_interceptor()], + ) helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) server.add_insecure_port("[::]:50051") diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_server.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_server.py index a8b2a95e819..74edbd65b3f 100755 --- a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_server.py +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_server.py @@ -46,7 +46,6 @@ from opentelemetry import trace from opentelemetry.instrumentation.grpc import server_interceptor -from opentelemetry.instrumentation.grpc.grpcext import intercept_server from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, @@ -162,8 +161,10 @@ def RouteChat(self, request_iterator, context): def serve(): - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - server = intercept_server(server, server_interceptor()) + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + interceptors=[server_interceptor()], + ) route_guide_pb2_grpc.add_RouteGuideServicer_to_server( RouteGuideServicer(), server diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index 5d39574bfd0..dfc7aabce86 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -55,4 +55,3 @@ where = src [options.entry_points] opentelemetry_instrumentor = grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient - grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index d67d6c417e0..2d43d7d4556 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -155,6 +155,8 @@ class GrpcInstrumentorServer(BaseInstrumentor): """ + # pylint:disable=attribute-defined-outside-init + def _instrument(self, **kwargs): self._original_func = grpc.server diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 0d2fd221233..70ab378967f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -33,7 +33,7 @@ # wrap an RPC call # see https://github.com/grpc/grpc/issues/18191 -def _wrap_rpc_behavior(handler, fn): +def _wrap_rpc_behavior(handler, continuation): if handler is None: return None @@ -51,7 +51,9 @@ def _wrap_rpc_behavior(handler, fn): handler_factory = grpc.unary_unary_rpc_method_handler return handler_factory( - fn(behavior_fn, handler.request_streaming, handler.response_streaming), + continuation( + behavior_fn, handler.request_streaming, handler.response_streaming + ), request_deserializer=handler.request_deserializer, response_serializer=handler.response_serializer, ) @@ -144,6 +146,9 @@ def set_details(self, details): return self._servicer_context.set_details(details) +# pylint:disable=abstract-method +# pylint:disable=no-self-use +# pylint:disable=unused-argument class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): """ A gRPC server interceptor, to add OpenTelemetry. @@ -196,11 +201,10 @@ def _start_span(self, handler_call_details, context): # Split up the peer to keep with how other telemetry sources # do it. This looks like ipv6:[::1]:57284 or ipv4:127.0.0.1:57284. - peer = context.peer().split(":", 1)[1] - host, port = peer.rsplit(":", 1) + host, port = context.peer().split(":", 1)[1].rsplit(":", 1) # other telemetry sources convert this, so we will too - if host == "[::1]" or host == "127.0.0.1": + if host in ("[::1]", "127.0.0.1"): host = "localhost" attributes.update({"net.peer.name": host, "net.peer.port": port}) @@ -225,13 +229,14 @@ def telemetry_interceptor(request_or_iterator, context): # And now we run the actual RPC. try: return behavior(request_or_iterator, context) - except Exception as e: + except Exception as error: # Bare exceptions are likely to be gRPC aborts, which # we handle in our context wrapper. # Here, we're interested in uncaught exceptions. - if type(e) != Exception: - span.set_attribute("error", repr(e)) - raise e + # pylint:disable=unidiomatic-typecheck + if type(error) != Exception: + span.set_attribute("error", repr(error)) + raise error return telemetry_interceptor diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index a41da47ae92..13b535d8414 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -26,7 +26,6 @@ GrpcInstrumentorServer, server_interceptor, ) -from opentelemetry.instrumentation.grpc.grpcext import intercept_server from opentelemetry.sdk import trace as trace_sdk from opentelemetry.test.test_base import TestBase @@ -123,10 +122,9 @@ def handler(request, context): server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], ) - # FIXME: grpcext interceptor doesn't apply to handlers passed to server - # init, should use intercept_service API instead. - server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) port = server.add_insecure_port("[::]:0") @@ -166,8 +164,8 @@ def handler(request, context): server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], ) - server = intercept_server(server, interceptor) server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) port = server.add_insecure_port("[::]:0") @@ -201,8 +199,8 @@ def handler(request, context): server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], ) - server = intercept_server(server, interceptor) server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) port = server.add_insecure_port("[::]:0") @@ -248,8 +246,8 @@ def handler(request, context): server = grpc.server( futures.ThreadPoolExecutor(max_workers=2), options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], ) - server = intercept_server(server, interceptor) server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) port = server.add_insecure_port("[::]:0") From ed68c41795d44e561fd63619ab78d80951e1c387 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Sun, 27 Sep 2020 19:54:16 -0400 Subject: [PATCH 06/17] missed that one --- .../src/opentelemetry/instrumentation/grpc/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 2d43d7d4556..5f1fa8cdddc 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -171,7 +171,7 @@ def server(*args, **kwargs): grpc.server = server def _uninstrument(self, **kwargs): - unwrap(grpc, "server") + grpc.server = self._original_func class GrpcInstrumentorClient(BaseInstrumentor): From a17cce1abb951cd0da37d585c8ef10419bc16c29 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Thu, 22 Oct 2020 09:25:15 -0400 Subject: [PATCH 07/17] Use record_exception Based on a suggestion by @owais - thanks! --- .../src/opentelemetry/instrumentation/grpc/_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 70ab378967f..eb092c1562f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -235,7 +235,7 @@ def telemetry_interceptor(request_or_iterator, context): # Here, we're interested in uncaught exceptions. # pylint:disable=unidiomatic-typecheck if type(error) != Exception: - span.set_attribute("error", repr(error)) + span.record_exception(error) raise error return telemetry_interceptor From 813cea7d9bcb56aaa578fc86df9ad718bdea867b Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Thu, 22 Oct 2020 09:47:47 -0400 Subject: [PATCH 08/17] Update peer parsing, wrap with a try Apparently the peer value can include a comma-separated address, see the tests here: https://github.com/grpc/grpc/blob/master/test/core/client_channel/resolvers/sockaddr_resolver_test.cc#L90 This updates the parsing, and adds a try/catch block with a warning log. --- .../instrumentation/grpc/_server.py | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index eb092c1562f..4c3df29976c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -25,11 +25,14 @@ from typing import List import grpc +import logging from opentelemetry import propagators, trace from opentelemetry.context import attach, detach from opentelemetry.trace.status import Status, StatusCanonicalCode +_log = logging.getLogger(__name__) + # wrap an RPC call # see https://github.com/grpc/grpc/issues/18191 @@ -200,14 +203,21 @@ def _start_span(self, handler_call_details, context): attributes["grpc.user_agent"] = metadata["user-agent"] # Split up the peer to keep with how other telemetry sources - # do it. This looks like ipv6:[::1]:57284 or ipv4:127.0.0.1:57284. - host, port = context.peer().split(":", 1)[1].rsplit(":", 1) - - # other telemetry sources convert this, so we will too - if host in ("[::1]", "127.0.0.1"): - host = "localhost" - - attributes.update({"net.peer.name": host, "net.peer.port": port}) + # do it. This looks like: + # * ipv6:[::1]:57284 + # * ipv4:127.0.0.1:57284 + # * ipv4:10.2.1.1:57284,127.0.0.1:57284 + # + try: + host, port = context.peer().split(',')[0].split(":", 1)[1].rsplit(":", 1) + + # other telemetry sources convert this, so we will too + if host in ("[::1]", "127.0.0.1"): + host = "localhost" + + attributes.update({"net.peer.name": host, "net.peer.port": port}) + except IndexError: + _log.warning("Failed to parse peer address '%s'", context.peer()) return self._tracer.start_as_current_span( name=handler_call_details.method, From 121461ed0173ad86a0c00d9c3794a29340ce23b1 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Fri, 23 Oct 2020 09:41:04 -0400 Subject: [PATCH 09/17] appease the linter --- .../src/opentelemetry/instrumentation/grpc/_server.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 4c3df29976c..45cdc397c6c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -21,11 +21,11 @@ Implementation of the service-side open-telemetry interceptor. """ +import logging from contextlib import contextmanager from typing import List import grpc -import logging from opentelemetry import propagators, trace from opentelemetry.context import attach, detach @@ -209,7 +209,9 @@ def _start_span(self, handler_call_details, context): # * ipv4:10.2.1.1:57284,127.0.0.1:57284 # try: - host, port = context.peer().split(',')[0].split(":", 1)[1].rsplit(":", 1) + host, port = ( + context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1) + ) # other telemetry sources convert this, so we will too if host in ("[::1]", "127.0.0.1"): From 60a99eb0e0a863ad742eeb7cd0377cf77f5fd40c Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Mon, 26 Oct 2020 14:20:08 -0400 Subject: [PATCH 10/17] Update instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py Co-authored-by: Aaron Abbott --- .../src/opentelemetry/instrumentation/grpc/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 5f1fa8cdddc..776e29e8e20 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -77,7 +77,7 @@ def run(): import grpc from opentelemetry import trace - from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer, server_interceptor + from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, From 88c4194142199eedf5c8bb616366e3f09cbd4730 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Mon, 26 Oct 2020 14:20:47 -0400 Subject: [PATCH 11/17] Re-add a thing I accidentally removed --- instrumentation/opentelemetry-instrumentation-grpc/setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg index dfc7aabce86..5d39574bfd0 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-grpc/setup.cfg @@ -55,3 +55,4 @@ where = src [options.entry_points] opentelemetry_instrumentor = grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient + grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer From 7691f5c25fd75279459642cfaca76847dc2d1802 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Mon, 26 Oct 2020 14:21:33 -0400 Subject: [PATCH 12/17] Follow repo conventions --- .../src/opentelemetry/instrumentation/grpc/_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 45cdc397c6c..a9f39d9afc9 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -31,7 +31,7 @@ from opentelemetry.context import attach, detach from opentelemetry.trace.status import Status, StatusCanonicalCode -_log = logging.getLogger(__name__) +logger = logging.getLogger(__name__) # wrap an RPC call @@ -219,7 +219,7 @@ def _start_span(self, handler_call_details, context): attributes.update({"net.peer.name": host, "net.peer.port": port}) except IndexError: - _log.warning("Failed to parse peer address '%s'", context.peer()) + logger.warning("Failed to parse peer address '%s'", context.peer()) return self._tracer.start_as_current_span( name=handler_call_details.method, From 86150c3936f5f20b445a1c42ccb0e6926f850b4c Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Mon, 26 Oct 2020 14:25:33 -0400 Subject: [PATCH 13/17] Add more attributes, name them 'rpc' Per suggestion, these attributes should be in the `rpc` namespace. --- .../src/opentelemetry/instrumentation/grpc/_server.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index a9f39d9afc9..6fa7c6eb0d4 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -195,12 +195,13 @@ def get_from_grpc_metadata(metadata, key) -> List[str]: def _start_span(self, handler_call_details, context): attributes = { - "grpc.method": handler_call_details.method, + "rpc.method": handler_call_details.method, + "rpc.system": "grpc", } metadata = dict(context.invocation_metadata()) if "user-agent" in metadata: - attributes["grpc.user_agent"] = metadata["user-agent"] + attributes["rpc.user_agent"] = metadata["user-agent"] # Split up the peer to keep with how other telemetry sources # do it. This looks like: From 025874fc761ebc4afb6ad25a8e0d2380c288f007 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Mon, 26 Oct 2020 20:20:57 -0400 Subject: [PATCH 14/17] More rename 'grpc' to 'rpc' --- .../src/opentelemetry/instrumentation/grpc/_server.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 6fa7c6eb0d4..841e502e657 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -111,8 +111,8 @@ def set_trailing_metadata(self, *args, **kwargs): return self._servicer_context.set_trailing_metadata(*args, **kwargs) def abort(self, code, details): - self._active_span.set_attribute("grpc.status_code", code.name) - self._active_span.set_attribute("grpc.details", details) + self._active_span.set_attribute("rpc.status_code", code.name) + self._active_span.set_attribute("rpc.details", details) self._active_span.set_status( Status( canonical_code=StatusCanonicalCode(code.value[0]), @@ -128,7 +128,7 @@ def set_code(self, code): self.code = code # use details if we already have it, otherwise the status description details = self.details or code.value[1] - self._active_span.set_attribute("grpc.status_code", code.name) + self._active_span.set_attribute("rpc.status_code", code.name) self._active_span.set_status( Status( canonical_code=StatusCanonicalCode(code.value[0]), @@ -139,7 +139,7 @@ def set_code(self, code): def set_details(self, details): self.details = details - self._active_span.set_attribute("grpc.details", details) + self._active_span.set_attribute("rpc.details", details) self._active_span.set_status( Status( canonical_code=StatusCanonicalCode(self.code.value[0]), From 02b92bd6e3f05b1e07915ac04ee1d3ca536761c6 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Wed, 28 Oct 2020 19:58:44 -0400 Subject: [PATCH 15/17] Remove extra tags which aren't needed I was using Zipkin and some useful flags were missing, but when using Jaeger they're present, so I didn't actually need to set these explicitly. --- .../src/opentelemetry/instrumentation/grpc/_server.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 841e502e657..59bd28e2ff9 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -111,8 +111,8 @@ def set_trailing_metadata(self, *args, **kwargs): return self._servicer_context.set_trailing_metadata(*args, **kwargs) def abort(self, code, details): - self._active_span.set_attribute("rpc.status_code", code.name) - self._active_span.set_attribute("rpc.details", details) + self.code = code + self.details = details self._active_span.set_status( Status( canonical_code=StatusCanonicalCode(code.value[0]), @@ -128,7 +128,6 @@ def set_code(self, code): self.code = code # use details if we already have it, otherwise the status description details = self.details or code.value[1] - self._active_span.set_attribute("rpc.status_code", code.name) self._active_span.set_status( Status( canonical_code=StatusCanonicalCode(code.value[0]), @@ -139,7 +138,6 @@ def set_code(self, code): def set_details(self, details): self.details = details - self._active_span.set_attribute("rpc.details", details) self._active_span.set_status( Status( canonical_code=StatusCanonicalCode(self.code.value[0]), From 36877b5e902396a3385a91b4aa9bb72e1a0a6807 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Thu, 29 Oct 2020 14:51:33 -0400 Subject: [PATCH 16/17] update post-rebase --- .../src/opentelemetry/instrumentation/grpc/_server.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 59bd28e2ff9..0de94ac1921 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -29,7 +29,7 @@ from opentelemetry import propagators, trace from opentelemetry.context import attach, detach -from opentelemetry.trace.status import Status, StatusCanonicalCode +from opentelemetry.trace.status import Status, StatusCode logger = logging.getLogger(__name__) @@ -115,7 +115,7 @@ def abort(self, code, details): self.details = details self._active_span.set_status( Status( - canonical_code=StatusCanonicalCode(code.value[0]), + canonical_code=StatusCode(code.value[0]), description=details, ) ) @@ -130,7 +130,7 @@ def set_code(self, code): details = self.details or code.value[1] self._active_span.set_status( Status( - canonical_code=StatusCanonicalCode(code.value[0]), + canonical_code=StatusCode(code.value[0]), description=details, ) ) @@ -140,7 +140,7 @@ def set_details(self, details): self.details = details self._active_span.set_status( Status( - canonical_code=StatusCanonicalCode(self.code.value[0]), + canonical_code=StatusCode(self.code.value[0]), description=details, ) ) From fc4a031d99b873423dc086afad5c1a3e9662151a Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Thu, 29 Oct 2020 15:01:15 -0400 Subject: [PATCH 17/17] I hate the linter --- .../opentelemetry/instrumentation/grpc/_server.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 0de94ac1921..83cc5824f17 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -114,10 +114,7 @@ def abort(self, code, details): self.code = code self.details = details self._active_span.set_status( - Status( - canonical_code=StatusCode(code.value[0]), - description=details, - ) + Status(status_code=StatusCode(code.value[0]), description=details) ) return self._servicer_context.abort(code, details) @@ -129,10 +126,7 @@ def set_code(self, code): # use details if we already have it, otherwise the status description details = self.details or code.value[1] self._active_span.set_status( - Status( - canonical_code=StatusCode(code.value[0]), - description=details, - ) + Status(status_code=StatusCode(code.value[0]), description=details) ) return self._servicer_context.set_code(code) @@ -140,7 +134,7 @@ def set_details(self, details): self.details = details self._active_span.set_status( Status( - canonical_code=StatusCode(self.code.value[0]), + status_code=StatusCode(self.code.value[0]), description=details, ) )