Skip to content

Commit 13b87a7

Browse files
committed
Add grpc.aio client-side support
This adds support for instrumenting grpc.aio channels with spans and telemetry. The instrumentation needed to work differently that the standard grpc channel support but is functionally the same.
1 parent 03e021b commit 13b87a7

File tree

6 files changed

+797
-31
lines changed

6 files changed

+797
-31
lines changed

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py

+126
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,59 @@ def serve():
118118
server = grpc.server(futures.ThreadPoolExecutor(),
119119
interceptors = [server_interceptor()])
120120
121+
Usage Aio Client
122+
------------
123+
.. code-block:: python
124+
125+
import logging
126+
import asyncio
127+
128+
import grpc
129+
130+
from opentelemetry import trace
131+
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient
132+
from opentelemetry.sdk.trace import TracerProvider
133+
from opentelemetry.sdk.trace.export import (
134+
ConsoleSpanExporter,
135+
SimpleSpanProcessor,
136+
)
137+
138+
try:
139+
from .gen import helloworld_pb2, helloworld_pb2_grpc
140+
except ImportError:
141+
from gen import helloworld_pb2, helloworld_pb2_grpc
142+
143+
trace.set_tracer_provider(TracerProvider())
144+
trace.get_tracer_provider().add_span_processor(
145+
SimpleSpanProcessor(ConsoleSpanExporter())
146+
)
147+
148+
grpc_client_instrumentor = GrpcAioInstrumentorClient()
149+
grpc_client_instrumentor.instrument()
150+
151+
async def run():
152+
with grpc.aio.insecure_channel("localhost:50051") as channel:
153+
154+
stub = helloworld_pb2_grpc.GreeterStub(channel)
155+
response = await stub.SayHello(helloworld_pb2.HelloRequest(name="YOU"))
156+
157+
print("Greeter client received: " + response.message)
158+
159+
160+
if __name__ == "__main__":
161+
logging.basicConfig()
162+
asyncio.run(run())
163+
164+
You can also add the interceptor manually, rather than using
165+
:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorClient`:
166+
167+
.. code-block:: python
168+
169+
from opentelemetry.instrumentation.grpc import aio_client_interceptors
170+
171+
channel = grpc.aio.insecure_channel("localhost:12345", interceptors=aio_client_interceptors())
172+
173+
121174
Usage Aio Server
122175
------------
123176
.. code-block:: python
@@ -321,6 +374,58 @@ def wrapper_fn(self, original_func, instance, args, kwargs):
321374
)
322375

323376

377+
class GrpcAioInstrumentorClient(BaseInstrumentor):
378+
"""
379+
Globally instrument the grpc.aio client.
380+
381+
Usage::
382+
383+
grpc_aio_client_instrumentor = GrpcAioInstrumentorClient()
384+
grpc_aio_client_instrumentor.instrument()
385+
386+
"""
387+
388+
# pylint:disable=attribute-defined-outside-init, redefined-outer-name
389+
390+
def instrumentation_dependencies(self) -> Collection[str]:
391+
return _instruments
392+
393+
def _add_interceptors(self, tracer_provider, kwargs):
394+
if "interceptors" in kwargs and kwargs["interceptors"]:
395+
kwargs["interceptors"] = (
396+
aio_client_interceptors(tracer_provider=tracer_provider)
397+
+ kwargs["interceptors"]
398+
)
399+
else:
400+
kwargs["interceptors"] = aio_client_interceptors(
401+
tracer_provider=tracer_provider
402+
)
403+
404+
return kwargs
405+
406+
def _instrument(self, **kwargs):
407+
self._original_insecure = grpc.aio.insecure_channel
408+
self._original_secure = grpc.aio.secure_channel
409+
tracer_provider = kwargs.get("tracer_provider")
410+
411+
def insecure(*args, **kwargs):
412+
kwargs = self._add_interceptors(tracer_provider, kwargs)
413+
414+
return self._original_insecure(*args, **kwargs)
415+
416+
def secure(*args, **kwargs):
417+
kwargs = self._add_interceptors(tracer_provider, kwargs)
418+
419+
return self._original_secure(*args, **kwargs)
420+
421+
grpc.aio.insecure_channel = insecure
422+
grpc.aio.secure_channel = secure
423+
424+
def _uninstrument(self, **kwargs):
425+
grpc.aio.insecure_channel = self._original_insecure
426+
grpc.aio.secure_channel = self._original_secure
427+
428+
324429
def client_interceptor(tracer_provider=None):
325430
"""Create a gRPC client channel interceptor.
326431
@@ -353,6 +458,27 @@ def server_interceptor(tracer_provider=None):
353458
return _server.OpenTelemetryServerInterceptor(tracer)
354459

355460

461+
def aio_client_interceptors(tracer_provider=None):
462+
"""Create a gRPC client channel interceptor.
463+
464+
Args:
465+
tracer: The tracer to use to create client-side spans.
466+
467+
Returns:
468+
An invocation-side interceptor object.
469+
"""
470+
from . import _aio_client
471+
472+
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
473+
474+
return [
475+
_aio_client.UnaryUnaryAioClientInterceptor(tracer),
476+
_aio_client.UnaryStreamAioClientInterceptor(tracer),
477+
_aio_client.StreamUnaryAioClientInterceptor(tracer),
478+
_aio_client.StreamStreamAioClientInterceptor(tracer),
479+
]
480+
481+
356482
def aio_server_interceptor(tracer_provider=None):
357483
"""Create a gRPC aio server interceptor.
358484
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from collections import OrderedDict
16+
import functools
17+
18+
import grpc
19+
from grpc.aio import ClientCallDetails
20+
21+
from opentelemetry import context, trace
22+
from opentelemetry.semconv.trace import SpanAttributes
23+
from opentelemetry.propagate import inject
24+
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
25+
from opentelemetry.instrumentation.grpc.version import __version__
26+
27+
from opentelemetry.trace.status import Status, StatusCode
28+
29+
from opentelemetry.instrumentation.grpc._client import (
30+
OpenTelemetryClientInterceptor,
31+
_carrier_setter,
32+
)
33+
34+
35+
def _unary_done_callback(span, code, details):
36+
def callback(call):
37+
try:
38+
span.set_attribute(
39+
SpanAttributes.RPC_GRPC_STATUS_CODE,
40+
code.value[0],
41+
)
42+
if code != grpc.StatusCode.OK:
43+
span.set_status(
44+
Status(
45+
status_code=StatusCode.ERROR,
46+
description=details,
47+
)
48+
)
49+
finally:
50+
span.end()
51+
52+
return callback
53+
54+
55+
class _BaseAioClientInterceptor(OpenTelemetryClientInterceptor):
56+
@staticmethod
57+
def propagate_trace_in_details(client_call_details):
58+
method = client_call_details.method.decode("utf-8")
59+
metadata = client_call_details.metadata
60+
if not metadata:
61+
mutable_metadata = OrderedDict()
62+
else:
63+
mutable_metadata = OrderedDict(metadata)
64+
65+
inject(mutable_metadata, setter=_carrier_setter)
66+
metadata = tuple(mutable_metadata.items())
67+
68+
return ClientCallDetails(
69+
client_call_details.method,
70+
client_call_details.timeout,
71+
metadata,
72+
client_call_details.credentials,
73+
client_call_details.wait_for_ready,
74+
)
75+
76+
@staticmethod
77+
def add_error_details_to_span(span, exc):
78+
if isinstance(exc, grpc.RpcError):
79+
span.set_attribute(
80+
SpanAttributes.RPC_GRPC_STATUS_CODE,
81+
exc.code().value[0],
82+
)
83+
span.set_status(
84+
Status(
85+
status_code=StatusCode.ERROR,
86+
description=f"{type(exc).__name__}: {exc}",
87+
)
88+
)
89+
span.record_exception(exc)
90+
91+
async def _wrap_unary_response(self, continuation, span):
92+
try:
93+
call = await continuation()
94+
95+
# code and details are both coroutines that need to be await-ed,
96+
# the callbacks added with add_done_callback do not allow async
97+
# code so we need to get the code and details here then pass them
98+
# to the callback.
99+
code = await call.code()
100+
details = await call.details()
101+
102+
call.add_done_callback(_unary_done_callback(span, code, details))
103+
104+
return call
105+
except grpc.aio.AioRpcError as exc:
106+
self.add_error_details_to_span(span, exc)
107+
raise exc
108+
109+
async def _wrap_stream_response(self, span, call):
110+
try:
111+
async for response in call:
112+
yield response
113+
except Exception as exc:
114+
self.add_error_details_to_span(span, exc)
115+
raise exc
116+
finally:
117+
span.end()
118+
119+
120+
class UnaryUnaryAioClientInterceptor(
121+
grpc.aio.UnaryUnaryClientInterceptor,
122+
_BaseAioClientInterceptor,
123+
):
124+
async def intercept_unary_unary(
125+
self, continuation, client_call_details, request
126+
):
127+
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
128+
return await continuation(client_call_details, request)
129+
130+
method = client_call_details.method.decode("utf-8")
131+
with self._start_span(
132+
method,
133+
end_on_exit=False,
134+
record_exception=False,
135+
set_status_on_exception=False,
136+
) as span:
137+
new_details = self.propagate_trace_in_details(client_call_details)
138+
139+
continuation_with_args = functools.partial(
140+
continuation, new_details, request
141+
)
142+
return await self._wrap_unary_response(
143+
continuation_with_args, span
144+
)
145+
146+
147+
class UnaryStreamAioClientInterceptor(
148+
grpc.aio.UnaryStreamClientInterceptor,
149+
_BaseAioClientInterceptor,
150+
):
151+
async def intercept_unary_stream(
152+
self, continuation, client_call_details, request
153+
):
154+
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
155+
return await continuation(client_call_details, request)
156+
157+
method = client_call_details.method.decode("utf-8")
158+
with self._start_span(
159+
method,
160+
end_on_exit=False,
161+
record_exception=False,
162+
set_status_on_exception=False,
163+
) as span:
164+
new_details = self.propagate_trace_in_details(client_call_details)
165+
166+
resp = await continuation(new_details, request)
167+
168+
return self._wrap_stream_response(span, resp)
169+
170+
171+
class StreamUnaryAioClientInterceptor(
172+
grpc.aio.StreamUnaryClientInterceptor,
173+
_BaseAioClientInterceptor,
174+
):
175+
async def intercept_stream_unary(
176+
self, continuation, client_call_details, request_iterator
177+
):
178+
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
179+
return await continuation(client_call_details, request_iterator)
180+
181+
method = client_call_details.method.decode("utf-8")
182+
with self._start_span(
183+
method,
184+
end_on_exit=False,
185+
record_exception=False,
186+
set_status_on_exception=False,
187+
) as span:
188+
new_details = self.propagate_trace_in_details(client_call_details)
189+
190+
continuation_with_args = functools.partial(
191+
continuation, new_details, request_iterator
192+
)
193+
return await self._wrap_unary_response(
194+
continuation_with_args, span
195+
)
196+
197+
198+
class StreamStreamAioClientInterceptor(
199+
grpc.aio.StreamStreamClientInterceptor,
200+
_BaseAioClientInterceptor,
201+
):
202+
async def intercept_stream_stream(
203+
self, continuation, client_call_details, request_iterator
204+
):
205+
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
206+
return await continuation(client_call_details, request_iterator)
207+
208+
method = client_call_details.method.decode("utf-8")
209+
with self._start_span(
210+
method,
211+
end_on_exit=False,
212+
record_exception=False,
213+
set_status_on_exception=False,
214+
) as span:
215+
new_details = self.propagate_trace_in_details(client_call_details)
216+
217+
resp = await continuation(new_details, request_iterator)
218+
219+
return self._wrap_stream_response(span, resp)

0 commit comments

Comments
 (0)