Skip to content

Commit fdeb437

Browse files
committed
Merge branch 'dups' into add-support-for-async-rest-streaming
2 parents b5ddd62 + 2014ef6 commit fdeb437

File tree

2 files changed

+264
-0
lines changed

2 files changed

+264
-0
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Helpers for server-side streaming in REST."""
16+
17+
from collections import deque
18+
import string
19+
from typing import Deque, Union
20+
21+
import proto
22+
import requests
23+
import google.protobuf.message
24+
from google.protobuf.json_format import Parse
25+
26+
27+
class ResponseIterator:
28+
"""Iterator over REST API responses.
29+
30+
Args:
31+
response (requests.Response): An API response object.
32+
response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response
33+
class expected to be returned from an API.
34+
35+
Raises:
36+
ValueError: If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`.
37+
"""
38+
39+
def __init__(
40+
self,
41+
response: requests.Response,
42+
response_message_cls: Union[proto.Message, google.protobuf.message.Message],
43+
):
44+
self._response = response
45+
self._response_message_cls = response_message_cls
46+
# Inner iterator over HTTP response's content.
47+
self._response_itr = self._response.iter_content(decode_unicode=True)
48+
# Contains a list of JSON responses ready to be sent to user.
49+
self._ready_objs: Deque[str] = deque()
50+
# Current JSON response being built.
51+
self._obj = ""
52+
# Keeps track of the nesting level within a JSON object.
53+
self._level = 0
54+
# Keeps track whether HTTP response is currently sending values
55+
# inside of a string value.
56+
self._in_string = False
57+
# Whether an escape symbol "\" was encountered.
58+
self._escape_next = False
59+
60+
def cancel(self):
61+
"""Cancel existing streaming operation."""
62+
self._response.close()
63+
64+
def _process_chunk(self, chunk: str):
65+
if self._level == 0:
66+
if chunk[0] != "[":
67+
raise ValueError(
68+
"Can only parse array of JSON objects, instead got %s" % chunk
69+
)
70+
for char in chunk:
71+
if char == "{":
72+
if self._level == 1:
73+
# Level 1 corresponds to the outermost JSON object
74+
# (i.e. the one we care about).
75+
self._obj = ""
76+
if not self._in_string:
77+
self._level += 1
78+
self._obj += char
79+
elif char == "}":
80+
self._obj += char
81+
if not self._in_string:
82+
self._level -= 1
83+
if not self._in_string and self._level == 1:
84+
self._ready_objs.append(self._obj)
85+
elif char == '"':
86+
# Helps to deal with an escaped quotes inside of a string.
87+
if not self._escape_next:
88+
self._in_string = not self._in_string
89+
self._obj += char
90+
elif char in string.whitespace:
91+
if self._in_string:
92+
self._obj += char
93+
elif char == "[":
94+
if self._level == 0:
95+
self._level += 1
96+
else:
97+
self._obj += char
98+
elif char == "]":
99+
if self._level == 1:
100+
self._level -= 1
101+
else:
102+
self._obj += char
103+
else:
104+
self._obj += char
105+
self._escape_next = not self._escape_next if char == "\\" else False
106+
107+
def __next__(self):
108+
while not self._ready_objs:
109+
try:
110+
chunk = next(self._response_itr)
111+
self._process_chunk(chunk)
112+
except StopIteration as e:
113+
if self._level > 0:
114+
raise ValueError("Unfinished stream: %s" % self._obj)
115+
raise e
116+
return self._grab()
117+
118+
def _grab(self):
119+
# Add extra quotes to make json.loads happy.
120+
if issubclass(self._response_message_cls, proto.Message):
121+
return self._response_message_cls.from_json(
122+
self._ready_objs.popleft(), ignore_unknown_fields=True
123+
)
124+
elif issubclass(self._response_message_cls, google.protobuf.message.Message):
125+
return Parse(self._ready_objs.popleft(), self._response_message_cls())
126+
else:
127+
raise ValueError(
128+
"Response message class must be a subclass of proto.Message or google.protobuf.message.Message."
129+
)
130+
131+
def __iter__(self):
132+
return self
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Helpers for server-side streaming in REST."""
16+
17+
from collections import deque
18+
import string
19+
from typing import Deque, Union
20+
21+
import proto
22+
import requests
23+
import google.protobuf.message
24+
from google.protobuf.json_format import Parse
25+
26+
27+
class ResponseIterator:
28+
"""Iterator over REST API responses.
29+
30+
Args:
31+
response (requests.Response): An API response object.
32+
response_message_cls (Union[proto.Message, google.protobuf.message.Message]): A response
33+
class expected to be returned from an API.
34+
35+
Raises:
36+
ValueError: If `response_message_cls` is not a subclass of `proto.Message` or `google.protobuf.message.Message`.
37+
"""
38+
39+
def __init__(
40+
self,
41+
response: requests.Response,
42+
response_message_cls: Union[proto.Message, google.protobuf.message.Message],
43+
):
44+
self._response = response
45+
self._response_message_cls = response_message_cls
46+
# Inner iterator over HTTP response's content.
47+
self._response_itr = self._response.iter_content(decode_unicode=True)
48+
# Contains a list of JSON responses ready to be sent to user.
49+
self._ready_objs: Deque[str] = deque()
50+
# Current JSON response being built.
51+
self._obj = ""
52+
# Keeps track of the nesting level within a JSON object.
53+
self._level = 0
54+
# Keeps track whether HTTP response is currently sending values
55+
# inside of a string value.
56+
self._in_string = False
57+
# Whether an escape symbol "\" was encountered.
58+
self._escape_next = False
59+
60+
def cancel(self):
61+
"""Cancel existing streaming operation."""
62+
self._response.close()
63+
64+
def _process_chunk(self, chunk: str):
65+
if self._level == 0:
66+
if chunk[0] != "[":
67+
raise ValueError(
68+
"Can only parse array of JSON objects, instead got %s" % chunk
69+
)
70+
for char in chunk:
71+
if char == "{":
72+
if self._level == 1:
73+
# Level 1 corresponds to the outermost JSON object
74+
# (i.e. the one we care about).
75+
self._obj = ""
76+
if not self._in_string:
77+
self._level += 1
78+
self._obj += char
79+
elif char == "}":
80+
self._obj += char
81+
if not self._in_string:
82+
self._level -= 1
83+
if not self._in_string and self._level == 1:
84+
self._ready_objs.append(self._obj)
85+
elif char == '"':
86+
# Helps to deal with an escaped quotes inside of a string.
87+
if not self._escape_next:
88+
self._in_string = not self._in_string
89+
self._obj += char
90+
elif char in string.whitespace:
91+
if self._in_string:
92+
self._obj += char
93+
elif char == "[":
94+
if self._level == 0:
95+
self._level += 1
96+
else:
97+
self._obj += char
98+
elif char == "]":
99+
if self._level == 1:
100+
self._level -= 1
101+
else:
102+
self._obj += char
103+
else:
104+
self._obj += char
105+
self._escape_next = not self._escape_next if char == "\\" else False
106+
107+
def __next__(self):
108+
while not self._ready_objs:
109+
try:
110+
chunk = next(self._response_itr)
111+
self._process_chunk(chunk)
112+
except StopIteration as e:
113+
if self._level > 0:
114+
raise ValueError("Unfinished stream: %s" % self._obj)
115+
raise e
116+
return self._grab()
117+
118+
def _grab(self):
119+
# Add extra quotes to make json.loads happy.
120+
if issubclass(self._response_message_cls, proto.Message):
121+
return self._response_message_cls.from_json(
122+
self._ready_objs.popleft(), ignore_unknown_fields=True
123+
)
124+
elif issubclass(self._response_message_cls, google.protobuf.message.Message):
125+
return Parse(self._ready_objs.popleft(), self._response_message_cls())
126+
else:
127+
raise ValueError(
128+
"Response message class must be a subclass of proto.Message or google.protobuf.message.Message."
129+
)
130+
131+
def __iter__(self):
132+
return self

0 commit comments

Comments
 (0)