Skip to content

Commit d367493

Browse files
committed
Draft: Experimenting with retry strategy unification for standalone client and cluster client.
1 parent 4e59d24 commit d367493

File tree

8 files changed

+258
-163
lines changed

8 files changed

+258
-163
lines changed

redis/asyncio/cluster.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,10 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
759759
)
760760
return dict(zip(keys, values))
761761
except Exception as e:
762-
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
762+
if (
763+
retry_attempts > 0
764+
and type(e) in self.__class__.CONNECTION_ERRORS_FOR_RETRY
765+
):
763766
# The nodes and slots cache were should be reinitialized.
764767
# Try again with the new cluster setup.
765768
retry_attempts -= 1
@@ -1566,7 +1569,7 @@ async def execute(
15661569
allow_redirections=allow_redirections,
15671570
)
15681571

1569-
except self.__class__.ERRORS_ALLOW_RETRY as e:
1572+
except self.__class__.CONNECTION_ERRORS_FOR_RETRY as e:
15701573
if retry_attempts > 0:
15711574
# Try again with the new cluster setup. All other errors
15721575
# should be raised.
@@ -1657,7 +1660,10 @@ async def _execute(
16571660
for cmd in default_node[1]:
16581661
# Check if it has a command that failed with a relevant
16591662
# exception
1660-
if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY:
1663+
if (
1664+
type(cmd.result)
1665+
in self.__class__.CONNECTION_ERRORS_FOR_RETRY
1666+
):
16611667
client.replace_default_node()
16621668
break
16631669

redis/client.py

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
_RedisCallbacksRESP3,
2424
bool_ok,
2525
)
26+
from redis.backoff import ExponentialWithJitterBackoff
2627
from redis.cache import CacheConfig, CacheInterface
2728
from redis.commands import (
2829
CoreCommands,
@@ -58,6 +59,7 @@
5859
from redis.utils import (
5960
HIREDIS_AVAILABLE,
6061
_set_info_logger,
62+
deprecated_args,
6163
get_lib_version,
6264
safe_str,
6365
str_if_bytes,
@@ -189,6 +191,11 @@ def from_pool(
189191
client.auto_close_connection_pool = True
190192
return client
191193

194+
@deprecated_args(
195+
args_to_warn=["retry_on_timeout"],
196+
reason="TimeoutError is included by default.",
197+
version="6.1.0",
198+
)
192199
def __init__(
193200
self,
194201
host: str = "localhost",
@@ -203,8 +210,6 @@ def __init__(
203210
unix_socket_path: Optional[str] = None,
204211
encoding: str = "utf-8",
205212
encoding_errors: str = "strict",
206-
charset: Optional[str] = None,
207-
errors: Optional[str] = None,
208213
decode_responses: bool = False,
209214
retry_on_timeout: bool = False,
210215
retry_on_error: Optional[List[Type[Exception]]] = None,
@@ -230,7 +235,9 @@ def __init__(
230235
lib_name: Optional[str] = "redis-py",
231236
lib_version: Optional[str] = get_lib_version(),
232237
username: Optional[str] = None,
233-
retry: Optional[Retry] = None,
238+
retry: Optional[Retry] = Retry(
239+
backoff=ExponentialWithJitterBackoff(base=0.1, cap=5), retries=3
240+
),
234241
redis_connect_func: Optional[Callable[[], None]] = None,
235242
credential_provider: Optional[CredentialProvider] = None,
236243
protocol: Optional[int] = 2,
@@ -240,10 +247,24 @@ def __init__(
240247
) -> None:
241248
"""
242249
Initialize a new Redis client.
243-
To specify a retry policy for specific errors, first set
244-
`retry_on_error` to a list of the error/s to retry on, then set
245-
`retry` to a valid `Retry` object.
246-
To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
250+
251+
To specify a retry policy for specific errors, you have two options:
252+
253+
1. Set the `retry_on_error` to a list of the error/s to retry on, and
254+
you can also set `retry` to a valid `Retry` object(in case the default
255+
one is not appropriate) - with this approach the retries will be triggered
256+
on the default errors specified in the Retry object enriched with the
257+
errors specified in `retry_on_error`.
258+
259+
2. Define a `Retry` object with configured 'supported_errors' and set
260+
it to the `retry` parameter - with this approach you completely redefine
261+
the errors on which retries will happen.
262+
263+
`retry_on_timeout` is deprecated - please include the TimeoutError
264+
either in the Retry object or in the `retry_on_error` list.
265+
266+
When 'connection_pool' is provided - the retry configuration of the
267+
provided pool will be used.
247268
248269
Args:
249270
@@ -256,24 +277,8 @@ def __init__(
256277
else:
257278
self._event_dispatcher = event_dispatcher
258279
if not connection_pool:
259-
if charset is not None:
260-
warnings.warn(
261-
DeprecationWarning(
262-
'"charset" is deprecated. Use "encoding" instead'
263-
)
264-
)
265-
encoding = charset
266-
if errors is not None:
267-
warnings.warn(
268-
DeprecationWarning(
269-
'"errors" is deprecated. Use "encoding_errors" instead'
270-
)
271-
)
272-
encoding_errors = errors
273280
if not retry_on_error:
274281
retry_on_error = []
275-
if retry_on_timeout is True:
276-
retry_on_error.append(TimeoutError)
277282
kwargs = {
278283
"db": db,
279284
"username": username,
@@ -395,10 +400,10 @@ def get_connection_kwargs(self) -> Dict:
395400
"""Get the connection's key-word arguments"""
396401
return self.connection_pool.connection_kwargs
397402

398-
def get_retry(self) -> Optional["Retry"]:
403+
def get_retry(self) -> Retry:
399404
return self.get_connection_kwargs().get("retry")
400405

401-
def set_retry(self, retry: "Retry") -> None:
406+
def set_retry(self, retry: Retry) -> None:
402407
self.get_connection_kwargs().update({"retry": retry})
403408
self.connection_pool.set_retry(retry)
404409

@@ -598,18 +603,20 @@ def _send_command_parse_response(self, conn, command_name, *args, **options):
598603
conn.send_command(*args, **options)
599604
return self.parse_response(conn, command_name, **options)
600605

601-
def _disconnect_raise(self, conn, error):
606+
def _conditional_disconnect(self, conn, error) -> None:
602607
"""
603-
Close the connection and raise an exception
604-
if retry_on_error is not set or the error
605-
is not one of the specified error types
608+
Close the connection if the error is not TimeoutError.
609+
The supported exceptions are already checked in the
610+
retry object so we don't need to do it here.
611+
After we disconnect the connection, it will try to reconnect and
612+
do a health check as part of the send_command logic(on connection level).
606613
"""
614+
if isinstance(error, TimeoutError):
615+
# If the error is a TimeoutError, we don't want to
616+
# disconnect the connection. We want to retry the command.
617+
return
618+
607619
conn.disconnect()
608-
if (
609-
conn.retry_on_error is None
610-
or isinstance(error, tuple(conn.retry_on_error)) is False
611-
):
612-
raise error
613620

614621
# COMMAND EXECUTION AND PROTOCOL PARSING
615622
def execute_command(self, *args, **options):
@@ -619,17 +626,20 @@ def _execute_command(self, *args, **options):
619626
"""Execute a command and return a parsed response"""
620627
pool = self.connection_pool
621628
command_name = args[0]
629+
622630
conn = self.connection or pool.get_connection()
623631

624632
if self._single_connection_client:
625633
self.single_connection_lock.acquire()
634+
626635
try:
627636
return conn.retry.call_with_retry(
628637
lambda: self._send_command_parse_response(
629638
conn, command_name, *args, **options
630639
),
631-
lambda error: self._disconnect_raise(conn, error),
640+
lambda error: self._conditional_disconnect(conn, error),
632641
)
642+
633643
finally:
634644
if self._single_connection_client:
635645
self.single_connection_lock.release()

0 commit comments

Comments
 (0)