Skip to content

Commit 74e0b0f

Browse files
authored
feat: allow disabling response stream pre-fetch (googleapis#30)
Closes googleapis#25. This PR adds the ability to disable automatically pre-fetching the first item of a stream returned by `*-Stream` gRPC callables. This hook will be used in PubSub to fix the [stalled stream issue](googleapis/python-pubsub#93), while also not affecting Firestore, since the default behavior is preserved. I realize the fix is far from ideal, but it's the least ugly among the approaches I tried, e.g. somehow passing the flag through `ResumableBidiRpc` (it's a messy rabbit hole). On the PubSub side monkeypatching the generated SubscriberClient will be needed, but it's a (relatively) clean one-liner: ```patch diff --git google/cloud/pubsub_v1/gapic/subscriber_client.py google/cloud/pubsub_v1/gapic/subscriber_client.py index e98a686..1d6c058 100644 --- google/cloud/pubsub_v1/gapic/subscriber_client.py +++ google/cloud/pubsub_v1/gapic/subscriber_client.py @@ -1169,6 +1169,8 @@ class SubscriberClient(object): default_timeout=self._method_configs["StreamingPull"].timeout, client_info=self._client_info, ) + # TODO: explain this monkeypatch! + self.transport.streaming_pull._prefetch_first_result_ = False return self._inner_api_calls["streaming_pull"]( requests, retry=retry, timeout=timeout, metadata=metadata ``` If/when we merge this, we should also release it, and then we can add `!= 1.17.0` to the `google-api-core` version pin in PubSub. ### PR checklist - [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-api-core/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary)
1 parent 945bafc commit 74e0b0f

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

google/api_core/grpc_helpers.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,15 @@ def error_remapped_callable(*args, **kwargs):
6262

6363

6464
class _StreamingResponseIterator(grpc.Call):
65-
def __init__(self, wrapped):
65+
def __init__(self, wrapped, prefetch_first_result=True):
6666
self._wrapped = wrapped
6767

6868
# This iterator is used in a retry context, and returned outside after init.
6969
# gRPC will not throw an exception until the stream is consumed, so we need
7070
# to retrieve the first result, in order to fail, in order to trigger a retry.
7171
try:
72-
self._stored_first_result = six.next(self._wrapped)
72+
if prefetch_first_result:
73+
self._stored_first_result = six.next(self._wrapped)
7374
except TypeError:
7475
# It is possible the wrapped method isn't an iterable (a grpc.Call
7576
# for instance). If this happens don't store the first result.
@@ -141,7 +142,12 @@ def _wrap_stream_errors(callable_):
141142
def error_remapped_callable(*args, **kwargs):
142143
try:
143144
result = callable_(*args, **kwargs)
144-
return _StreamingResponseIterator(result)
145+
# Auto-fetching the first result causes PubSub client's streaming pull
146+
# to hang when re-opening the stream, thus we need examine the hacky
147+
# hidden flag to see if pre-fetching is disabled.
148+
# https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
149+
prefetch_first = getattr(callable_, "_prefetch_first_result_", True)
150+
return _StreamingResponseIterator(result, prefetch_first_result=prefetch_first)
145151
except grpc.RpcError as exc:
146152
six.raise_from(exceptions.from_grpc_error(exc), exc)
147153

tests/unit/test_grpc_helpers.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,19 @@ def test_wrap_stream_okay():
8080
assert responses == expected_responses
8181

8282

83+
def test_wrap_stream_prefetch_disabled():
84+
responses = [1, 2, 3]
85+
iter_responses = iter(responses)
86+
callable_ = mock.Mock(spec=["__call__"], return_value=iter_responses)
87+
callable_._prefetch_first_result_ = False
88+
89+
wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
90+
wrapped_callable(1, 2, three="four")
91+
92+
assert list(iter_responses) == responses # no items should have been pre-fetched
93+
callable_.assert_called_once_with(1, 2, three="four")
94+
95+
8396
def test_wrap_stream_iterable_iterface():
8497
response_iter = mock.create_autospec(grpc.Call, instance=True)
8598
callable_ = mock.Mock(spec=["__call__"], return_value=response_iter)

0 commit comments

Comments
 (0)