Skip to content

Commit e197685

Browse files
author
Jon Wayne Parrott
authored
Use a class to wrap grpc streaming errors instead of monkey-patching (#4995)
1 parent 35e87e0 commit e197685

File tree

2 files changed

+103
-21
lines changed

2 files changed

+103
-21
lines changed

google/api_core/grpc_helpers.py

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,55 @@ def error_remapped_callable(*args, **kwargs):
5858
return error_remapped_callable
5959

6060

61+
class _StreamingResponseIterator(grpc.Call):
62+
def __init__(self, wrapped):
63+
self._wrapped = wrapped
64+
65+
def __iter__(self):
66+
"""This iterator is also an iterable that returns itself."""
67+
return self
68+
69+
def next(self):
70+
"""Get the next response from the stream.
71+
72+
Returns:
73+
protobuf.Message: A single response from the stream.
74+
"""
75+
try:
76+
return six.next(self._wrapped)
77+
except grpc.RpcError as exc:
78+
six.raise_from(exceptions.from_grpc_error(exc), exc)
79+
80+
# Alias needed for Python 2/3 support.
81+
__next__ = next
82+
83+
# grpc.Call & grpc.RpcContext interface
84+
85+
def add_callback(self, callback):
86+
return self._wrapped.add_callback(callback)
87+
88+
def cancel(self):
89+
return self._wrapped.cancel()
90+
91+
def code(self):
92+
return self._wrapped.code()
93+
94+
def details(self):
95+
return self._wrapped.details()
96+
97+
def initial_metadata(self):
98+
return self._wrapped.initial_metadata()
99+
100+
def is_active(self):
101+
return self._wrapped.is_active()
102+
103+
def time_remaining(self):
104+
return self._wrapped.time_remaining()
105+
106+
def trailing_metadata(self):
107+
return self._wrapped.trailing_metadata()
108+
109+
61110
def _wrap_stream_errors(callable_):
62111
"""Wrap errors for Unary-Stream and Stream-Stream gRPC callables.
63112
@@ -71,18 +120,7 @@ def _wrap_stream_errors(callable_):
71120
def error_remapped_callable(*args, **kwargs):
72121
try:
73122
result = callable_(*args, **kwargs)
74-
# Note: we are patching the private grpc._channel._Rendezvous._next
75-
# method as magic methods (__next__ in this case) can not be
76-
# patched on a per-instance basis (see
77-
# https://docs.python.org/3/reference/datamodel.html
78-
# #special-lookup).
79-
# In an ideal world, gRPC would return a *specific* interface
80-
# from *StreamMultiCallables, but they return a God class that's
81-
# a combination of basically every interface in gRPC making it
82-
# untenable for us to implement a wrapper object using the same
83-
# interface.
84-
result._next = _wrap_unary_errors(result._next)
85-
return result
123+
return _StreamingResponseIterator(result)
86124
except grpc.RpcError as exc:
87125
six.raise_from(exceptions.from_grpc_error(exc), exc)
88126

tests/unit/test_grpc_helpers.py

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,57 @@ def test_wrap_unary_errors():
6666
assert exc_info.value.response == grpc_error
6767

6868

69+
def test_wrap_stream_okay():
70+
expected_responses = [1, 2, 3]
71+
callable_ = mock.Mock(spec=[
72+
'__call__'], return_value=iter(expected_responses))
73+
74+
wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
75+
76+
got_iterator = wrapped_callable(1, 2, three='four')
77+
78+
responses = list(got_iterator)
79+
80+
callable_.assert_called_once_with(1, 2, three='four')
81+
assert responses == expected_responses
82+
83+
84+
def test_wrap_stream_iterable_iterface():
85+
response_iter = mock.create_autospec(grpc.Call, instance=True)
86+
callable_ = mock.Mock(spec=['__call__'], return_value=response_iter)
87+
88+
wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
89+
90+
got_iterator = wrapped_callable()
91+
92+
callable_.assert_called_once_with()
93+
94+
# Check each aliased method in the grpc.Call interface
95+
got_iterator.add_callback(mock.sentinel.callback)
96+
response_iter.add_callback.assert_called_once_with(mock.sentinel.callback)
97+
98+
got_iterator.cancel()
99+
response_iter.cancel.assert_called_once_with()
100+
101+
got_iterator.code()
102+
response_iter.code.assert_called_once_with()
103+
104+
got_iterator.details()
105+
response_iter.details.assert_called_once_with()
106+
107+
got_iterator.initial_metadata()
108+
response_iter.initial_metadata.assert_called_once_with()
109+
110+
got_iterator.is_active()
111+
response_iter.is_active.assert_called_once_with()
112+
113+
got_iterator.time_remaining()
114+
response_iter.time_remaining.assert_called_once_with()
115+
116+
got_iterator.trailing_metadata()
117+
response_iter.trailing_metadata.assert_called_once_with()
118+
119+
69120
def test_wrap_stream_errors_invocation():
70121
grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
71122
callable_ = mock.Mock(spec=['__call__'], side_effect=grpc_error)
@@ -83,16 +134,10 @@ class RpcResponseIteratorImpl(object):
83134
def __init__(self, exception):
84135
self._exception = exception
85136

86-
# Note: This matches grpc._channel._Rendezvous._next which is what is
87-
# patched by _wrap_stream_errors.
88-
def _next(self):
137+
def next(self):
89138
raise self._exception
90139

91-
def __next__(self): # pragma: NO COVER
92-
return self._next()
93-
94-
def next(self): # pragma: NO COVER
95-
return self._next()
140+
__next__ = next
96141

97142

98143
def test_wrap_stream_errors_iterator():
@@ -107,7 +152,6 @@ def test_wrap_stream_errors_iterator():
107152
with pytest.raises(exceptions.ServiceUnavailable) as exc_info:
108153
next(got_iterator)
109154

110-
assert got_iterator == response_iter
111155
callable_.assert_called_once_with(1, 2, three='four')
112156
assert exc_info.value.response == grpc_error
113157

0 commit comments

Comments
 (0)