Skip to content

Commit b8fc7b2

Browse files
author
Mihir Gore
committed
Keep client interceptors in sync with grpc client interceptors
1 parent 01db88b commit b8fc7b2

File tree

3 files changed

+161
-19
lines changed

3 files changed

+161
-19
lines changed

CHANGELOG.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6161
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
6262
- `opentelemetry-instrumenation-flask` now supports trace response headers.
6363
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
64-
64+
- `opentelemetry-instrumentation-grpc` Keep client interceptor in sync with grpc client interceptors.
65+
([#442](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/442))
66+
6567
### Removed
6668
- Remove `http.status_text` from span attributes
6769
([#406](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/406))

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py

+114-18
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
# limitations under the License.
1414

1515
# pylint:disable=relative-beyond-top-level
16-
# pylint:disable=arguments-differ
1716
# pylint:disable=no-member
18-
# pylint:disable=signature-differs
1917

2018
"""Implementation of gRPC Python interceptors."""
2119

@@ -48,32 +46,71 @@ def __init__(self, method, base_callable, interceptor):
4846
self._base_callable = base_callable
4947
self._interceptor = interceptor
5048

51-
def __call__(self, request, timeout=None, metadata=None, credentials=None):
49+
def __call__(
50+
self,
51+
request,
52+
timeout=None,
53+
metadata=None,
54+
credentials=None,
55+
wait_for_ready=None,
56+
compression=None,
57+
):
5258
def invoker(request, metadata):
53-
return self._base_callable(request, timeout, metadata, credentials)
59+
return self._base_callable(
60+
request,
61+
timeout,
62+
metadata,
63+
credentials,
64+
wait_for_ready,
65+
compression,
66+
)
5467

5568
client_info = _UnaryClientInfo(self._method, timeout)
5669
return self._interceptor.intercept_unary(
5770
request, metadata, client_info, invoker
5871
)
5972

6073
def with_call(
61-
self, request, timeout=None, metadata=None, credentials=None
74+
self,
75+
request,
76+
timeout=None,
77+
metadata=None,
78+
credentials=None,
79+
wait_for_ready=None,
80+
compression=None,
6281
):
6382
def invoker(request, metadata):
6483
return self._base_callable.with_call(
65-
request, timeout, metadata, credentials
84+
request,
85+
timeout,
86+
metadata,
87+
credentials,
88+
wait_for_ready,
89+
compression,
6690
)
6791

6892
client_info = _UnaryClientInfo(self._method, timeout)
6993
return self._interceptor.intercept_unary(
7094
request, metadata, client_info, invoker
7195
)
7296

73-
def future(self, request, timeout=None, metadata=None, credentials=None):
97+
def future(
98+
self,
99+
request,
100+
timeout=None,
101+
metadata=None,
102+
credentials=None,
103+
wait_for_ready=None,
104+
compression=None,
105+
):
74106
def invoker(request, metadata):
75107
return self._base_callable.future(
76-
request, timeout, metadata, credentials
108+
request,
109+
timeout,
110+
metadata,
111+
credentials,
112+
wait_for_ready,
113+
compression,
77114
)
78115

79116
client_info = _UnaryClientInfo(self._method, timeout)
@@ -88,9 +125,24 @@ def __init__(self, method, base_callable, interceptor):
88125
self._base_callable = base_callable
89126
self._interceptor = interceptor
90127

91-
def __call__(self, request, timeout=None, metadata=None, credentials=None):
128+
def __call__(
129+
self,
130+
request,
131+
timeout=None,
132+
metadata=None,
133+
credentials=None,
134+
wait_for_ready=None,
135+
compression=None,
136+
):
92137
def invoker(request, metadata):
93-
return self._base_callable(request, timeout, metadata, credentials)
138+
return self._base_callable(
139+
request,
140+
timeout,
141+
metadata,
142+
credentials,
143+
wait_for_ready,
144+
compression,
145+
)
94146

95147
client_info = _StreamClientInfo(self._method, False, True, timeout)
96148
return self._interceptor.intercept_stream(
@@ -105,11 +157,22 @@ def __init__(self, method, base_callable, interceptor):
105157
self._interceptor = interceptor
106158

107159
def __call__(
108-
self, request_iterator, timeout=None, metadata=None, credentials=None
160+
self,
161+
request_iterator,
162+
timeout=None,
163+
metadata=None,
164+
credentials=None,
165+
wait_for_ready=None,
166+
compression=None,
109167
):
110168
def invoker(request_iterator, metadata):
111169
return self._base_callable(
112-
request_iterator, timeout, metadata, credentials
170+
request_iterator,
171+
timeout,
172+
metadata,
173+
credentials,
174+
wait_for_ready,
175+
compression,
113176
)
114177

115178
client_info = _StreamClientInfo(self._method, True, False, timeout)
@@ -118,11 +181,22 @@ def invoker(request_iterator, metadata):
118181
)
119182

120183
def with_call(
121-
self, request_iterator, timeout=None, metadata=None, credentials=None
184+
self,
185+
request_iterator,
186+
timeout=None,
187+
metadata=None,
188+
credentials=None,
189+
wait_for_ready=None,
190+
compression=None,
122191
):
123192
def invoker(request_iterator, metadata):
124193
return self._base_callable.with_call(
125-
request_iterator, timeout, metadata, credentials
194+
request_iterator,
195+
timeout,
196+
metadata,
197+
credentials,
198+
wait_for_ready,
199+
compression,
126200
)
127201

128202
client_info = _StreamClientInfo(self._method, True, False, timeout)
@@ -131,11 +205,22 @@ def invoker(request_iterator, metadata):
131205
)
132206

133207
def future(
134-
self, request_iterator, timeout=None, metadata=None, credentials=None
208+
self,
209+
request_iterator,
210+
timeout=None,
211+
metadata=None,
212+
credentials=None,
213+
wait_for_ready=None,
214+
compression=None,
135215
):
136216
def invoker(request_iterator, metadata):
137217
return self._base_callable.future(
138-
request_iterator, timeout, metadata, credentials
218+
request_iterator,
219+
timeout,
220+
metadata,
221+
credentials,
222+
wait_for_ready,
223+
compression,
139224
)
140225

141226
client_info = _StreamClientInfo(self._method, True, False, timeout)
@@ -151,11 +236,22 @@ def __init__(self, method, base_callable, interceptor):
151236
self._interceptor = interceptor
152237

153238
def __call__(
154-
self, request_iterator, timeout=None, metadata=None, credentials=None
239+
self,
240+
request_iterator,
241+
timeout=None,
242+
metadata=None,
243+
credentials=None,
244+
wait_for_ready=None,
245+
compression=None,
155246
):
156247
def invoker(request_iterator, metadata):
157248
return self._base_callable(
158-
request_iterator, timeout, metadata, credentials
249+
request_iterator,
250+
timeout,
251+
metadata,
252+
credentials,
253+
wait_for_ready,
254+
compression,
159255
)
160256

161257
client_info = _StreamClientInfo(self._method, True, True, timeout)

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

+44
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,57 @@
4141
from .protobuf.test_server_pb2 import Request
4242

4343

44+
# User defined interceptor. Is used in the tests along with the opentelemetry client interceptor.
45+
class Interceptor(
46+
grpc.UnaryUnaryClientInterceptor,
47+
grpc.UnaryStreamClientInterceptor,
48+
grpc.StreamUnaryClientInterceptor,
49+
grpc.StreamStreamClientInterceptor,
50+
):
51+
def __init__(self):
52+
pass
53+
54+
def intercept_unary_unary(
55+
self, continuation, client_call_details, request
56+
):
57+
return self._intercept_call(continuation, client_call_details, request)
58+
59+
def intercept_unary_stream(
60+
self, continuation, client_call_details, request
61+
):
62+
return self._intercept_call(continuation, client_call_details, request)
63+
64+
def intercept_stream_unary(
65+
self, continuation, client_call_details, request_iterator
66+
):
67+
return self._intercept_call(
68+
continuation, client_call_details, request_iterator
69+
)
70+
71+
def intercept_stream_stream(
72+
self, continuation, client_call_details, request_iterator
73+
):
74+
return self._intercept_call(
75+
continuation, client_call_details, request_iterator
76+
)
77+
78+
@staticmethod
79+
def _intercept_call(
80+
continuation, client_call_details, request_or_iterator
81+
):
82+
return continuation(client_call_details, request_or_iterator)
83+
84+
4485
class TestClientProto(TestBase):
4586
def setUp(self):
4687
super().setUp()
4788
GrpcInstrumentorClient().instrument()
4889
self.server = create_test_server(25565)
4990
self.server.start()
91+
# use a user defined interceptor along with the opentelemetry client interceptor
92+
interceptors = [Interceptor()]
5093
self.channel = grpc.insecure_channel("localhost:25565")
94+
self.channel = grpc.intercept_channel(self.channel, *interceptors)
5195
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
5296

5397
def tearDown(self):

0 commit comments

Comments
 (0)