Skip to content

Commit d840347

Browse files
author
Mihir Gore
committed
Keep client interceptors in sync with grpc client interceptors
1 parent 01db88b commit d840347

File tree

3 files changed

+161
-19
lines changed

3 files changed

+161
-19
lines changed

Diff for: CHANGELOG.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6161
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
6262
- `opentelemetry-instrumenation-flask` now supports trace response headers.
6363
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
64-
64+
- `opentelemetry-instrumentation-grpc` Keep client interceptor in sync with grpc client interceptors.
65+
([#442](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/442))
66+
6567
### Removed
6668
- Remove `http.status_text` from span attributes
6769
([#406](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/406))

Diff for: instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py

+115-18
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
# limitations under the License.
1414

1515
# pylint:disable=relative-beyond-top-level
16-
# pylint:disable=arguments-differ
1716
# pylint:disable=no-member
18-
# pylint:disable=signature-differs
1917

2018
"""Implementation of gRPC Python interceptors."""
2119

@@ -48,32 +46,71 @@ def __init__(self, method, base_callable, interceptor):
4846
self._base_callable = base_callable
4947
self._interceptor = interceptor
5048

51-
def __call__(self, request, timeout=None, metadata=None, credentials=None):
49+
def __call__(
50+
self,
51+
request,
52+
timeout=None,
53+
metadata=None,
54+
credentials=None,
55+
wait_for_ready=None,
56+
compression=None,
57+
):
5258
def invoker(request, metadata):
53-
return self._base_callable(request, timeout, metadata, credentials)
59+
return self._base_callable(
60+
request,
61+
timeout,
62+
metadata,
63+
credentials,
64+
wait_for_ready,
65+
compression,
66+
)
5467

5568
client_info = _UnaryClientInfo(self._method, timeout)
5669
return self._interceptor.intercept_unary(
5770
request, metadata, client_info, invoker
5871
)
5972

6073
def with_call(
61-
self, request, timeout=None, metadata=None, credentials=None
74+
self,
75+
request,
76+
timeout=None,
77+
metadata=None,
78+
credentials=None,
79+
wait_for_ready=None,
80+
compression=None,
6281
):
6382
def invoker(request, metadata):
6483
return self._base_callable.with_call(
65-
request, timeout, metadata, credentials
84+
request,
85+
timeout,
86+
metadata,
87+
credentials,
88+
wait_for_ready,
89+
compression,
6690
)
6791

6892
client_info = _UnaryClientInfo(self._method, timeout)
6993
return self._interceptor.intercept_unary(
7094
request, metadata, client_info, invoker
7195
)
7296

73-
def future(self, request, timeout=None, metadata=None, credentials=None):
97+
def future(
98+
self,
99+
request,
100+
timeout=None,
101+
metadata=None,
102+
credentials=None,
103+
wait_for_ready=None,
104+
compression=None,
105+
):
74106
def invoker(request, metadata):
75107
return self._base_callable.future(
76-
request, timeout, metadata, credentials
108+
request,
109+
timeout,
110+
metadata,
111+
credentials,
112+
wait_for_ready,
113+
compression,
77114
)
78115

79116
client_info = _UnaryClientInfo(self._method, timeout)
@@ -88,9 +125,25 @@ def __init__(self, method, base_callable, interceptor):
88125
self._base_callable = base_callable
89126
self._interceptor = interceptor
90127

91-
def __call__(self, request, timeout=None, metadata=None, credentials=None):
128+
def __call__(
129+
self,
130+
request,
131+
timeout=None,
132+
metadata=None,
133+
credentials=None,
134+
wait_for_ready=None,
135+
compression=None,
136+
137+
):
92138
def invoker(request, metadata):
93-
return self._base_callable(request, timeout, metadata, credentials)
139+
return self._base_callable(
140+
request,
141+
timeout,
142+
metadata,
143+
credentials,
144+
wait_for_ready,
145+
compression,
146+
)
94147

95148
client_info = _StreamClientInfo(self._method, False, True, timeout)
96149
return self._interceptor.intercept_stream(
@@ -105,11 +158,22 @@ def __init__(self, method, base_callable, interceptor):
105158
self._interceptor = interceptor
106159

107160
def __call__(
108-
self, request_iterator, timeout=None, metadata=None, credentials=None
161+
self,
162+
request_iterator,
163+
timeout=None,
164+
metadata=None,
165+
credentials=None,
166+
wait_for_ready=None,
167+
compression=None,
109168
):
110169
def invoker(request_iterator, metadata):
111170
return self._base_callable(
112-
request_iterator, timeout, metadata, credentials
171+
request_iterator,
172+
timeout,
173+
metadata,
174+
credentials,
175+
wait_for_ready,
176+
compression,
113177
)
114178

115179
client_info = _StreamClientInfo(self._method, True, False, timeout)
@@ -118,11 +182,22 @@ def invoker(request_iterator, metadata):
118182
)
119183

120184
def with_call(
121-
self, request_iterator, timeout=None, metadata=None, credentials=None
185+
self,
186+
request_iterator,
187+
timeout=None,
188+
metadata=None,
189+
credentials=None,
190+
wait_for_ready=None,
191+
compression=None,
122192
):
123193
def invoker(request_iterator, metadata):
124194
return self._base_callable.with_call(
125-
request_iterator, timeout, metadata, credentials
195+
request_iterator,
196+
timeout,
197+
metadata,
198+
credentials,
199+
wait_for_ready,
200+
compression,
126201
)
127202

128203
client_info = _StreamClientInfo(self._method, True, False, timeout)
@@ -131,11 +206,22 @@ def invoker(request_iterator, metadata):
131206
)
132207

133208
def future(
134-
self, request_iterator, timeout=None, metadata=None, credentials=None
209+
self,
210+
request_iterator,
211+
timeout=None,
212+
metadata=None,
213+
credentials=None,
214+
wait_for_ready=None,
215+
compression=None,
135216
):
136217
def invoker(request_iterator, metadata):
137218
return self._base_callable.future(
138-
request_iterator, timeout, metadata, credentials
219+
request_iterator,
220+
timeout,
221+
metadata,
222+
credentials,
223+
wait_for_ready,
224+
compression,
139225
)
140226

141227
client_info = _StreamClientInfo(self._method, True, False, timeout)
@@ -151,11 +237,22 @@ def __init__(self, method, base_callable, interceptor):
151237
self._interceptor = interceptor
152238

153239
def __call__(
154-
self, request_iterator, timeout=None, metadata=None, credentials=None
240+
self,
241+
request_iterator,
242+
timeout=None,
243+
metadata=None,
244+
credentials=None,
245+
wait_for_ready=None,
246+
compression=None,
155247
):
156248
def invoker(request_iterator, metadata):
157249
return self._base_callable(
158-
request_iterator, timeout, metadata, credentials
250+
request_iterator,
251+
timeout,
252+
metadata,
253+
credentials,
254+
wait_for_ready,
255+
compression,
159256
)
160257

161258
client_info = _StreamClientInfo(self._method, True, True, timeout)

Diff for: instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

+43
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,57 @@
4040
from ._server import create_test_server
4141
from .protobuf.test_server_pb2 import Request
4242

43+
# User defined interceptor. Is used in the tests along with the opentelemetry client interceptor.
44+
class Interceptor(
45+
grpc.UnaryUnaryClientInterceptor,
46+
grpc.UnaryStreamClientInterceptor,
47+
grpc.StreamUnaryClientInterceptor,
48+
grpc.StreamStreamClientInterceptor,
49+
):
50+
def __init__(self):
51+
pass
52+
53+
def intercept_unary_unary(
54+
self, continuation, client_call_details, request
55+
):
56+
return self._intercept_call(continuation, client_call_details, request)
57+
58+
def intercept_unary_stream(
59+
self, continuation, client_call_details, request
60+
):
61+
return self._intercept_call(continuation, client_call_details, request)
62+
63+
def intercept_stream_unary(
64+
self, continuation, client_call_details, request_iterator
65+
):
66+
return self._intercept_call(
67+
continuation, client_call_details, request_iterator
68+
)
69+
70+
def intercept_stream_stream(
71+
self, continuation, client_call_details, request_iterator
72+
):
73+
return self._intercept_call(
74+
continuation, client_call_details, request_iterator
75+
)
76+
77+
@staticmethod
78+
def _intercept_call(
79+
continuation, client_call_details, request_or_iterator
80+
):
81+
return continuation(client_call_details, request_or_iterator)
82+
4383

4484
class TestClientProto(TestBase):
4585
def setUp(self):
4686
super().setUp()
4787
GrpcInstrumentorClient().instrument()
4888
self.server = create_test_server(25565)
4989
self.server.start()
90+
# use a user defined interceptor along with the opentelemetry client interceptor
91+
interceptors = [Interceptor()]
5092
self.channel = grpc.insecure_channel("localhost:25565")
93+
self.channel = grpc.intercept_channel(self.channel, *interceptors)
5194
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
5295

5396
def tearDown(self):

0 commit comments

Comments
 (0)