Skip to content

Commit fc58032

Browse files
authored
automatic metrics for gRPC client interceptor (#917)
1 parent eec050b commit fc58032

File tree

6 files changed

+415
-80
lines changed

6 files changed

+415
-80
lines changed

Diff for: ext/opentelemetry-ext-grpc/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ Released 2020-07-28
1111
- Add gRPC client and server instrumentors
1212
([788](https://github.com/open-telemetry/opentelemetry-python/pull/788))
1313

14+
- Add metric recording (bytes in/out, errors, latency) to gRPC client
15+
1416
## 0.8b0
1517

1618
Released 2020-05-27

Diff for: ext/opentelemetry-ext-grpc/setup.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ package_dir=
4141
packages=find_namespace:
4242
install_requires =
4343
opentelemetry-api == 0.12.dev0
44+
opentelemetry-sdk == 0.12.dev0
4445
grpcio ~= 1.27
4546

4647
[options.extras_require]

Diff for: ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py

+42-11
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
SimpleExportSpanProcessor,
3434
)
3535
36+
from opentelemetry import metrics
37+
from opentelemetry.sdk.metrics import MeterProvider
38+
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
39+
3640
try:
3741
from .gen import helloworld_pb2, helloworld_pb2_grpc
3842
except ImportError:
@@ -42,7 +46,12 @@
4246
trace.get_tracer_provider().add_span_processor(
4347
SimpleExportSpanProcessor(ConsoleSpanExporter())
4448
)
45-
instrumentor = GrpcInstrumentorClient()
49+
50+
# Set meter provider to opentelemetry-sdk's MeterProvider
51+
metrics.set_meter_provider(MeterProvider())
52+
53+
# Optional - export GRPC specific metrics (latency, bytes in/out, errors) by passing an exporter
54+
instrumentor = GrpcInstrumentorClient(exporter=ConsoleMetricsExporter(), interval=10)
4655
instrumentor.instrument()
4756
4857
def run():
@@ -109,6 +118,7 @@ def serve():
109118
serve()
110119
"""
111120
from contextlib import contextmanager
121+
from functools import partial
112122

113123
import grpc
114124
from wrapt import wrap_function_wrapper as _wrap
@@ -139,11 +149,21 @@ def wrapper_fn(self, original_func, instance, args, kwargs):
139149

140150
class GrpcInstrumentorClient(BaseInstrumentor):
141151
def _instrument(self, **kwargs):
152+
exporter = kwargs.get("exporter", None)
153+
interval = kwargs.get("interval", 30)
142154
if kwargs.get("channel_type") == "secure":
143-
_wrap("grpc", "secure_channel", self.wrapper_fn)
155+
_wrap(
156+
"grpc",
157+
"secure_channel",
158+
partial(self.wrapper_fn, exporter, interval),
159+
)
144160

145161
else:
146-
_wrap("grpc", "insecure_channel", self.wrapper_fn)
162+
_wrap(
163+
"grpc",
164+
"insecure_channel",
165+
partial(self.wrapper_fn, exporter, interval),
166+
)
147167

148168
def _uninstrument(self, **kwargs):
149169
if kwargs.get("channel_type") == "secure":
@@ -152,17 +172,28 @@ def _uninstrument(self, **kwargs):
152172
else:
153173
unwrap(grpc, "insecure_channel")
154174

155-
@contextmanager
156-
def wrapper_fn(self, original_func, instance, args, kwargs):
157-
with original_func(*args, **kwargs) as channel:
158-
yield intercept_channel(channel, client_interceptor())
159-
160-
161-
def client_interceptor(tracer_provider=None):
175+
def wrapper_fn(
176+
self, exporter, interval, original_func, instance, args, kwargs
177+
):
178+
channel = original_func(*args, **kwargs)
179+
tracer_provider = kwargs.get("tracer_provider")
180+
return intercept_channel(
181+
channel,
182+
client_interceptor(
183+
tracer_provider=tracer_provider,
184+
exporter=exporter,
185+
interval=interval,
186+
),
187+
)
188+
189+
190+
def client_interceptor(tracer_provider=None, exporter=None, interval=30):
162191
"""Create a gRPC client channel interceptor.
163192
164193
Args:
165194
tracer: The tracer to use to create client-side spans.
195+
exporter: The exporter that will receive client metrics
196+
interval: Time between every export call
166197
167198
Returns:
168199
An invocation-side interceptor object.
@@ -171,7 +202,7 @@ def client_interceptor(tracer_provider=None):
171202

172203
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
173204

174-
return _client.OpenTelemetryClientInterceptor(tracer)
205+
return _client.OpenTelemetryClientInterceptor(tracer, exporter, interval)
175206

176207

177208
def server_interceptor(tracer_provider=None):

Diff for: ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py

+129-56
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@
2424

2525
import grpc
2626

27-
from opentelemetry import propagators, trace
27+
from opentelemetry import metrics, propagators, trace
28+
from opentelemetry.sdk.metrics.export.controller import PushController
2829
from opentelemetry.trace.status import Status, StatusCanonicalCode
2930

3031
from . import grpcext
31-
from ._utilities import RpcInfo
32+
from ._utilities import RpcInfo, TimedMetricRecorder
3233

3334

3435
class _GuardedSpan:
@@ -63,7 +64,7 @@ def append_metadata(
6364
propagators.inject(append_metadata, metadata)
6465

6566

66-
def _make_future_done_callback(span, rpc_info):
67+
def _make_future_done_callback(span, rpc_info, client_info, metrics_recorder):
6768
def callback(response_future):
6869
with span:
6970
code = response_future.code()
@@ -72,28 +73,45 @@ def callback(response_future):
7273
return
7374
response = response_future.result()
7475
rpc_info.response = response
76+
if "ByteSize" in dir(response):
77+
metrics_recorder.record_bytes_in(
78+
response.ByteSize(), client_info.full_method
79+
)
7580

7681
return callback
7782

7883

7984
class OpenTelemetryClientInterceptor(
8085
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
8186
):
82-
def __init__(self, tracer):
87+
def __init__(self, tracer, exporter, interval):
8388
self._tracer = tracer
8489

90+
self._meter = None
91+
if exporter and interval:
92+
self._meter = metrics.get_meter(__name__)
93+
self.controller = PushController(
94+
meter=self._meter, exporter=exporter, interval=interval
95+
)
96+
self._metrics_recorder = TimedMetricRecorder(self._meter, "client")
97+
8598
def _start_span(self, method):
8699
return self._tracer.start_as_current_span(
87100
name=method, kind=trace.SpanKind.CLIENT
88101
)
89102

90103
# pylint:disable=no-self-use
91-
def _trace_result(self, guarded_span, rpc_info, result):
104+
def _trace_result(self, guarded_span, rpc_info, result, client_info):
92105
# If the RPC is called asynchronously, release the guard and add a
93106
# callback so that the span can be finished once the future is done.
94107
if isinstance(result, grpc.Future):
95108
result.add_done_callback(
96-
_make_future_done_callback(guarded_span.release(), rpc_info)
109+
_make_future_done_callback(
110+
guarded_span.release(),
111+
rpc_info,
112+
client_info,
113+
self._metrics_recorder,
114+
)
97115
)
98116
return result
99117
response = result
@@ -104,37 +122,62 @@ def _trace_result(self, guarded_span, rpc_info, result):
104122
if isinstance(result, tuple):
105123
response = result[0]
106124
rpc_info.response = response
125+
126+
if "ByteSize" in dir(response):
127+
self._metrics_recorder.record_bytes_in(
128+
response.ByteSize(), client_info.full_method
129+
)
107130
return result
108131

109132
def _start_guarded_span(self, *args, **kwargs):
110133
return _GuardedSpan(self._start_span(*args, **kwargs))
111134

135+
def _bytes_out_iterator_wrapper(self, iterator, client_info):
136+
for request in iterator:
137+
if "ByteSize" in dir(request):
138+
self._metrics_recorder.record_bytes_out(
139+
request.ByteSize(), client_info.full_method
140+
)
141+
yield request
142+
112143
def intercept_unary(self, request, metadata, client_info, invoker):
113144
if not metadata:
114145
mutable_metadata = OrderedDict()
115146
else:
116147
mutable_metadata = OrderedDict(metadata)
117148

118149
with self._start_guarded_span(client_info.full_method) as guarded_span:
119-
_inject_span_context(mutable_metadata)
120-
metadata = tuple(mutable_metadata.items())
121-
122-
rpc_info = RpcInfo(
123-
full_method=client_info.full_method,
124-
metadata=metadata,
125-
timeout=client_info.timeout,
126-
request=request,
127-
)
128-
129-
try:
130-
result = invoker(request, metadata)
131-
except grpc.RpcError as exc:
132-
guarded_span.generated_span.set_status(
133-
Status(StatusCanonicalCode(exc.code().value[0]))
150+
with self._metrics_recorder.record_latency(
151+
client_info.full_method
152+
):
153+
_inject_span_context(mutable_metadata)
154+
metadata = tuple(mutable_metadata.items())
155+
156+
# If protobuf is used, we can record the bytes in/out. Otherwise, we have no way
157+
# to get the size of the request/response properly, so don't record anything
158+
if "ByteSize" in dir(request):
159+
self._metrics_recorder.record_bytes_out(
160+
request.ByteSize(), client_info.full_method
161+
)
162+
163+
rpc_info = RpcInfo(
164+
full_method=client_info.full_method,
165+
metadata=metadata,
166+
timeout=client_info.timeout,
167+
request=request,
134168
)
135-
raise
136169

137-
return self._trace_result(guarded_span, rpc_info, result)
170+
try:
171+
result = invoker(request, metadata)
172+
except grpc.RpcError as exc:
173+
guarded_span.generated_span.set_status(
174+
Status(StatusCanonicalCode(exc.code().value[0]))
175+
)
176+
raise
177+
178+
return self._trace_result(
179+
guarded_span, rpc_info, result, client_info
180+
)
138181

139182
# For RPCs that stream responses, the result can be a generator. To record
140183
# the span across the generated responses and detect any errors, we wrap
@@ -148,25 +191,44 @@ def _intercept_server_stream(
148191
mutable_metadata = OrderedDict(metadata)
149192

150193
with self._start_span(client_info.full_method) as span:
151-
_inject_span_context(mutable_metadata)
152-
metadata = tuple(mutable_metadata.items())
153-
rpc_info = RpcInfo(
154-
full_method=client_info.full_method,
155-
metadata=metadata,
156-
timeout=client_info.timeout,
157-
)
158-
if client_info.is_client_stream:
159-
rpc_info.request = request_or_iterator
160-
161-
try:
162-
result = invoker(request_or_iterator, metadata)
163-
for response in result:
164-
yield response
165-
except grpc.RpcError as exc:
166-
span.set_status(
167-
Status(StatusCanonicalCode(exc.code().value[0]))
194+
with self._metrics_recorder.record_latency(
195+
client_info.full_method
196+
):
197+
_inject_span_context(mutable_metadata)
198+
metadata = tuple(mutable_metadata.items())
199+
rpc_info = RpcInfo(
200+
full_method=client_info.full_method,
201+
metadata=metadata,
202+
timeout=client_info.timeout,
168203
)
169-
raise
204+
205+
if client_info.is_client_stream:
206+
rpc_info.request = request_or_iterator
207+
request_or_iterator = self._bytes_out_iterator_wrapper(
208+
request_or_iterator, client_info
209+
)
210+
else:
211+
if "ByteSize" in dir(request_or_iterator):
212+
self._metrics_recorder.record_bytes_out(
213+
request_or_iterator.ByteSize(),
214+
client_info.full_method,
215+
)
216+
217+
try:
218+
result = invoker(request_or_iterator, metadata)
219+
220+
# Rewrap the result stream into a generator, and record the bytes received
221+
for response in result:
222+
if "ByteSize" in dir(response):
223+
self._metrics_recorder.record_bytes_in(
224+
response.ByteSize(), client_info.full_method
225+
)
226+
yield response
227+
except grpc.RpcError as exc:
228+
span.set_status(
229+
Status(StatusCanonicalCode(exc.code().value[0]))
230+
)
231+
raise
170232

171233
def intercept_stream(
172234
self, request_or_iterator, metadata, client_info, invoker
@@ -182,21 +244,32 @@ def intercept_stream(
182244
mutable_metadata = OrderedDict(metadata)
183245

184246
with self._start_guarded_span(client_info.full_method) as guarded_span:
185-
_inject_span_context(mutable_metadata)
186-
metadata = tuple(mutable_metadata.items())
187-
rpc_info = RpcInfo(
188-
full_method=client_info.full_method,
189-
metadata=metadata,
190-
timeout=client_info.timeout,
191-
request=request_or_iterator,
192-
)
247+
with self._metrics_recorder.record_latency(
248+
client_info.full_method
249+
):
250+
_inject_span_context(mutable_metadata)
251+
metadata = tuple(mutable_metadata.items())
252+
rpc_info = RpcInfo(
253+
full_method=client_info.full_method,
254+
metadata=metadata,
255+
timeout=client_info.timeout,
256+
request=request_or_iterator,
257+
)
258+
259+
rpc_info.request = request_or_iterator
193260

194-
try:
195-
result = invoker(request_or_iterator, metadata)
196-
except grpc.RpcError as exc:
197-
guarded_span.generated_span.set_status(
198-
Status(StatusCanonicalCode(exc.code().value[0]))
261+
request_or_iterator = self._bytes_out_iterator_wrapper(
262+
request_or_iterator, client_info
199263
)
200-
raise
201264

202-
return self._trace_result(guarded_span, rpc_info, result)
265+
try:
266+
result = invoker(request_or_iterator, metadata)
267+
except grpc.RpcError as exc:
268+
guarded_span.generated_span.set_status(
269+
Status(StatusCanonicalCode(exc.code().value[0]))
270+
)
271+
raise
272+
273+
return self._trace_result(
274+
guarded_span, rpc_info, result, client_info
275+
)

0 commit comments

Comments
 (0)