Skip to content

Commit 03e021b

Browse files
committed
Add support for service-side grpc.aio
This adds support for grpc.aio server interceptors. The vast majority of the code is either re-used wholesale or duplicated with slight modifications from the existing standard interceptors.
1 parent 8107ad4 commit 03e021b

File tree

4 files changed

+773
-1
lines changed

4 files changed

+773
-1
lines changed

instrumentation/opentelemetry-instrumentation-grpc/setup.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,4 @@ where = src
5858
opentelemetry_instrumentor =
5959
grpc_client = opentelemetry.instrumentation.grpc:GrpcInstrumentorClient
6060
grpc_server = opentelemetry.instrumentation.grpc:GrpcInstrumentorServer
61+
grpc_aio_server = opentelemetry.instrumentation.grpc:GrpcAioInstrumentorServer

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py

+113-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def serve():
108108
logging.basicConfig()
109109
serve()
110110
111-
You can also add the instrumentor manually, rather than using
111+
You can also add the interceptor manually, rather than using
112112
:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`:
113113
114114
.. code-block:: python
@@ -118,6 +118,64 @@ def serve():
118118
server = grpc.server(futures.ThreadPoolExecutor(),
119119
interceptors = [server_interceptor()])
120120
121+
Usage Aio Server
122+
------------
123+
.. code-block:: python
124+
125+
import logging
126+
import asyncio
127+
128+
import grpc
129+
130+
from opentelemetry import trace
131+
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer
132+
from opentelemetry.sdk.trace import TracerProvider
133+
from opentelemetry.sdk.trace.export import (
134+
ConsoleSpanExporter,
135+
SimpleSpanProcessor,
136+
)
137+
138+
try:
139+
from .gen import helloworld_pb2, helloworld_pb2_grpc
140+
except ImportError:
141+
from gen import helloworld_pb2, helloworld_pb2_grpc
142+
143+
trace.set_tracer_provider(TracerProvider())
144+
trace.get_tracer_provider().add_span_processor(
145+
SimpleSpanProcessor(ConsoleSpanExporter())
146+
)
147+
148+
grpc_server_instrumentor = GrpcAioInstrumentorServer()
149+
grpc_server_instrumentor.instrument()
150+
151+
class Greeter(helloworld_pb2_grpc.GreeterServicer):
152+
async def SayHello(self, request, context):
153+
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
154+
155+
156+
async def serve():
157+
158+
server = grpc.aio.server()
159+
160+
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
161+
server.add_insecure_port("[::]:50051")
162+
await server.start()
163+
await server.wait_for_termination()
164+
165+
166+
if __name__ == "__main__":
167+
logging.basicConfig()
168+
asyncio.run(serve())
169+
170+
You can also add the interceptor manually, rather than using
171+
:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorServer`:
172+
173+
.. code-block:: python
174+
175+
from opentelemetry.instrumentation.grpc import aio_server_interceptor
176+
177+
server = grpc.aio.server(interceptors = [aio_server_interceptor()])
178+
121179
"""
122180
from typing import Collection
123181

@@ -174,6 +232,44 @@ def _uninstrument(self, **kwargs):
174232
grpc.server = self._original_func
175233

176234

235+
class GrpcAioInstrumentorServer(BaseInstrumentor):
236+
"""
237+
Globally instrument the grpc.aio server.
238+
239+
Usage::
240+
241+
grpc_aio_server_instrumentor = GrpcAioInstrumentorServer()
242+
grpc_aio_server_instrumentor.instrument()
243+
244+
"""
245+
246+
# pylint:disable=attribute-defined-outside-init, redefined-outer-name
247+
248+
def instrumentation_dependencies(self) -> Collection[str]:
249+
return _instruments
250+
251+
def _instrument(self, **kwargs):
252+
self._original_func = grpc.aio.server
253+
tracer_provider = kwargs.get("tracer_provider")
254+
255+
def server(*args, **kwargs):
256+
if "interceptors" in kwargs:
257+
# add our interceptor as the first
258+
kwargs["interceptors"].insert(
259+
0, aio_server_interceptor(tracer_provider=tracer_provider)
260+
)
261+
else:
262+
kwargs["interceptors"] = [
263+
aio_server_interceptor(tracer_provider=tracer_provider)
264+
]
265+
return self._original_func(*args, **kwargs)
266+
267+
grpc.aio.server = server
268+
269+
def _uninstrument(self, **kwargs):
270+
grpc.aio.server = self._original_func
271+
272+
177273
class GrpcInstrumentorClient(BaseInstrumentor):
178274
"""
179275
Globally instrument the grpc client
@@ -255,3 +351,19 @@ def server_interceptor(tracer_provider=None):
255351
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
256352

257353
return _server.OpenTelemetryServerInterceptor(tracer)
354+
355+
356+
def aio_server_interceptor(tracer_provider=None):
357+
"""Create a gRPC aio server interceptor.
358+
359+
Args:
360+
tracer: The tracer to use to create server-side spans.
361+
362+
Returns:
363+
A service-side interceptor object.
364+
"""
365+
from . import _aio_server
366+
367+
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
368+
369+
return _aio_server.OpenTelemetryAioServerInterceptor(tracer)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import grpc.aio
16+
17+
from ._server import (
18+
OpenTelemetryServerInterceptor,
19+
_wrap_rpc_behavior,
20+
_OpenTelemetryServicerContext,
21+
)
22+
23+
24+
class OpenTelemetryAioServerInterceptor(
25+
grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor
26+
):
27+
"""
28+
An AsyncIO gRPC server interceptor, to add OpenTelemetry.
29+
Usage::
30+
tracer = some OpenTelemetry tracer
31+
interceptors = [
32+
AsyncOpenTelemetryServerInterceptor(tracer),
33+
]
34+
server = aio.server(
35+
futures.ThreadPoolExecutor(max_workers=concurrency),
36+
interceptors = (interceptors,))
37+
"""
38+
39+
async def intercept_service(self, continuation, handler_call_details):
40+
def telemetry_wrapper(behavior, request_streaming, response_streaming):
41+
# handle streaming responses specially
42+
if response_streaming:
43+
return self._intercept_server_stream(
44+
behavior,
45+
handler_call_details,
46+
)
47+
else:
48+
return self._intercept_server_unary(
49+
behavior,
50+
handler_call_details,
51+
)
52+
53+
next_handler = await continuation(handler_call_details)
54+
55+
return _wrap_rpc_behavior(next_handler, telemetry_wrapper)
56+
57+
def _intercept_server_unary(self, behavior, handler_call_details):
58+
async def _unary_interceptor(request_or_iterator, context):
59+
with self._set_remote_context(context):
60+
with self._start_span(
61+
handler_call_details,
62+
context,
63+
set_status_on_exception=False,
64+
) as span:
65+
# wrap the context
66+
context = _OpenTelemetryServicerContext(context, span)
67+
68+
# And now we run the actual RPC.
69+
try:
70+
return await behavior(request_or_iterator, context)
71+
72+
except Exception as error:
73+
# Bare exceptions are likely to be gRPC aborts, which
74+
# we handle in our context wrapper.
75+
# Here, we're interested in uncaught exceptions.
76+
# pylint:disable=unidiomatic-typecheck
77+
if type(error) != Exception:
78+
span.record_exception(error)
79+
raise error
80+
81+
return _unary_interceptor
82+
83+
def _intercept_server_stream(self, behavior, handler_call_details):
84+
async def _stream_interceptor(request_or_iterator, context):
85+
with self._set_remote_context(context):
86+
with self._start_span(
87+
handler_call_details,
88+
context,
89+
set_status_on_exception=False,
90+
) as span:
91+
context = _OpenTelemetryServicerContext(context, span)
92+
93+
try:
94+
async for response in behavior(
95+
request_or_iterator, context
96+
):
97+
yield response
98+
99+
except Exception as error:
100+
# pylint:disable=unidiomatic-typecheck
101+
if type(error) != Exception:
102+
span.record_exception(error)
103+
raise error
104+
105+
return _stream_interceptor

0 commit comments

Comments
 (0)