
Adding default retry configuration changes for cluster clients #3622


Merged
11 changes: 7 additions & 4 deletions docs/clustering.rst
@@ -187,8 +187,8 @@ When a ClusterPubSub instance is created without specifying a node, a
single node will be transparently chosen for the pubsub connection on
the first command execution. The node will be determined by: 1. Hashing
the channel name in the request to find its keyslot 2. Selecting a node
that handles the keyslot: If read_from_replicas is set to true, a
replica can be selected.
that handles the keyslot: If read_from_replicas is set to true or
load_balancing_strategy is provided, a replica can be selected.
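
For illustration, a minimal pub/sub sketch of that selection logic (host, port
and channel name are placeholders; this assumes a reachable cluster):

>>> from redis.cluster import RedisCluster
>>>
>>> rc = RedisCluster(host='localhost', port=6379, read_from_replicas=True)
>>> p = rc.pubsub()
>>> # The node is chosen transparently on the first command, here subscribe()
>>> p.subscribe('my-channel')
>>> p.get_message()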

Known PubSub Limitations
------------------------
@@ -216,9 +216,12 @@ By default, Redis Cluster always returns a MOVED redirection response on
accessing a replica node. You can overcome this limitation and scale
read commands by triggering READONLY mode.

To enable READONLY mode pass read_from_replicas=True to RedisCluster
constructor. When set to true, read commands will be assigned between
To enable READONLY mode, pass read_from_replicas=True or provide
a load_balancing_strategy to the RedisCluster constructor.
When read_from_replicas is set to true, read commands will be assigned between
the primary and its replicas in a round-robin manner.
With load_balancing_strategy you can define a custom strategy for
assigning read commands to the replica and primary nodes.
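
For example, a minimal sketch of the second option (assuming ``LoadBalancingStrategy``
is importable from ``redis.cluster`` and exposes a ``ROUND_ROBIN_REPLICAS`` member,
as in recent redis-py releases; host and port are placeholders):

>>> from redis.cluster import RedisCluster, LoadBalancingStrategy
>>>
>>> # Route read commands to replicas only, rotating between them
>>> rc = RedisCluster(host='localhost', port=6379, load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS)
>>> rc.get('foo')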

READONLY mode can be set at runtime by calling the readonly() method
with target_nodes='replicas', and read-write access can be restored by
43 changes: 23 additions & 20 deletions docs/retry.rst
@@ -13,25 +13,25 @@ Retry in Redis Standalone
>>> from redis.client import Redis
>>> from redis.exceptions import (
>>> BusyLoadingError,
>>> ConnectionError,
>>> TimeoutError
>>> RedisError,
>>> )
>>>
>>> # Run 3 retries with exponential backoff strategy
>>> retry = Retry(ExponentialBackoff(), 3)
>>> # Redis client with retries on custom errors
>>> r = Redis(host='localhost', port=6379, retry=retry, retry_on_error=[BusyLoadingError, ConnectionError, TimeoutError])
>>> # Redis client with retries on TimeoutError only
>>> r_only_timeout = Redis(host='localhost', port=6379, retry=retry, retry_on_timeout=True)
>>> # Redis client with retries on custom errors in addition to the errors
>>> # that are already retried by default
>>> r = Redis(host='localhost', port=6379, retry=retry, retry_on_error=[BusyLoadingError, RedisError])

As you can see from the example above, Redis client supports 3 parameters to configure the retry behaviour:
As you can see from the example above, the Redis client supports 2 parameters to configure the retry behaviour:

* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries
* ``retry_on_error``: list of :ref:`exceptions-label` to retry on
* ``retry_on_timeout``: if ``True``, retry on :class:`~.TimeoutError` only
* The :class:`~.Retry` instance has a default set of :ref:`exceptions-label` to retry on,
which can be overridden by passing a tuple of :ref:`exceptions-label` to the ``supported_errors`` parameter.
* ``retry_on_error``: list of additional :ref:`exceptions-label` to retry on

If either ``retry_on_error`` or ``retry_on_timeout`` are passed and no ``retry`` is given,
by default it uses a ``Retry(NoBackoff(), 1)`` (meaning 1 retry right after the first failure).

If no ``retry`` is provided, a default one is created with :class:`~.ExponentialWithJitterBackoff` as the backoff strategy
and 3 retries.
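
A hedged sketch of overriding that default set of errors (connection details are
placeholders; the two exception classes are just an example):

>>> from redis.backoff import ExponentialBackoff
>>> from redis.retry import Retry
>>> from redis.client import Redis
>>> from redis.exceptions import ConnectionError, TimeoutError
>>>
>>> # Retry only on ConnectionError and TimeoutError, replacing the default set
>>> retry = Retry(ExponentialBackoff(), 3, supported_errors=(ConnectionError, TimeoutError))
>>> r = Redis(host='localhost', port=6379, retry=retry)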


Retry in Redis Cluster
@@ -44,27 +44,30 @@ Retry in Redis Cluster
>>> # Run 3 retries with exponential backoff strategy
>>> retry = Retry(ExponentialBackoff(), 3)
>>> # Redis Cluster client with retries
>>> rc = RedisCluster(host='localhost', port=6379, retry=retry, cluster_error_retry_attempts=2)
>>> rc = RedisCluster(host='localhost', port=6379, retry=retry)

Retry behaviour in Redis Cluster is slightly different from Standalone:

* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries, default value is ``Retry(NoBackoff(), 0)``
* ``cluster_error_retry_attempts``: number of times to retry before raising an error when :class:`~.TimeoutError` or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered, default value is ``3``
* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries, default value is ``Retry(ExponentialWithJitterBackoff(base=1, cap=10), cluster_error_retry_attempts)``
* ``cluster_error_retry_attempts``: number of times to retry before raising an error when :class:`~.TimeoutError`, :class:`~.ConnectionError`, :class:`~.ClusterDownError` or :class:`~.SlotNotCoveredError` are encountered, default value is ``3``
* This argument is deprecated - it is only used to initialize the number of retries for the retry object
when no ``retry`` object is provided.
When the ``retry`` argument is provided, the ``cluster_error_retry_attempts`` argument is ignored!

* The retry object is not yet fully utilized in the cluster client; it is currently used only
to determine the number of retries for cluster-level calls.

Let's consider the following example:

>>> from redis.backoff import ExponentialBackoff
>>> from redis.retry import Retry
>>> from redis.cluster import RedisCluster
>>>
>>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6), cluster_error_retry_attempts=1)
>>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6))
>>> rc.set('foo', 'bar')

#. the client library calculates the hash slot for key 'foo'.
#. given the hash slot, it then determines which node to connect to, in order to execute the command.
#. during the connection, a :class:`~.ConnectionError` is raised.
#. because we set ``retry=Retry(ExponentialBackoff(), 6)``, the client tries to reconnect to the node up to 6 times, with an exponential backoff between each attempt.
#. even after 6 retries, the client is still unable to connect.
#. because we set ``cluster_error_retry_attempts=1``, before giving up, the client starts a cluster update, removes the failed node from the startup nodes, and re-initializes the cluster.
#. after the cluster has been re-initialized, it starts a new cycle of retries, up to 6 retries, with an exponential backoff.
#. if the client can connect, we're good. Otherwise, the exception is finally raised to the caller, because we've run out of attempts.
#. because we set ``retry=Retry(ExponentialBackoff(), 6)``, the cluster client starts a cluster update, removes the failed node from the startup nodes, and re-initializes the cluster.
#. the cluster client retries the command until it either succeeds or the max number of retries is reached.
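
To round this out, a small sketch of inspecting the configured retries on the cluster
client (this assumes the ``get_retry()`` accessor and the new ``get_retries()`` helper
are available on the synchronous client as well; connection details are placeholders):

>>> from redis.backoff import ExponentialBackoff
>>> from redis.retry import Retry
>>> from redis.cluster import RedisCluster
>>>
>>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6))
>>> # The client-level retry object drives the cluster-level retries
>>> rc.get_retry().get_retries()
6
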
97 changes: 55 additions & 42 deletions redis/asyncio/cluster.py
@@ -29,7 +29,7 @@
from redis.asyncio.lock import Lock
from redis.asyncio.retry import Retry
from redis.auth.token import TokenInterface
from redis.backoff import default_backoff
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
from redis.cluster import (
PIPELINE_BLOCKED_COMMANDS,
@@ -143,19 +143,23 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
To avoid reinitializing the cluster on moved errors, set reinitialize_steps to
0.
:param cluster_error_retry_attempts:
| Number of times to retry before raising an error when :class:`~.TimeoutError`
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
:param connection_error_retry_attempts:
| Number of times to retry before reinitializing when :class:`~.TimeoutError`
or :class:`~.ConnectionError` are encountered.
The default backoff strategy will be set if Retry object is not passed (see
default_backoff in backoff.py). To change it, pass a custom Retry object
using the "retry" keyword.
| @deprecated - Please configure the 'retry' object instead.
If the 'retry' object is set, this argument is ignored!

Number of times to retry before raising an error when :class:`~.TimeoutError`,
:class:`~.ConnectionError`, :class:`~.SlotNotCoveredError`
or :class:`~.ClusterDownError` are encountered
:param retry:
| A retry object that defines the retry strategy and the number of
retries for the cluster client.
In the current implementation of the cluster client (starting from redis-py version 6.0.0)
the retry object is not yet fully utilized; it is used only to determine
the number of retries for the cluster client.
In future releases the retry object will be used to handle the cluster client retries!
:param max_connections:
| Maximum number of connections per node. If there are no free connections & the
maximum number of connections are already created, a
:class:`~.MaxConnectionsError` is raised. This error may be retried as defined
by :attr:`connection_error_retry_attempts`
:class:`~.MaxConnectionsError` is raised.
:param address_remap:
| An optional callable which, when provided with an internal network
address of a node, e.g. a `(host, port)` tuple, will return the address
@@ -211,10 +215,9 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
__slots__ = (
"_initialize",
"_lock",
"cluster_error_retry_attempts",
"retry",
"command_flags",
"commands_parser",
"connection_error_retry_attempts",
"connection_kwargs",
"encoder",
"node_flags",
@@ -231,6 +234,13 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
reason="Please configure the 'load_balancing_strategy' instead",
version="5.3.0",
)
@deprecated_args(
args_to_warn=[
"cluster_error_retry_attempts",
],
reason="Please configure the 'retry' object instead",
version="6.0.0",
)
def __init__(
self,
host: Optional[str] = None,
@@ -242,8 +252,9 @@ def __init__(
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
connection_error_retry_attempts: int = 3,
max_connections: int = 2**31,
retry: Optional["Retry"] = None,
retry_on_error: Optional[List[Type[Exception]]] = None,
# Client related kwargs
db: Union[str, int] = 0,
path: Optional[str] = None,
@@ -263,8 +274,6 @@ def __init__(
socket_keepalive: bool = False,
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
socket_timeout: Optional[float] = None,
retry: Optional["Retry"] = None,
retry_on_error: Optional[List[Type[Exception]]] = None,
# SSL related kwargs
ssl: bool = False,
ssl_ca_certs: Optional[str] = None,
@@ -318,7 +327,6 @@ def __init__(
"socket_keepalive": socket_keepalive,
"socket_keepalive_options": socket_keepalive_options,
"socket_timeout": socket_timeout,
"retry": retry,
"protocol": protocol,
}

@@ -342,17 +350,15 @@ def __init__(
# Call our on_connect function to configure READONLY mode
kwargs["redis_connect_func"] = self.on_connect

self.retry = retry
if retry or retry_on_error or connection_error_retry_attempts > 0:
# Set a retry object for all cluster nodes
self.retry = retry or Retry(
default_backoff(), connection_error_retry_attempts
if retry:
self.retry = retry
else:
self.retry = Retry(
backoff=ExponentialWithJitterBackoff(base=1, cap=10),
retries=cluster_error_retry_attempts,
)
if not retry_on_error:
# Default errors for retrying
retry_on_error = [ConnectionError, TimeoutError]
if retry_on_error:
self.retry.update_supported_errors(retry_on_error)
kwargs.update({"retry": self.retry})

kwargs["response_callbacks"] = _RedisCallbacks.copy()
if kwargs.get("protocol") in ["3", 3]:
@@ -389,8 +395,6 @@ def __init__(
self.read_from_replicas = read_from_replicas
self.load_balancing_strategy = load_balancing_strategy
self.reinitialize_steps = reinitialize_steps
self.cluster_error_retry_attempts = cluster_error_retry_attempts
self.connection_error_retry_attempts = connection_error_retry_attempts
self.reinitialize_counter = 0
self.commands_parser = AsyncCommandsParser()
self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -561,15 +565,8 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
return self.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
return self.retry

def set_retry(self, retry: "Retry") -> None:
def set_retry(self, retry: Retry) -> None:
self.retry = retry
for node in self.get_nodes():
node.connection_kwargs.update({"retry": retry})
for conn in node._connections:
conn.retry = retry

def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
"""Set a custom response callback."""
@@ -688,8 +685,8 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
"""
Execute a raw command on the appropriate cluster node or target_nodes.

It will retry the command as specified by :attr:`cluster_error_retry_attempts` &
then raise an exception.
It will retry the command as specified by the retries configured on
:attr:`retry` and then raise an exception.

:param args:
| Raw command args
@@ -705,7 +702,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
command = args[0]
target_nodes = []
target_nodes_specified = False
retry_attempts = self.cluster_error_retry_attempts
retry_attempts = self.retry.get_retries()

passed_targets = kwargs.pop("target_nodes", None)
if passed_targets and not self._is_node_flag(passed_targets):
@@ -1048,7 +1045,23 @@ def acquire_connection(self) -> Connection:
return self._free.popleft()
except IndexError:
if len(self._connections) < self.max_connections:
connection = self.connection_class(**self.connection_kwargs)
# Configure connections not to retry at the lower (connection) level,
# to avoid retrying connections to nodes that are not reachable
# and to avoid blocking the connection pool.
# The only error handled by the lower level clients is ConnectionError,
# which triggers disconnection of the socket.
# Retries are handled at the cluster client level,
# where the cluster topology is properly taken into account.
retry = Retry(
backoff=NoBackoff(),
retries=0,
supported_errors=(ConnectionError,),
)
connection_kwargs = self.connection_kwargs.copy()
connection_kwargs["retry"] = retry
connection = self.connection_class(**connection_kwargs)
self._connections.append(connection)
return connection

@@ -1544,7 +1557,7 @@ async def execute(
"""
Execute the pipeline.

It will retry the commands as specified by :attr:`cluster_error_retry_attempts`
It will retry the commands as specified by the retries configured in :attr:`retry`
& then raise an exception.

:param raise_on_error:
@@ -1560,7 +1573,7 @@
return []

try:
retry_attempts = self._client.cluster_error_retry_attempts
retry_attempts = self._client.retry.get_retries()
while True:
try:
if self._client._initialize:
12 changes: 12 additions & 0 deletions redis/asyncio/retry.py
@@ -43,6 +43,18 @@ def update_supported_errors(self, specified_errors: list):
set(self._supported_errors + tuple(specified_errors))
)

def get_retries(self) -> int:
"""
Get the number of retries.
"""
return self._retries

def update_retries(self, value: int) -> None:
"""
Set the number of retries.
"""
self._retries = value

async def call_with_retry(
self, do: Callable[[], Awaitable[T]], fail: Callable[[RedisError], Any]
) -> T: