Skip to content

Commit 021bb7d

Browse files
authored
feat: initial support for Extended Operations (#344)
Certain APIs with Long-Running Operations deviate from the semantics in https://google.aip.dev/151 and instead define custom operation messages, aka Extended Operations. This change adds a PollingFuture subclass designed to be used with Extended Operations. It is analogous and broadly similar to google.api_core.operation.Operation and subclasses google.api_core.future.polling.PollingFuture. The full description of Extended Operation semantics is beyond the scope of this change.
1 parent 6f61491 commit 021bb7d

File tree

3 files changed

+417
-17
lines changed

3 files changed

+417
-17
lines changed

google/api_core/extended_operation.py

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
# Copyright 2022 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Futures for extended long-running operations returned from Google Cloud APIs.
16+
17+
These futures can be used to synchronously wait for the result of a
18+
long-running operations using :meth:`ExtendedOperation.result`:
19+
20+
.. code-block:: python
21+
22+
extended_operation = my_api_client.long_running_method()
23+
24+
extended_operation.result()
25+
26+
Or asynchronously using callbacks and :meth:`Operation.add_done_callback`:
27+
28+
.. code-block:: python
29+
30+
extended_operation = my_api_client.long_running_method()
31+
32+
def my_callback(ex_op):
33+
print(f"Operation {ex_op.name} completed")
34+
35+
extended_operation.add_done_callback(my_callback)
36+
37+
"""
38+
39+
import threading
40+
41+
from google.api_core import exceptions
42+
from google.api_core.future import polling
43+
44+
45+
class ExtendedOperation(polling.PollingFuture):
46+
"""An ExtendedOperation future for interacting with a Google API Long-Running Operation.
47+
48+
Args:
49+
extended_operation (proto.Message): The initial operation.
50+
refresh (Callable[[], type(extended_operation)]): A callable that returns
51+
the latest state of the operation.
52+
cancel (Callable[[], None]): A callable that tries to cancel the operation.
53+
retry: Optional(google.api_core.retry.Retry): The retry configuration used
54+
when polling. This can be used to control how often :meth:`done`
55+
is polled. Regardless of the retry's ``deadline``, it will be
56+
overridden by the ``timeout`` argument to :meth:`result`.
57+
58+
Note: Most long-running API methods use google.api_core.operation.Operation
59+
This class is a wrapper for a subset of methods that use alternative
60+
Long-Running Operation (LRO) semantics.
61+
62+
Note: there is not a concrete type the extended operation must be.
63+
It MUST have fields that correspond to the following, POSSIBLY WITH DIFFERENT NAMES:
64+
* name: str
65+
* status: Union[str, bool, enum.Enum]
66+
* error_code: int
67+
* error_message: str
68+
"""
69+
70+
def __init__(
71+
self, extended_operation, refresh, cancel, retry=polling.DEFAULT_RETRY
72+
):
73+
super().__init__(retry=retry)
74+
self._extended_operation = extended_operation
75+
self._refresh = refresh
76+
self._cancel = cancel
77+
# Note: the extended operation does not give a good way to indicate cancellation.
78+
# We make do with manually tracking cancellation and checking for doneness.
79+
self._cancelled = False
80+
self._completion_lock = threading.Lock()
81+
# Invoke in case the operation came back already complete.
82+
self._handle_refreshed_operation()
83+
84+
# Note: the following four properties MUST be overridden in a subclass
85+
# if, and only if, the fields in the corresponding extended operation message
86+
# have different names.
87+
#
88+
# E.g. we have an extended operation class that looks like
89+
#
90+
# class MyOperation(proto.Message):
91+
# moniker = proto.Field(proto.STRING, number=1)
92+
# status_msg = proto.Field(proto.STRING, number=2)
93+
# optional http_error_code = proto.Field(proto.INT32, number=3)
94+
# optional http_error_msg = proto.Field(proto.STRING, number=4)
95+
#
96+
# the ExtendedOperation subclass would provide property overrrides that map
97+
# to these (poorly named) fields.
98+
@property
99+
def name(self):
100+
return self._extended_operation.name
101+
102+
@property
103+
def status(self):
104+
return self._extended_operation.status
105+
106+
@property
107+
def error_code(self):
108+
return self._extended_operation.error_code
109+
110+
@property
111+
def error_message(self):
112+
return self._extended_operation.error_message
113+
114+
def done(self, retry=polling.DEFAULT_RETRY):
115+
self._refresh_and_update(retry)
116+
return self._extended_operation.done
117+
118+
def cancel(self):
119+
if self.done():
120+
return False
121+
122+
self._cancel()
123+
self._cancelled = True
124+
return True
125+
126+
def cancelled(self):
127+
# TODO(dovs): there is not currently a good way to determine whether the
128+
# operation has been cancelled.
129+
# The best we can do is manually keep track of cancellation
130+
# and check for doneness.
131+
if not self._cancelled:
132+
return False
133+
134+
self._refresh_and_update()
135+
return self._extended_operation.done
136+
137+
def _refresh_and_update(self, retry=polling.DEFAULT_RETRY):
138+
if not self._extended_operation.done:
139+
self._extended_operation = self._refresh(retry=retry)
140+
self._handle_refreshed_operation()
141+
142+
def _handle_refreshed_operation(self):
143+
with self._completion_lock:
144+
if not self._extended_operation.done:
145+
return
146+
147+
if self.error_code and self.error_message:
148+
exception = exceptions.from_http_status(
149+
status_code=self.error_code,
150+
message=self.error_message,
151+
response=self._extended_operation,
152+
)
153+
self.set_exception(exception)
154+
elif self.error_code or self.error_message:
155+
exception = exceptions.GoogleAPICallError(
156+
f"Unexpected error {self.error_code}: {self.error_message}"
157+
)
158+
self.set_exception(exception)
159+
else:
160+
# Extended operations have no payload.
161+
self.set_result(None)
162+
163+
@classmethod
164+
def make(cls, refresh, cancel, extended_operation, **kwargs):
165+
"""
166+
Return an instantiated ExtendedOperation (or child) that wraps
167+
* a refresh callable
168+
* a cancel callable (can be a no-op)
169+
* an initial result
170+
171+
.. note::
172+
It is the caller's responsibility to set up refresh and cancel
173+
with their correct request argument.
174+
The reason for this is that the services that use Extended Operations
175+
have rpcs that look something like the following:
176+
177+
// service.proto
178+
service MyLongService {
179+
rpc StartLongTask(StartLongTaskRequest) returns (ExtendedOperation) {
180+
option (google.cloud.operation_service) = "CustomOperationService";
181+
}
182+
}
183+
184+
service CustomOperationService {
185+
rpc Get(GetOperationRequest) returns (ExtendedOperation) {
186+
option (google.cloud.operation_polling_method) = true;
187+
}
188+
}
189+
190+
Any info needed for the poll, e.g. a name, path params, etc.
191+
is held in the request, which the initial client method is in a much
192+
better position to make made because the caller made the initial request.
193+
194+
TL;DR: the caller sets up closures for refresh and cancel that carry
195+
the properly configured requests.
196+
197+
Args:
198+
refresh (Callable[Optional[Retry]][type(extended_operation)]): A callable that
199+
returns the latest state of the operation.
200+
cancel (Callable[][Any]): A callable that tries to cancel the operation
201+
on a best effort basis.
202+
extended_operation (Any): The initial response of the long running method.
203+
See the docstring for ExtendedOperation.__init__ for requirements on
204+
the type and fields of extended_operation
205+
"""
206+
return cls(extended_operation, refresh, cancel, **kwargs)

noxfile.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def default(session, install_grpc=True):
9292
)
9393

9494
# Install all test dependencies, then install this package in-place.
95-
session.install("mock", "pytest", "pytest-cov")
95+
session.install("dataclasses", "mock", "pytest", "pytest-cov", "pytest-xdist")
9696
if install_grpc:
9797
session.install("-e", ".[grpc]", "-c", constraints_path)
9898
else:
@@ -102,28 +102,36 @@ def default(session, install_grpc=True):
102102
"python",
103103
"-m",
104104
"py.test",
105-
"--quiet",
106-
"--cov=google.api_core",
107-
"--cov=tests.unit",
108-
"--cov-append",
109-
"--cov-config=.coveragerc",
110-
"--cov-report=",
111-
"--cov-fail-under=0",
112-
os.path.join("tests", "unit"),
105+
*(
106+
# Helpful for running a single test or testfile.
107+
session.posargs
108+
or [
109+
"--quiet",
110+
"--cov=google.api_core",
111+
"--cov=tests.unit",
112+
"--cov-append",
113+
"--cov-config=.coveragerc",
114+
"--cov-report=",
115+
"--cov-fail-under=0",
116+
# Running individual tests with parallelism enabled is usually not helpful.
117+
"-n=auto",
118+
os.path.join("tests", "unit"),
119+
]
120+
),
113121
]
114-
pytest_args.extend(session.posargs)
115122

116123
# Inject AsyncIO content and proto-plus, if version >= 3.6.
117124
# proto-plus is needed for a field mask test in test_protobuf_helpers.py
118125
if _greater_or_equal_than_36(session.python):
119126
session.install("asyncmock", "pytest-asyncio", "proto-plus")
120127

121-
pytest_args.append("--cov=tests.asyncio")
122-
pytest_args.append(os.path.join("tests", "asyncio"))
123-
session.run(*pytest_args)
124-
else:
125-
# Run py.test against the unit tests.
126-
session.run(*pytest_args)
128+
# Having positional arguments means the user wants to run specific tests.
129+
# Best not to add additional tests to that list.
130+
if not session.posargs:
131+
pytest_args.append("--cov=tests.asyncio")
132+
pytest_args.append(os.path.join("tests", "asyncio"))
133+
134+
session.run(*pytest_args)
127135

128136

129137
@nox.session(python=["3.6", "3.7", "3.8", "3.9", "3.10"])
@@ -171,7 +179,11 @@ def mypy(session):
171179
"""Run type-checking."""
172180
session.install(".[grpc, grpcgcp]", "mypy")
173181
session.install(
174-
"types-setuptools", "types-requests", "types-protobuf", "types-mock"
182+
"types-setuptools",
183+
"types-requests",
184+
"types-protobuf",
185+
"types-mock",
186+
"types-dataclasses",
175187
)
176188
session.run("mypy", "google", "tests")
177189

0 commit comments

Comments
 (0)