Skip to content

Commit 7d8d580

Browse files
authored
feat: third batch of AsyncIO integration (googleapis#29)
* LRO client * gRPC wrappers & helpers * With unit tests & docs
1 parent dd9b2f3 commit 7d8d580

File tree

10 files changed

+1332
-9
lines changed

10 files changed

+1332
-9
lines changed

google/api_core/gapic_v1/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@
2323

2424
if sys.version_info >= (3, 6):
2525
from google.api_core.gapic_v1 import config_async # noqa: F401
26+
from google.api_core.gapic_v1 import method_async # noqa: F401
2627
__all__.append("config_async")
28+
__all__.append("method_async")
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""AsyncIO helpers for wrapping gRPC methods with common functionality.
15+
16+
This is used by gapic clients to provide common error mapping, retry, timeout,
17+
pagination, and long-running operations to gRPC methods.
18+
"""
19+
20+
from google.api_core import general_helpers, grpc_helpers_async
21+
from google.api_core.gapic_v1 import client_info
22+
from google.api_core.gapic_v1.method import (_GapicCallable, # noqa: F401
23+
DEFAULT,
24+
USE_DEFAULT_METADATA)
25+
26+
27+
def wrap_method(
28+
func,
29+
default_retry=None,
30+
default_timeout=None,
31+
client_info=client_info.DEFAULT_CLIENT_INFO,
32+
):
33+
"""Wrap an async RPC method with common behavior.
34+
35+
Returns:
36+
Callable: A new callable that takes optional ``retry`` and ``timeout``
37+
arguments and applies the common error mapping, retry, timeout,
38+
and metadata behavior to the low-level RPC method.
39+
"""
40+
func = grpc_helpers_async.wrap_errors(func)
41+
42+
metadata = [client_info.to_grpc_metadata()] if client_info is not None else None
43+
44+
return general_helpers.wraps(func)(_GapicCallable(
45+
func, default_retry, default_timeout, metadata=metadata))

google/api_core/grpc_helpers.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,10 @@ def wrap_errors(callable_):
170170
return _wrap_unary_errors(callable_)
171171

172172

173-
def create_channel(
174-
target, credentials=None, scopes=None, ssl_credentials=None, **kwargs
175-
):
176-
"""Create a secure channel with credentials.
173+
def _create_composite_credentials(credentials=None, scopes=None, ssl_credentials=None):
174+
"""Create the composite credentials for secure channels.
177175
178176
Args:
179-
target (str): The target service address in the format 'hostname:port'.
180177
credentials (google.auth.credentials.Credentials): The credentials. If
181178
not specified, then this function will attempt to ascertain the
182179
credentials from the environment using :func:`google.auth.default`.
@@ -185,11 +182,9 @@ def create_channel(
185182
are passed to :func:`google.auth.default`.
186183
ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
187184
credentials. This can be used to specify different certificates.
188-
kwargs: Additional key-word args passed to
189-
:func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
190185
191186
Returns:
192-
grpc.Channel: The created channel.
187+
grpc.ChannelCredentials: The composed channel credentials object.
193188
"""
194189
if credentials is None:
195190
credentials, _ = google.auth.default(scopes=scopes)
@@ -212,10 +207,34 @@ def create_channel(
212207
ssl_credentials = grpc.ssl_channel_credentials()
213208

214209
# Combine the ssl credentials and the authorization credentials.
215-
composite_credentials = grpc.composite_channel_credentials(
210+
return grpc.composite_channel_credentials(
216211
ssl_credentials, google_auth_credentials
217212
)
218213

214+
215+
def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, **kwargs):
216+
"""Create a secure channel with credentials.
217+
218+
Args:
219+
target (str): The target service address in the format 'hostname:port'.
220+
credentials (google.auth.credentials.Credentials): The credentials. If
221+
not specified, then this function will attempt to ascertain the
222+
credentials from the environment using :func:`google.auth.default`.
223+
scopes (Sequence[str]): A optional list of scopes needed for this
224+
service. These are only used when credentials are not specified and
225+
are passed to :func:`google.auth.default`.
226+
ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
227+
credentials. This can be used to specify different certificates.
228+
kwargs: Additional key-word args passed to
229+
:func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
230+
231+
Returns:
232+
grpc.Channel: The created channel.
233+
"""
234+
composite_credentials = _create_composite_credentials(
235+
credentials, scopes, ssl_credentials
236+
)
237+
219238
if HAS_GRPC_GCP:
220239
# If grpc_gcp module is available use grpc_gcp.secure_channel,
221240
# otherwise, use grpc.secure_channel to create grpc channel.

google/api_core/grpc_helpers_async.py

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""AsyncIO helpers for :mod:`grpc` supporting 3.6+.
16+
17+
Please combine more detailed docstring in grpc_helpers.py to use following
18+
functions. This module is implementing the same surface with AsyncIO semantics.
19+
"""
20+
21+
import asyncio
22+
import functools
23+
24+
import grpc
25+
from grpc.experimental import aio
26+
27+
from google.api_core import exceptions, grpc_helpers
28+
29+
30+
# TODO(lidiz) Support gRPC GCP wrapper
31+
HAS_GRPC_GCP = False
32+
33+
# NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform
34+
# automatic patching for us. But that means the overhead of creating an
35+
# extra Python function spreads to every single send and receive.
36+
37+
38+
class _WrappedCall(aio.Call):
39+
40+
def __init__(self):
41+
self._call = None
42+
43+
def with_call(self, call):
44+
"""Supplies the call object separately to keep __init__ clean."""
45+
self._call = call
46+
return self
47+
48+
async def initial_metadata(self):
49+
return await self._call.initial_metadata()
50+
51+
async def trailing_metadata(self):
52+
return await self._call.trailing_metadata()
53+
54+
async def code(self):
55+
return await self._call.code()
56+
57+
async def details(self):
58+
return await self._call.details()
59+
60+
def cancelled(self):
61+
return self._call.cancelled()
62+
63+
def done(self):
64+
return self._call.done()
65+
66+
def time_remaining(self):
67+
return self._call.time_remaining()
68+
69+
def cancel(self):
70+
return self._call.cancel()
71+
72+
def add_done_callback(self, callback):
73+
self._call.add_done_callback(callback)
74+
75+
async def wait_for_connection(self):
76+
try:
77+
await self._call.wait_for_connection()
78+
except grpc.RpcError as rpc_error:
79+
raise exceptions.from_grpc_error(rpc_error) from rpc_error
80+
81+
82+
class _WrappedUnaryResponseMixin(_WrappedCall):
83+
84+
def __await__(self):
85+
try:
86+
response = yield from self._call.__await__()
87+
return response
88+
except grpc.RpcError as rpc_error:
89+
raise exceptions.from_grpc_error(rpc_error) from rpc_error
90+
91+
92+
class _WrappedStreamResponseMixin(_WrappedCall):
93+
94+
def __init__(self):
95+
self._wrapped_async_generator = None
96+
97+
async def read(self):
98+
try:
99+
return await self._call.read()
100+
except grpc.RpcError as rpc_error:
101+
raise exceptions.from_grpc_error(rpc_error) from rpc_error
102+
103+
async def _wrapped_aiter(self):
104+
try:
105+
# NOTE(lidiz) coverage doesn't understand the exception raised from
106+
# __anext__ method. It is covered by test case:
107+
# test_wrap_stream_errors_aiter_non_rpc_error
108+
async for response in self._call: # pragma: no branch
109+
yield response
110+
except grpc.RpcError as rpc_error:
111+
raise exceptions.from_grpc_error(rpc_error) from rpc_error
112+
113+
def __aiter__(self):
114+
if not self._wrapped_async_generator:
115+
self._wrapped_async_generator = self._wrapped_aiter()
116+
return self._wrapped_async_generator
117+
118+
119+
class _WrappedStreamRequestMixin(_WrappedCall):
120+
121+
async def write(self, request):
122+
try:
123+
await self._call.write(request)
124+
except grpc.RpcError as rpc_error:
125+
raise exceptions.from_grpc_error(rpc_error) from rpc_error
126+
127+
async def done_writing(self):
128+
try:
129+
await self._call.done_writing()
130+
except grpc.RpcError as rpc_error:
131+
raise exceptions.from_grpc_error(rpc_error) from rpc_error
132+
133+
134+
# NOTE(lidiz) Implementing each individual class separately, so we don't
135+
# expose any API that should not be seen. E.g., __aiter__ in unary-unary
136+
# RPC, or __await__ in stream-stream RPC.
137+
class _WrappedUnaryUnaryCall(_WrappedUnaryResponseMixin, aio.UnaryUnaryCall):
138+
"""Wrapped UnaryUnaryCall to map exceptions."""
139+
140+
141+
class _WrappedUnaryStreamCall(_WrappedStreamResponseMixin, aio.UnaryStreamCall):
142+
"""Wrapped UnaryStreamCall to map exceptions."""
143+
144+
145+
class _WrappedStreamUnaryCall(_WrappedUnaryResponseMixin, _WrappedStreamRequestMixin, aio.StreamUnaryCall):
146+
"""Wrapped StreamUnaryCall to map exceptions."""
147+
148+
149+
class _WrappedStreamStreamCall(_WrappedStreamRequestMixin, _WrappedStreamResponseMixin, aio.StreamStreamCall):
150+
"""Wrapped StreamStreamCall to map exceptions."""
151+
152+
153+
def _wrap_unary_errors(callable_):
154+
"""Map errors for Unary-Unary async callables."""
155+
grpc_helpers._patch_callable_name(callable_)
156+
157+
@functools.wraps(callable_)
158+
def error_remapped_callable(*args, **kwargs):
159+
call = callable_(*args, **kwargs)
160+
return _WrappedUnaryUnaryCall().with_call(call)
161+
162+
return error_remapped_callable
163+
164+
165+
def _wrap_stream_errors(callable_):
166+
"""Map errors for streaming RPC async callables."""
167+
grpc_helpers._patch_callable_name(callable_)
168+
169+
@functools.wraps(callable_)
170+
async def error_remapped_callable(*args, **kwargs):
171+
call = callable_(*args, **kwargs)
172+
173+
if isinstance(call, aio.UnaryStreamCall):
174+
call = _WrappedUnaryStreamCall().with_call(call)
175+
elif isinstance(call, aio.StreamUnaryCall):
176+
call = _WrappedStreamUnaryCall().with_call(call)
177+
elif isinstance(call, aio.StreamStreamCall):
178+
call = _WrappedStreamStreamCall().with_call(call)
179+
else:
180+
raise TypeError('Unexpected type of call %s' % type(call))
181+
182+
await call.wait_for_connection()
183+
return call
184+
185+
return error_remapped_callable
186+
187+
188+
def wrap_errors(callable_):
189+
"""Wrap a gRPC async callable and map :class:`grpc.RpcErrors` to
190+
friendly error classes.
191+
192+
Errors raised by the gRPC callable are mapped to the appropriate
193+
:class:`google.api_core.exceptions.GoogleAPICallError` subclasses. The
194+
original `grpc.RpcError` (which is usually also a `grpc.Call`) is
195+
available from the ``response`` property on the mapped exception. This
196+
is useful for extracting metadata from the original error.
197+
198+
Args:
199+
callable_ (Callable): A gRPC callable.
200+
201+
Returns: Callable: The wrapped gRPC callable.
202+
"""
203+
if isinstance(callable_, aio.UnaryUnaryMultiCallable):
204+
return _wrap_unary_errors(callable_)
205+
else:
206+
return _wrap_stream_errors(callable_)
207+
208+
209+
def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, **kwargs):
210+
"""Create an AsyncIO secure channel with credentials.
211+
212+
Args:
213+
target (str): The target service address in the format 'hostname:port'.
214+
credentials (google.auth.credentials.Credentials): The credentials. If
215+
not specified, then this function will attempt to ascertain the
216+
credentials from the environment using :func:`google.auth.default`.
217+
scopes (Sequence[str]): A optional list of scopes needed for this
218+
service. These are only used when credentials are not specified and
219+
are passed to :func:`google.auth.default`.
220+
ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
221+
credentials. This can be used to specify different certificates.
222+
kwargs: Additional key-word args passed to :func:`aio.secure_channel`.
223+
224+
Returns:
225+
aio.Channel: The created channel.
226+
"""
227+
composite_credentials = grpc_helpers._create_composite_credentials(
228+
credentials, scopes, ssl_credentials
229+
)
230+
231+
return aio.secure_channel(target, composite_credentials, **kwargs)
232+
233+
234+
class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall):
235+
"""Fake implementation for unary-unary RPCs.
236+
237+
It is a dummy object for response message. Supply the intended response
238+
upon the initialization, and the coroutine will return the exact response
239+
message.
240+
"""
241+
242+
def __init__(self, response=object()):
243+
self.response = response
244+
self._future = asyncio.get_event_loop().create_future()
245+
self._future.set_result(self.response)
246+
247+
def __await__(self):
248+
response = yield from self._future.__await__()
249+
return response
250+
251+
252+
class FakeStreamUnaryCall(_WrappedStreamUnaryCall):
253+
"""Fake implementation for stream-unary RPCs.
254+
255+
It is a dummy object for response message. Supply the intended response
256+
upon the initialization, and the coroutine will return the exact response
257+
message.
258+
"""
259+
260+
def __init__(self, response=object()):
261+
self.response = response
262+
self._future = asyncio.get_event_loop().create_future()
263+
self._future.set_result(self.response)
264+
265+
def __await__(self):
266+
response = yield from self._future.__await__()
267+
return response
268+
269+
async def wait_for_connection(self):
270+
pass

0 commit comments

Comments
 (0)