Skip to content

Commit 3e58f20

Browse files
committed
feat: add support for asynchronous rest streaming
1 parent 7ccbf57 commit 3e58f20

File tree

6 files changed

+668
-123
lines changed

6 files changed

+668
-123
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Helpers for server-side streaming in REST."""
16+
17+
from collections import deque
18+
import string
19+
from typing import Deque, Union
20+
21+
import proto
22+
import google.protobuf.message
23+
from google.protobuf.json_format import Parse
24+
25+
26+
class BaseResponseIterator:
    """Base Iterator over REST API responses. This class should not be used directly.

    The class holds the transport-independent part of REST streaming: it
    splits a streamed JSON array of objects into one JSON string per
    top-level object (``_process_chunk``) and converts each string into an
    instance of ``response_message_cls`` (``_grab``). Subclasses own the HTTP
    response and feed decoded text into ``_process_chunk``.

    Args:
        response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response
        class expected to be returned from an API.

    Raises:
        ValueError: Raised by ``_grab`` if `response_message_cls` is not a subclass of
            `proto.Message` or `google.protobuf.message.Message`.
    """

    def __init__(
        self,
        response_message_cls: "Union[proto.Message, google.protobuf.message.Message]",
    ):
        self._response_message_cls = response_message_cls
        # Contains a list of JSON responses ready to be sent to user.
        self._ready_objs: Deque[str] = deque()
        # Current JSON response being built.
        self._obj = ""
        # Keeps track of the nesting level within a JSON object.
        # 0: outside the top-level array, 1: inside the array but between
        # objects, >= 2: inside a response object.
        self._level = 0
        # Keeps track of whether the HTTP response is currently sending
        # characters that are inside of a JSON string value.
        self._in_string = False
        # Whether an escape symbol "\" was encountered.
        self._escape_next = False

    def _process_chunk(self, chunk: str):
        """Consume a piece of decoded response text.

        Every top-level JSON object completed by this chunk is appended to
        ``self._ready_objs`` as a string. Whitespace outside of JSON strings
        is dropped. State is kept on the instance, so an object may be split
        across any number of chunks.

        Args:
            chunk (str): Decoded text from the HTTP response. An empty chunk
                is ignored.

        Raises:
            ValueError: If the parser is outside of the top-level array and
                the chunk does not start with "[".
        """
        if not chunk:
            # Nothing to parse; also avoids an IndexError on chunk[0] below.
            return
        if self._level == 0:
            if chunk[0] != "[":
                raise ValueError(
                    "Can only parse array of JSON objects, instead got %s" % chunk
                )
        for char in chunk:
            if char == "{":
                if self._level == 1:
                    # Level 1 corresponds to the outermost JSON object
                    # (i.e. the one we care about).
                    self._obj = ""
                if not self._in_string:
                    self._level += 1
                self._obj += char
            elif char == "}":
                self._obj += char
                if not self._in_string:
                    self._level -= 1
                if not self._in_string and self._level == 1:
                    # Back at array level: the current object is complete.
                    self._ready_objs.append(self._obj)
            elif char == '"':
                # Helps to deal with escaped quotes inside of a string.
                if not self._escape_next:
                    self._in_string = not self._in_string
                self._obj += char
            elif char in string.whitespace:
                # Whitespace only matters inside of string values.
                if self._in_string:
                    self._obj += char
            elif char == "[":
                if self._level == 0:
                    # Opening bracket of the top-level array.
                    self._level += 1
                else:
                    self._obj += char
            elif char == "]":
                if self._level == 1:
                    # Closing bracket of the top-level array.
                    self._level -= 1
                else:
                    self._obj += char
            else:
                self._obj += char
            # A backslash escapes the next character; two in a row cancel out.
            self._escape_next = not self._escape_next if char == "\\" else False

    def _grab(self):
        """Pop the oldest ready JSON object and parse it into a message.

        Returns:
            An instance of ``response_message_cls`` built from the JSON string.

        Raises:
            ValueError: If ``response_message_cls`` is not a subclass of
                `proto.Message` or `google.protobuf.message.Message`.
        """
        if issubclass(self._response_message_cls, proto.Message):
            # Unknown fields are ignored so that fields added by a newer
            # server do not break parsing in an older client.
            return self._response_message_cls.from_json(
                self._ready_objs.popleft(), ignore_unknown_fields=True
            )
        elif issubclass(self._response_message_cls, google.protobuf.message.Message):
            return Parse(self._ready_objs.popleft(), self._response_message_cls())
        else:
            raise ValueError(
                "Response message class must be a subclass of proto.Message or google.protobuf.message.Message."
            )

google/api_core/rest_streaming.py

Lines changed: 6 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,15 @@
1414

1515
"""Helpers for server-side streaming in REST."""
1616

17-
from collections import deque
18-
import string
19-
from typing import Deque, Union
17+
from typing import Union
2018

2119
import proto
2220
import requests
2321
import google.protobuf.message
24-
from google.protobuf.json_format import Parse
22+
from google.api_core._rest_streaming_base import BaseResponseIterator
2523

2624

27-
class ResponseIterator:
25+
class ResponseIterator(BaseResponseIterator):
2826
"""Iterator over REST API responses.
2927
3028
Args:
@@ -42,68 +40,16 @@ def __init__(
4240
response_message_cls: Union[proto.Message, google.protobuf.message.Message],
4341
):
4442
self._response = response
45-
self._response_message_cls = response_message_cls
4643
# Inner iterator over HTTP response's content.
4744
self._response_itr = self._response.iter_content(decode_unicode=True)
48-
# Contains a list of JSON responses ready to be sent to user.
49-
self._ready_objs: Deque[str] = deque()
50-
# Current JSON response being built.
51-
self._obj = ""
52-
# Keeps track of the nesting level within a JSON object.
53-
self._level = 0
54-
# Keeps track whether HTTP response is currently sending values
55-
# inside of a string value.
56-
self._in_string = False
57-
# Whether an escape symbol "\" was encountered.
58-
self._escape_next = False
45+
super(ResponseIterator, self).__init__(
46+
response_message_cls=response_message_cls
47+
)
5948

