Skip to content

Commit 70697a3

Browse files
daniel-sanche, gcf-owl-bot[bot], parthea
authored
feat: support dynamic retry backoff values (#793)
* feat: retry generates backoff value after completing on_error callbacks
* added comment
* use single sleep iterator for retries
* fix tests
* update variable name
* adjusted docstring
* 🦉 Updates from OwlBot post-processor

  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* fixed mypy issues
* added comment
* added unit tests for dynamic backoff

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Anthonios Partheniou <[email protected]>
1 parent a5648fa commit 70697a3

9 files changed

+159
-35
lines changed

google/api_core/retry/retry_base.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import time
2626

2727
from enum import Enum
28-
from typing import Any, Callable, Optional, TYPE_CHECKING
28+
from typing import Any, Callable, Optional, Iterator, TYPE_CHECKING
2929

3030
import requests.exceptions
3131

@@ -174,7 +174,7 @@ def build_retry_error(
174174
def _retry_error_helper(
175175
exc: Exception,
176176
deadline: float | None,
177-
next_sleep: float,
177+
sleep_iterator: Iterator[float],
178178
error_list: list[Exception],
179179
predicate_fn: Callable[[Exception], bool],
180180
on_error_fn: Callable[[Exception], None] | None,
@@ -183,7 +183,7 @@ def _retry_error_helper(
183183
tuple[Exception, Exception | None],
184184
],
185185
original_timeout: float | None,
186-
):
186+
) -> float:
187187
"""
188188
Shared logic for handling an error for all retry implementations
189189
@@ -194,13 +194,15 @@ def _retry_error_helper(
194194
Args:
195195
- exc: the exception that was raised
196196
- deadline: the deadline for the retry, calculated as a diff from time.monotonic()
197-
- next_sleep: the next sleep interval
197+
- sleep_iterator: iterator to draw the next backoff value from
198198
- error_list: the list of exceptions that have been raised so far
199199
- predicate_fn: takes `exc` and returns true if the operation should be retried
200200
- on_error_fn: callback to execute when a retryable error occurs
201201
- exc_factory_fn: callback used to build the exception to be raised on terminal failure
202202
- original_timeout: the original timeout value for the retry (in seconds),
203203
to be passed to the exception factory for building an error message
204+
Returns:
205+
- the sleep value chosen before the next attempt
204206
"""
205207
error_list.append(exc)
206208
if not predicate_fn(exc):
@@ -212,6 +214,12 @@ def _retry_error_helper(
212214
raise final_exc from source_exc
213215
if on_error_fn is not None:
214216
on_error_fn(exc)
217+
# next_sleep is fetched after the on_error callback, to allow clients
218+
# to update sleep_iterator values dynamically in response to errors
219+
try:
220+
next_sleep = next(sleep_iterator)
221+
except StopIteration:
222+
raise ValueError("Sleep generator stopped yielding sleep values.") from exc
215223
if deadline is not None and time.monotonic() + next_sleep > deadline:
216224
final_exc, source_exc = exc_factory_fn(
217225
error_list,
@@ -222,6 +230,7 @@ def _retry_error_helper(
222230
_LOGGER.debug(
223231
"Retrying due to {}, sleeping {:.1f}s ...".format(error_list[-1], next_sleep)
224232
)
233+
return next_sleep
225234

226235

227236
class _BaseRetry(object):

google/api_core/retry/retry_streaming.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,11 @@ def retry_target_stream(
107107
time.monotonic() + timeout if timeout is not None else None
108108
)
109109
error_list: list[Exception] = []
110+
sleep_iter = iter(sleep_generator)
110111

111-
for sleep in sleep_generator:
112+
# continue trying until an attempt completes, or a terminal exception is raised in _retry_error_helper
113+
# TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535
114+
while True:
112115
# Start a new retry loop
113116
try:
114117
# Note: in the future, we can add a ResumptionStrategy object
@@ -121,20 +124,18 @@ def retry_target_stream(
121124
# This function explicitly must deal with broad exceptions.
122125
except Exception as exc:
123126
# defer to shared logic for handling errors
124-
_retry_error_helper(
127+
next_sleep = _retry_error_helper(
125128
exc,
126129
deadline,
127-
sleep,
130+
sleep_iter,
128131
error_list,
129132
predicate,
130133
on_error,
131134
exception_factory,
132135
timeout,
133136
)
134137
# if exception not raised, sleep before next attempt
135-
time.sleep(sleep)
136-
137-
raise ValueError("Sleep generator stopped yielding sleep values.")
138+
time.sleep(next_sleep)
138139

139140

140141
class StreamingRetry(_BaseRetry):

google/api_core/retry/retry_streaming_async.py

+8-5
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,12 @@ async def retry_target_stream(
109109
deadline = time.monotonic() + timeout if timeout else None
110110
# keep track of retryable exceptions we encounter to pass in to exception_factory
111111
error_list: list[Exception] = []
112+
sleep_iter = iter(sleep_generator)
112113
target_is_generator: bool | None = None
113114

114-
for sleep in sleep_generator:
115+
# continue trying until an attempt completes, or a terminal exception is raised in _retry_error_helper
116+
# TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535
117+
while True:
115118
# Start a new retry loop
116119
try:
117120
# Note: in the future, we can add a ResumptionStrategy object
@@ -174,22 +177,22 @@ async def retry_target_stream(
174177
# This function explicitly must deal with broad exceptions.
175178
except Exception as exc:
176179
# defer to shared logic for handling errors
177-
_retry_error_helper(
180+
next_sleep = _retry_error_helper(
178181
exc,
179182
deadline,
180-
sleep,
183+
sleep_iter,
181184
error_list,
182185
predicate,
183186
on_error,
184187
exception_factory,
185188
timeout,
186189
)
187190
# if exception not raised, sleep before next attempt
188-
await asyncio.sleep(sleep)
191+
await asyncio.sleep(next_sleep)
192+
189193
finally:
190194
if target_is_generator and target_iterator is not None:
191195
await cast(AsyncGenerator["_Y", None], target_iterator).aclose()
192-
raise ValueError("Sleep generator stopped yielding sleep values.")
193196

194197

195198
class AsyncStreamingRetry(_BaseRetry):

google/api_core/retry/retry_unary.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,11 @@ def retry_target(
138138

139139
deadline = time.monotonic() + timeout if timeout is not None else None
140140
error_list: list[Exception] = []
141+
sleep_iter = iter(sleep_generator)
141142

142-
for sleep in sleep_generator:
143+
# continue trying until an attempt completes, or a terminal exception is raised in _retry_error_helper
144+
# TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535
145+
while True:
143146
try:
144147
result = target()
145148
if inspect.isawaitable(result):
@@ -150,20 +153,18 @@ def retry_target(
150153
# This function explicitly must deal with broad exceptions.
151154
except Exception as exc:
152155
# defer to shared logic for handling errors
153-
_retry_error_helper(
156+
next_sleep = _retry_error_helper(
154157
exc,
155158
deadline,
156-
sleep,
159+
sleep_iter,
157160
error_list,
158161
predicate,
159162
on_error,
160163
exception_factory,
161164
timeout,
162165
)
163166
# if exception not raised, sleep before next attempt
164-
time.sleep(sleep)
165-
166-
raise ValueError("Sleep generator stopped yielding sleep values.")
167+
time.sleep(next_sleep)
167168

168169

169170
class Retry(_BaseRetry):

google/api_core/retry/retry_unary_async.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -149,28 +149,29 @@ async def retry_target(
149149

150150
deadline = time.monotonic() + timeout if timeout is not None else None
151151
error_list: list[Exception] = []
152+
sleep_iter = iter(sleep_generator)
152153

153-
for sleep in sleep_generator:
154+
# continue trying until an attempt completes, or a terminal exception is raised in _retry_error_helper
155+
# TODO: support max_attempts argument: https://github.com/googleapis/python-api-core/issues/535
156+
while True:
154157
try:
155158
return await target()
156159
# pylint: disable=broad-except
157160
# This function explicitly must deal with broad exceptions.
158161
except Exception as exc:
159162
# defer to shared logic for handling errors
160-
_retry_error_helper(
163+
next_sleep = _retry_error_helper(
161164
exc,
162165
deadline,
163-
sleep,
166+
sleep_iter,
164167
error_list,
165168
predicate,
166169
on_error,
167170
exception_factory,
168171
timeout,
169172
)
170173
# if exception not raised, sleep before next attempt
171-
await asyncio.sleep(sleep)
172-
173-
raise ValueError("Sleep generator stopped yielding sleep values.")
174+
await asyncio.sleep(next_sleep)
174175

175176

176177
class AsyncRetry(_BaseRetry):

tests/asyncio/retry/test_retry_streaming_async.py

+32-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,36 @@ async def test_retry_streaming_target_bad_sleep_generator():
3636
from google.api_core.retry.retry_streaming_async import retry_target_stream
3737

3838
with pytest.raises(ValueError, match="Sleep generator"):
39-
await retry_target_stream(None, None, [], None).__anext__()
39+
await retry_target_stream(None, lambda x: True, [], None).__anext__()
40+
41+
42+
@mock.patch("asyncio.sleep", autospec=True)
43+
@pytest.mark.asyncio
44+
async def test_retry_streaming_target_dynamic_backoff(sleep):
45+
"""
46+
sleep_generator should be iterated after on_error, to support dynamic backoff
47+
"""
48+
from functools import partial
49+
from google.api_core.retry.retry_streaming_async import retry_target_stream
50+
51+
sleep.side_effect = RuntimeError("stop after sleep")
52+
# start with empty sleep generator; values are added after exception in push_sleep_value
53+
sleep_values = []
54+
error_target = partial(TestAsyncStreamingRetry._generator_mock, error_on=0)
55+
inserted_sleep = 99
56+
57+
def push_sleep_value(err):
58+
sleep_values.append(inserted_sleep)
59+
60+
with pytest.raises(RuntimeError):
61+
await retry_target_stream(
62+
error_target,
63+
predicate=lambda x: True,
64+
sleep_generator=sleep_values,
65+
on_error=push_sleep_value,
66+
).__anext__()
67+
assert sleep.call_count == 1
68+
sleep.assert_called_once_with(inserted_sleep)
4069

4170

4271
class TestAsyncStreamingRetry(Test_BaseRetry):
@@ -66,8 +95,8 @@ def if_exception_type(exc):
6695
str(retry_),
6796
)
6897

98+
@staticmethod
6999
async def _generator_mock(
70-
self,
71100
num=5,
72101
error_on=None,
73102
exceptions_seen=None,
@@ -87,7 +116,7 @@ async def _generator_mock(
87116
for i in range(num):
88117
if sleep_time:
89118
await asyncio.sleep(sleep_time)
90-
if error_on and i == error_on:
119+
if error_on is not None and i == error_on:
91120
raise ValueError("generator mock error")
92121
yield i
93122
except (Exception, BaseException, GeneratorExit) as e:

tests/asyncio/retry/test_retry_unary_async.py

+26-1
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,34 @@ async def test_retry_target_timeout_exceeded(monotonic, sleep, use_deadline_arg)
136136
@pytest.mark.asyncio
137137
async def test_retry_target_bad_sleep_generator():
138138
with pytest.raises(ValueError, match="Sleep generator"):
139+
await retry_async.retry_target(mock.sentinel.target, lambda x: True, [], None)
140+
141+
142+
@mock.patch("asyncio.sleep", autospec=True)
143+
@pytest.mark.asyncio
144+
async def test_retry_target_dynamic_backoff(sleep):
145+
"""
146+
sleep_generator should be iterated after on_error, to support dynamic backoff
147+
"""
148+
sleep.side_effect = RuntimeError("stop after sleep")
149+
# start with empty sleep generator; values are added after exception in push_sleep_value
150+
sleep_values = []
151+
exception = ValueError("trigger retry")
152+
error_target = mock.Mock(side_effect=exception)
153+
inserted_sleep = 99
154+
155+
def push_sleep_value(err):
156+
sleep_values.append(inserted_sleep)
157+
158+
with pytest.raises(RuntimeError):
139159
await retry_async.retry_target(
140-
mock.sentinel.target, mock.sentinel.predicate, [], None
160+
error_target,
161+
predicate=lambda x: True,
162+
sleep_generator=sleep_values,
163+
on_error=push_sleep_value,
141164
)
165+
assert sleep.call_count == 1
166+
sleep.assert_called_once_with(inserted_sleep)
142167

143168

144169
class TestAsyncRetry(Test_BaseRetry):

tests/unit/retry/test_retry_streaming.py

+32-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,36 @@ def test_retry_streaming_target_bad_sleep_generator():
3333
with pytest.raises(
3434
ValueError, match="Sleep generator stopped yielding sleep values"
3535
):
36-
next(retry_streaming.retry_target_stream(None, None, [], None))
36+
next(retry_streaming.retry_target_stream(None, lambda x: True, [], None))
37+
38+
39+
@mock.patch("time.sleep", autospec=True)
40+
def test_retry_streaming_target_dynamic_backoff(sleep):
41+
"""
42+
sleep_generator should be iterated after on_error, to support dynamic backoff
43+
"""
44+
from functools import partial
45+
46+
sleep.side_effect = RuntimeError("stop after sleep")
47+
# start with empty sleep generator; values are added after exception in push_sleep_value
48+
sleep_values = []
49+
error_target = partial(TestStreamingRetry._generator_mock, error_on=0)
50+
inserted_sleep = 99
51+
52+
def push_sleep_value(err):
53+
sleep_values.append(inserted_sleep)
54+
55+
with pytest.raises(RuntimeError):
56+
next(
57+
retry_streaming.retry_target_stream(
58+
error_target,
59+
predicate=lambda x: True,
60+
sleep_generator=sleep_values,
61+
on_error=push_sleep_value,
62+
)
63+
)
64+
assert sleep.call_count == 1
65+
sleep.assert_called_once_with(inserted_sleep)
3766

3867

3968
class TestStreamingRetry(Test_BaseRetry):
@@ -63,8 +92,8 @@ def if_exception_type(exc):
6392
str(retry_),
6493
)
6594

95+
@staticmethod
6696
def _generator_mock(
67-
self,
6897
num=5,
6998
error_on=None,
7099
return_val=None,
@@ -82,7 +111,7 @@ def _generator_mock(
82111
"""
83112
try:
84113
for i in range(num):
85-
if error_on and i == error_on:
114+
if error_on is not None and i == error_on:
86115
raise ValueError("generator mock error")
87116
yield i
88117
return return_val

0 commit comments

Comments
 (0)