Skip to content

Commit 2ce0b69

Browse files
author
Yoshi Yamaguchi
authored
Add gRPC filter (#1241)
1 parent d5ada28 commit 2ce0b69

File tree

8 files changed

+1615
-22
lines changed

8 files changed

+1615
-22
lines changed

CHANGELOG.md

+13-11
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88
## [Unreleased]
99

1010
### Added
11+
12+
- `opentelemetry-instrumentation-grpc` add supports to filter requests to instrument. ([#1241](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1241))
1113
- Flask sqlalchemy psycopg2 integration
1214
([#1224](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1224))
1315
- Add metric instrumentation in fastapi
@@ -125,7 +127,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
125127

126128
### Added
127129
- `opentelemetry-instrumentation-starlette` Capture custom request/response headers in span attributes
128-
([#1046])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1046)
130+
([#1046](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1046))
129131

130132
### Fixed
131133
- Prune autoinstrumentation sitecustomize module directory from PYTHONPATH immediately
@@ -148,35 +150,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
148150
### Added
149151

150152
- `opentelemetry-instrumentation-fastapi` Capture custom request/response headers in span attributes
151-
([#1032])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1032)
153+
([#1032](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1032))
152154
- `opentelemetry-instrumentation-django` Capture custom request/response headers in span attributes
153-
([#1024])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1024)
155+
([#1024](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1024))
154156
- `opentelemetry-instrumentation-asgi` Capture custom request/response headers in span attributes
155-
([#1004])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004)
157+
([#1004](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004))
156158
- `opentelemetry-instrumentation-psycopg2` extended the sql commenter support of dbapi into psycopg2
157159
([#940](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/940))
158160
- `opentelemetry-instrumentation-falcon` Add support for falcon==1.4.1
159-
([#1000])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1000)
161+
([#1000](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1000))
160162
- `opentelemetry-instrumentation-falcon` Falcon: Capture custom request/response headers in span attributes
161-
([#1003])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1003)
163+
([#1003](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1003))
162164
- `opentelemetry-instrumentation-elasticsearch` no longer creates unique span names by including search target, replaces them with `<target>` and puts the value in attribute `elasticsearch.target`
163165
([#1018](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1018))
164166
- `opentelemetry-instrumentation-pyramid` Handle non-HTTPException exceptions
165167
([#1001](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1001))
166168
- `opentelemetry-instrumentation-system-metrics` restore `SystemMetrics` instrumentation as `SystemMetricsInstrumentor`
167169
([#1012](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1012))
168170
- `opentelemetry-instrumentation-pyramid` Pyramid: Capture custom request/response headers in span attributes
169-
([#1022])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1022)
171+
([#1022](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1022))
170172

171173

172174
## [1.10.0-0.29b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.10.0-0.29b0) - 2022-03-10
173175

174176
- `opentelemetry-instrumentation-wsgi` Capture custom request/response headers in span attributes
175-
([#925])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925)
177+
([#925](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925))
176178
- `opentelemetry-instrumentation-flask` Flask: Capture custom request/response headers in span attributes
177-
([#952])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/952)
179+
([#952](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/952))
178180
- `opentelemetry-instrumentation-tornado` Tornado: Capture custom request/response headers in span attributes
179-
([#950])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/950)
181+
([#950](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/950))
180182

181183
### Added
182184

@@ -971,7 +973,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
971973
([#572](https://github.com/open-telemetry/opentelemetry-python/pull/572))
972974
- `opentelemetry-ext-sqlite3` Initial release
973975
- `opentelemetry-ext-psycopg2` Implement instrumentor interface, enabling auto-instrumentation
974-
([#694]https://github.com/open-telemetry/opentelemetry-python/pull/694)
976+
([#694](https://github.com/open-telemetry/opentelemetry-python/pull/694))
975977
- `opentelemetry-ext-asgi` Add ASGI middleware
976978
([#716](https://github.com/open-telemetry/opentelemetry-python/pull/716))
977979
- `opentelemetry-ext-django` Add exclude list for paths and hosts to prevent from tracing

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py

+126-8
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,62 @@ def serve():
118118
server = grpc.server(futures.ThreadPoolExecutor(),
119119
interceptors = [server_interceptor()])
120120
121+
Filters
122+
-------
123+
124+
If you prefer to filter specific requests to be instrumented, you can specify
125+
the condition by assigning filters to instrumentors.
126+
127+
You can write a global server instrumentor as follows:
128+
129+
.. code-block::
130+
131+
from opentelemetry.instrumentation.grpc import filters, GrpcInstrumentorServer
132+
133+
grpc_server_instrumentor = GrpcInstrumentorServer(
134+
filter_ = filters.any_of(
135+
filters.method_name("SimpleMethod"),
136+
filters.method_name("ComplexMethod"),
137+
)
138+
)
139+
grpc_server_instrumentor.instrument()
140+
141+
You can also use the filters directly on the provided interceptors:
142+
143+
.. code-block::
144+
145+
my_interceptor = server_interceptor(
146+
filter_ = filters.negate(filters.method_name("TestMethod"))
147+
)
148+
server = grpc.server(futures.ThreadPoolExecutor(),
149+
interceptors = [my_interceptor])
150+
151+
``filter_`` option also applies to both global and manual client intrumentors.
152+
153+
154+
Environment variable
155+
--------------------
156+
157+
If you'd like to exclude specific services for the instrumentations, you can use
158+
``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` environment variables.
159+
160+
For example, if you assign ``"GRPCTestServer,GRPCHealthServer"`` to the variable,
161+
then the global interceptor automatically adds the filters to exclude requests to
162+
services ``GRPCTestServer`` and ``GRPCHealthServer``.
163+
121164
"""
122-
from typing import Collection
165+
import os
166+
from typing import Callable, Collection, List, Union
123167

124168
import grpc # pylint:disable=import-self
125169
from wrapt import wrap_function_wrapper as _wrap
126170

127171
from opentelemetry import trace
172+
from opentelemetry.instrumentation.grpc.filters import (
173+
any_of,
174+
negate,
175+
service_name,
176+
)
128177
from opentelemetry.instrumentation.grpc.grpcext import intercept_channel
129178
from opentelemetry.instrumentation.grpc.package import _instruments
130179
from opentelemetry.instrumentation.grpc.version import __version__
@@ -145,10 +194,26 @@ class GrpcInstrumentorServer(BaseInstrumentor):
145194
grpc_server_instrumentor = GrpcInstrumentorServer()
146195
grpc_server_instrumentor.instrument()
147196
197+
If you want to add a filter that only intercept requests
198+
to match the condition, pass ``filter_`` to GrpcInstrumentorServer.
199+
200+
grpc_server_instrumentor = GrpcInstrumentorServer(
201+
filter_=filters.method_prefix("SimpleMethod"))
202+
grpc_server_instrumentor.instrument()
203+
148204
"""
149205

150206
# pylint:disable=attribute-defined-outside-init, redefined-outer-name
151207

208+
def __init__(self, filter_=None):
209+
excluded_service_filter = _excluded_service_filter()
210+
if excluded_service_filter is not None:
211+
if filter_ is None:
212+
filter_ = excluded_service_filter
213+
else:
214+
filter_ = any_of(filter_, excluded_service_filter)
215+
self._filter = filter_
216+
152217
def instrumentation_dependencies(self) -> Collection[str]:
153218
return _instruments
154219

@@ -160,11 +225,16 @@ def server(*args, **kwargs):
160225
if "interceptors" in kwargs:
161226
# add our interceptor as the first
162227
kwargs["interceptors"].insert(
163-
0, server_interceptor(tracer_provider=tracer_provider)
228+
0,
229+
server_interceptor(
230+
tracer_provider=tracer_provider, filter_=self._filter
231+
),
164232
)
165233
else:
166234
kwargs["interceptors"] = [
167-
server_interceptor(tracer_provider=tracer_provider)
235+
server_interceptor(
236+
tracer_provider=tracer_provider, filter_=self._filter
237+
)
168238
]
169239
return self._original_func(*args, **kwargs)
170240

@@ -183,8 +253,25 @@ class GrpcInstrumentorClient(BaseInstrumentor):
183253
grpc_client_instrumentor = GrpcInstrumentorClient()
184254
grpc_client_instrumentor.instrument()
185255
256+
If you want to add a filter that only intercept requests
257+
to match the condition, pass ``filter_`` option to GrpcInstrumentorClient.
258+
259+
grpc_client_instrumentor = GrpcInstrumentorClient(
260+
filter_=filters.negate(filters.health_check())
261+
)
262+
grpc_client_instrumentor.instrument()
263+
186264
"""
187265

266+
def __init__(self, filter_=None):
267+
excluded_service_filter = _excluded_service_filter()
268+
if excluded_service_filter is not None:
269+
if filter_ is None:
270+
filter_ = excluded_service_filter
271+
else:
272+
filter_ = any_of(filter_, excluded_service_filter)
273+
self._filter = filter_
274+
188275
# Figures out which channel type we need to wrap
189276
def _which_channel(self, kwargs):
190277
# handle legacy argument
@@ -221,37 +308,68 @@ def wrapper_fn(self, original_func, instance, args, kwargs):
221308
tracer_provider = kwargs.get("tracer_provider")
222309
return intercept_channel(
223310
channel,
224-
client_interceptor(tracer_provider=tracer_provider),
311+
client_interceptor(
312+
tracer_provider=tracer_provider,
313+
filter_=self._filter,
314+
),
225315
)
226316

227317

228-
def client_interceptor(tracer_provider=None):
318+
def client_interceptor(tracer_provider=None, filter_=None):
229319
"""Create a gRPC client channel interceptor.
230320
231321
Args:
232322
tracer: The tracer to use to create client-side spans.
233323
324+
filter_: filter function that returns True if gRPC requests
325+
matches the condition. Default is None and intercept
326+
all requests.
327+
234328
Returns:
235329
An invocation-side interceptor object.
236330
"""
237331
from . import _client
238332

239333
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
240334

241-
return _client.OpenTelemetryClientInterceptor(tracer)
335+
return _client.OpenTelemetryClientInterceptor(tracer, filter_=filter_)
242336

243337

244-
def server_interceptor(tracer_provider=None):
338+
def server_interceptor(tracer_provider=None, filter_=None):
245339
"""Create a gRPC server interceptor.
246340
247341
Args:
248342
tracer: The tracer to use to create server-side spans.
249343
344+
filter_: filter function that returns True if gRPC requests
345+
matches the condition. Default is None and intercept
346+
all requests.
347+
250348
Returns:
251349
A service-side interceptor object.
252350
"""
253351
from . import _server
254352

255353
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
256354

257-
return _server.OpenTelemetryServerInterceptor(tracer)
355+
return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_)
356+
357+
358+
def _excluded_service_filter() -> Union[Callable[[object], bool], None]:
359+
services = _parse_services(
360+
os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "")
361+
)
362+
if len(services) == 0:
363+
return None
364+
filters = (service_name(srv) for srv in services)
365+
return negate(any_of(*filters))
366+
367+
368+
def _parse_services(excluded_services: str) -> List[str]:
369+
if excluded_services != "":
370+
excluded_service_list = [
371+
s.strip() for s in excluded_services.split(",")
372+
]
373+
else:
374+
excluded_service_list = []
375+
return excluded_service_list

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ def callback(response_future):
6262
class OpenTelemetryClientInterceptor(
6363
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
6464
):
65-
def __init__(self, tracer):
65+
def __init__(self, tracer, filter_=None):
6666
self._tracer = tracer
67+
self._filter = filter_
6768

6869
def _start_span(self, method, **kwargs):
6970
service, meth = method.lstrip("/").split("/", 1)
@@ -148,6 +149,8 @@ def _intercept(self, request, metadata, client_info, invoker):
148149
return self._trace_result(span, rpc_info, result)
149150

150151
def intercept_unary(self, request, metadata, client_info, invoker):
152+
if self._filter is not None and not self._filter(client_info):
153+
return invoker(request, metadata)
151154
return self._intercept(request, metadata, client_info, invoker)
152155

153156
# For RPCs that stream responses, the result can be a generator. To record
@@ -188,6 +191,9 @@ def intercept_stream(
188191
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
189192
return invoker(request_or_iterator, metadata)
190193

194+
if self._filter is not None and not self._filter(client_info):
195+
return invoker(request_or_iterator, metadata)
196+
191197
if client_info.is_server_stream:
192198
return self._intercept_server_stream(
193199
request_or_iterator, metadata, client_info, invoker

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,10 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
173173
Usage::
174174
175175
tracer = some OpenTelemetry tracer
176+
filter = filters.negate(filters.method_name("service.Foo"))
176177
177178
interceptors = [
178-
OpenTelemetryServerInterceptor(tracer),
179+
OpenTelemetryServerInterceptor(tracer, filter),
179180
]
180181
181182
server = grpc.server(
@@ -184,8 +185,9 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
184185
185186
"""
186187

187-
def __init__(self, tracer):
188+
def __init__(self, tracer, filter_=None):
188189
self._tracer = tracer
190+
self._filter = filter_
189191

190192
@contextmanager
191193
def _set_remote_context(self, servicer_context):
@@ -261,6 +263,9 @@ def _start_span(
261263
)
262264

263265
def intercept_service(self, continuation, handler_call_details):
266+
if self._filter is not None and not self._filter(handler_call_details):
267+
return continuation(handler_call_details)
268+
264269
def telemetry_wrapper(behavior, request_streaming, response_streaming):
265270
def telemetry_interceptor(request_or_iterator, context):
266271

0 commit comments

Comments
 (0)