6049
def cancel(self):
6150
"""Cancel existing streaming operation."""
6251
self._response.close()
6352

64-
def _process_chunk(self, chunk: str):
65-
if self._level == 0:
66-
if chunk[0] != "[":
67-
raise ValueError(
68-
"Can only parse array of JSON objects, instead got %s" % chunk
69-
)
70-
for char in chunk:
71-
if char == "{":
72-
if self._level == 1:
73-
# Level 1 corresponds to the outermost JSON object
74-
# (i.e. the one we care about).
75-
self._obj = ""
76-
if not self._in_string:
77-
self._level += 1
78-
self._obj += char
79-
elif char == "}":
80-
self._obj += char
81-
if not self._in_string:
82-
self._level -= 1
83-
if not self._in_string and self._level == 1:
84-
self._ready_objs.append(self._obj)
85-
elif char == '"':
86-
# Helps to deal with an escaped quotes inside of a string.
87-
if not self._escape_next:
88-
self._in_string = not self._in_string
89-
self._obj += char
90-
elif char in string.whitespace:
91-
if self._in_string:
92-
self._obj += char
93-
elif char == "[":
94-
if self._level == 0:
95-
self._level += 1
96-
else:
97-
self._obj += char
98-
elif char == "]":
99-
if self._level == 1:
100-
self._level -= 1
101-
else:
102-
self._obj += char
103-
else:
104-
self._obj += char
105-
self._escape_next = not self._escape_next if char == "\\" else False
106-
10753
def __next__(self):
10854
while not self._ready_objs:
10955
try:
@@ -115,18 +61,5 @@ def __next__(self):
11561
raise e
11662
return self._grab()
11763

118-
def _grab(self):
119-
# Add extra quotes to make json.loads happy.
120-
if issubclass(self._response_message_cls, proto.Message):
121-
return self._response_message_cls.from_json(
122-
self._ready_objs.popleft(), ignore_unknown_fields=True
123-
)
124-
elif issubclass(self._response_message_cls, google.protobuf.message.Message):
125-
return Parse(self._ready_objs.popleft(), self._response_message_cls())
126-
else:
127-
raise ValueError(
128-
"Response message class must be a subclass of proto.Message or google.protobuf.message.Message."
129-
)
130-
13164
def __iter__(self):
13265
return self
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Helpers for server-side asynchronous streaming in REST."""
16+
17+
from typing import Union
18+
19+
import proto
20+
import google.protobuf.message
21+
import google.auth.aio.transport
22+
from google.api_core._rest_streaming_base import BaseResponseIterator
23+
24+
25+
class AsyncResponseIterator(BaseResponseIterator):
    """Asynchronous Iterator over REST API responses.

    Reads the response body in chunks of ``self._chunk_size`` bytes, decodes
    them as UTF-8 and yields one parsed message per JSON object in the
    streamed array.

    Args:
        response (google.auth.aio.transport.Response): An API response object.
        response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response
        class expected to be returned from an API.

    Raises:
        ValueError:
            - If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`.
            - If `response` is not an instance of `google.auth.aio.transport.Response`,
              or its content does not provide `iter_chunked` (raised on first iteration).
    """

    def __init__(
        self,
        response: google.auth.aio.transport.Response,
        response_message_cls: Union[proto.Message, google.protobuf.message.Message],
    ):
        import codecs

        self._response = response
        self._chunk_size = 1024
        # Created lazily on the first call to __anext__.
        self._response_itr = None
        # Chunks are cut at byte boundaries, so a multi-byte UTF-8 character
        # may be split across two chunks. The incremental decoder buffers the
        # partial bytes instead of raising UnicodeDecodeError.
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        super(AsyncResponseIterator, self).__init__(
            response_message_cls=response_message_cls
        )

    async def _create_response_iter(self):
        """Return an async iterator over the raw bytes of the response body.

        Raises:
            ValueError: If the response is not a
                `google.auth.aio.transport.Response` or its content does not
                provide `iter_chunked`.
        """
        if not isinstance(self._response, google.auth.aio.transport.Response):
            raise ValueError(
                "Response must be of type google.auth.aio.transport.Response"
            )

        # TODO (ohmayr): Ideally the response from auth should expose an attribute
        # to read streaming response iterator directly i.e.
        #
        # self -> google.auth.aio.transport.aiohttp.Response:
        #     def stream_response_itr(self, chunk_size):
        #         return self._response.content.iter_chunked(chunk_size)
        #
        # self -> google.auth.aio.transport.httpx.Response:
        #     def stream_response_itr(self, chunk_size):
        #         return self._response.aiter_raw(chunk_size)
        #
        # this way we can just call the property directly to get the appropriate
        # response iterator without having to deal with the underlying API differences
        # or alternatively, having to check the type of inherited response types here
        # i.e we could do: self._response_itr = self._response.stream_response_itr(self._chunk_size)

        content = self._response.content
        if hasattr(content, "iter_chunked"):
            return content.iter_chunked(self._chunk_size)
        else:
            # TODO (ohmayr): since iter_chunked is only available in an instance of
            # google.auth.aio.transport.aiohttp.Response, we indirectly depend on
            # the inherited class.
            raise ValueError(
                f"Unsupported Response type: {type(self._response)}. Expected google.auth.aio.transport.aiohttp.Response."
            )

    async def cancel(self):
        """Cancel existing streaming operation."""
        await self._response.close()

    async def __anext__(self):
        """Return the next parsed message from the stream.

        Raises:
            StopAsyncIteration: When the response body is exhausted and no
                partially received object is pending.
            ValueError: If the body ends in the middle of the JSON array, or
                the response type is unsupported.
        """
        while not self._ready_objs:
            try:
                if self._response_itr is None:
                    self._response_itr = await self._create_response_iter()
                chunk = await self._response_itr.__anext__()
            except StopAsyncIteration:
                if self._level > 0:
                    raise ValueError("Unfinished stream: %s" % self._obj)
                raise
            text = self._decoder.decode(chunk)
            # The decoder returns "" while it is still waiting for the rest
            # of a multi-byte character; there is nothing to parse yet.
            if text:
                self._process_chunk(text)
        return self._grab()

    def __aiter__(self):
        return self

    async def __aenter__(self):
        """Return the iterator itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Cancel existing streaming operation."""
        await self._response.close()

0 commit comments

Comments
 (0)