39
39
)
40
40
from redis .asyncio .lock import Lock
41
41
from redis .asyncio .retry import Retry
42
+ from redis .backoff import ExponentialWithJitterBackoff
42
43
from redis .client import (
43
44
EMPTY_RESPONSE ,
44
45
NEVER_DECODE ,
65
66
PubSubError ,
66
67
RedisError ,
67
68
ResponseError ,
68
- TimeoutError ,
69
69
WatchError ,
70
70
)
71
71
from redis .typing import ChannelT , EncodableT , KeyT
72
72
from redis .utils import (
73
73
HIREDIS_AVAILABLE ,
74
74
SSL_AVAILABLE ,
75
75
_set_info_logger ,
76
+ deprecated_args ,
76
77
deprecated_function ,
77
78
get_lib_version ,
78
79
safe_str ,
@@ -208,6 +209,11 @@ def from_pool(
208
209
client .auto_close_connection_pool = True
209
210
return client
210
211
212
+ @deprecated_args (
213
+ args_to_warn = ["retry_on_timeout" ],
214
+ reason = "TimeoutError is included by default." ,
215
+ version = "6.0.0" ,
216
+ )
211
217
def __init__ (
212
218
self ,
213
219
* ,
@@ -225,6 +231,9 @@ def __init__(
225
231
encoding_errors : str = "strict" ,
226
232
decode_responses : bool = False ,
227
233
retry_on_timeout : bool = False ,
234
+ retry : Retry = Retry (
235
+ backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ), retries = 3
236
+ ),
228
237
retry_on_error : Optional [list ] = None ,
229
238
ssl : bool = False ,
230
239
ssl_keyfile : Optional [str ] = None ,
@@ -242,7 +251,6 @@ def __init__(
242
251
lib_name : Optional [str ] = "redis-py" ,
243
252
lib_version : Optional [str ] = get_lib_version (),
244
253
username : Optional [str ] = None ,
245
- retry : Optional [Retry ] = None ,
246
254
auto_close_connection_pool : Optional [bool ] = None ,
247
255
redis_connect_func = None ,
248
256
credential_provider : Optional [CredentialProvider ] = None ,
@@ -251,10 +259,24 @@ def __init__(
251
259
):
252
260
"""
253
261
Initialize a new Redis client.
254
- To specify a retry policy for specific errors, first set
255
- `retry_on_error` to a list of the error/s to retry on, then set
256
- `retry` to a valid `Retry` object.
257
- To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
262
+
263
+ To specify a retry policy for specific errors, you have two options:
264
+
265
+ 1. Set the `retry_on_error` to a list of the error/s to retry on, and
266
+ you can also set `retry` to a valid `Retry` object(in case the default
267
+ one is not appropriate) - with this approach the retries will be triggered
268
+ on the default errors specified in the Retry object enriched with the
269
+ errors specified in `retry_on_error`.
270
+
271
+ 2. Define a `Retry` object with configured 'supported_errors' and set
272
+ it to the `retry` parameter - with this approach you completely redefine
273
+ the errors on which retries will happen.
274
+
275
+ `retry_on_timeout` is deprecated - please include the TimeoutError
276
+ either in the Retry object or in the `retry_on_error` list.
277
+
278
+ When 'connection_pool' is provided - the retry configuration of the
279
+ provided pool will be used.
258
280
"""
259
281
kwargs : Dict [str , Any ]
260
282
if event_dispatcher is None :
@@ -280,8 +302,6 @@ def __init__(
280
302
# Create internal connection pool, expected to be closed by Redis instance
281
303
if not retry_on_error :
282
304
retry_on_error = []
283
- if retry_on_timeout is True :
284
- retry_on_error .append (TimeoutError )
285
305
kwargs = {
286
306
"db" : db ,
287
307
"username" : username ,
@@ -291,7 +311,6 @@ def __init__(
291
311
"encoding" : encoding ,
292
312
"encoding_errors" : encoding_errors ,
293
313
"decode_responses" : decode_responses ,
294
- "retry_on_timeout" : retry_on_timeout ,
295
314
"retry_on_error" : retry_on_error ,
296
315
"retry" : copy .deepcopy (retry ),
297
316
"max_connections" : max_connections ,
@@ -403,10 +422,10 @@ def get_connection_kwargs(self):
403
422
"""Get the connection's key-word arguments"""
404
423
return self .connection_pool .connection_kwargs
405
424
406
- def get_retry (self ) -> Optional [" Retry" ]:
425
+ def get_retry (self ) -> Optional [Retry ]:
407
426
return self .get_connection_kwargs ().get ("retry" )
408
427
409
- def set_retry (self , retry : " Retry" ) -> None :
428
+ def set_retry (self , retry : Retry ) -> None :
410
429
self .get_connection_kwargs ().update ({"retry" : retry })
411
430
self .connection_pool .set_retry (retry )
412
431
@@ -633,18 +652,17 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option
633
652
await conn .send_command (* args )
634
653
return await self .parse_response (conn , command_name , ** options )
635
654
636
- async def _disconnect_raise (self , conn : Connection , error : Exception ):
655
+ async def _close_connection (self , conn : Connection ):
637
656
"""
638
- Close the connection and raise an exception
639
- if retry_on_error is not set or the error
640
- is not one of the specified error types
657
+ Close the connection before retrying.
658
+
659
+ The supported exceptions are already checked in the
660
+ retry object so we don't need to do it here.
661
+
662
+ After we disconnect the connection, it will try to reconnect and
663
+ do a health check as part of the send_command logic(on connection level).
641
664
"""
642
665
await conn .disconnect ()
643
- if (
644
- conn .retry_on_error is None
645
- or isinstance (error , tuple (conn .retry_on_error )) is False
646
- ):
647
- raise error
648
666
649
667
# COMMAND EXECUTION AND PROTOCOL PARSING
650
668
async def execute_command (self , * args , ** options ):
@@ -661,7 +679,7 @@ async def execute_command(self, *args, **options):
661
679
lambda : self ._send_command_parse_response (
662
680
conn , command_name , * args , ** options
663
681
),
664
- lambda error : self ._disconnect_raise (conn , error ),
682
+ lambda _ : self ._close_connection (conn ),
665
683
)
666
684
finally :
667
685
if self .single_connection_client :
@@ -929,19 +947,11 @@ async def connect(self):
929
947
)
930
948
)
931
949
932
- async def _disconnect_raise_connect (self , conn , error ):
950
+ async def _reconnect (self , conn ):
933
951
"""
934
- Close the connection and raise an exception
935
- if retry_on_error is not set or the error is not one
936
- of the specified error types. Otherwise, try to
937
- reconnect
952
+ Try to reconnect
938
953
"""
939
954
await conn .disconnect ()
940
- if (
941
- conn .retry_on_error is None
942
- or isinstance (error , tuple (conn .retry_on_error )) is False
943
- ):
944
- raise error
945
955
await conn .connect ()
946
956
947
957
async def _execute (self , conn , command , * args , ** kwargs ):
@@ -954,7 +964,7 @@ async def _execute(self, conn, command, *args, **kwargs):
954
964
"""
955
965
return await conn .retry .call_with_retry (
956
966
lambda : command (* args , ** kwargs ),
957
- lambda error : self ._disconnect_raise_connect (conn , error ),
967
+ lambda _ : self ._reconnect (conn ),
958
968
)
959
969
960
970
async def parse_response (self , block : bool = True , timeout : float = 0 ):
@@ -1245,7 +1255,8 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
1245
1255
in one transmission. This is convenient for batch processing, such as
1246
1256
saving all the values in a list to Redis.
1247
1257
1248
- All commands executed within a pipeline are wrapped with MULTI and EXEC
1258
+ All commands executed within a pipeline(when running in transactional mode,
1259
+ which is the default behavior) are wrapped with MULTI and EXEC
1249
1260
calls. This guarantees all commands executed in the pipeline will be
1250
1261
executed atomically.
1251
1262
@@ -1274,7 +1285,7 @@ def __init__(
1274
1285
self .shard_hint = shard_hint
1275
1286
self .watching = False
1276
1287
self .command_stack : CommandStackT = []
1277
- self .scripts : Set [" Script" ] = set ()
1288
+ self .scripts : Set [Script ] = set ()
1278
1289
self .explicit_transaction = False
1279
1290
1280
1291
async def __aenter__ (self : _RedisT ) -> _RedisT :
@@ -1346,36 +1357,36 @@ def execute_command(
1346
1357
return self .immediate_execute_command (* args , ** kwargs )
1347
1358
return self .pipeline_execute_command (* args , ** kwargs )
1348
1359
1349
- async def _disconnect_reset_raise (self , conn , error ):
1360
+ async def _disconnect_reset_raise_on_watching (
1361
+ self ,
1362
+ conn : Connection ,
1363
+ error : Exception ,
1364
+ ):
1350
1365
"""
1351
- Close the connection, reset watching state and
1352
- raise an exception if we were watching,
1353
- if retry_on_error is not set or the error is not one
1354
- of the specified error types.
1366
+ Close the connection reset watching state and
1367
+ raise an exception if we were watching.
1368
+
1369
+ The supported exceptions are already checked in the
1370
+ retry object so we don't need to do it here.
1371
+
1372
+ After we disconnect the connection, it will try to reconnect and
1373
+ do a health check as part of the send_command logic(on connection level).
1355
1374
"""
1356
1375
await conn .disconnect ()
1357
1376
# if we were already watching a variable, the watch is no longer
1358
1377
# valid since this connection has died. raise a WatchError, which
1359
1378
# indicates the user should retry this transaction.
1360
1379
if self .watching :
1361
- await self .aclose ()
1380
+ await self .reset ()
1362
1381
raise WatchError (
1363
- "A ConnectionError occurred on while watching one or more keys"
1382
+ f "A { type ( error ). __name__ } occurred while watching one or more keys"
1364
1383
)
1365
- # if retry_on_error is not set or the error is not one
1366
- # of the specified error types, raise it
1367
- if (
1368
- conn .retry_on_error is None
1369
- or isinstance (error , tuple (conn .retry_on_error )) is False
1370
- ):
1371
- await self .aclose ()
1372
- raise
1373
1384
1374
1385
async def immediate_execute_command (self , * args , ** options ):
1375
1386
"""
1376
- Execute a command immediately, but don't auto-retry on a
1377
- ConnectionError if we're already WATCHing a variable. Used when
1378
- issuing WATCH or subsequent commands retrieving their values but before
1387
+ Execute a command immediately, but don't auto-retry on the supported
1388
+ errors for retry if we're already WATCHing a variable.
1389
+ Used when issuing WATCH or subsequent commands retrieving their values but before
1379
1390
MULTI is called.
1380
1391
"""
1381
1392
command_name = args [0 ]
@@ -1389,7 +1400,7 @@ async def immediate_execute_command(self, *args, **options):
1389
1400
lambda : self ._send_command_parse_response (
1390
1401
conn , command_name , * args , ** options
1391
1402
),
1392
- lambda error : self ._disconnect_reset_raise (conn , error ),
1403
+ lambda error : self ._disconnect_reset_raise_on_watching (conn , error ),
1393
1404
)
1394
1405
1395
1406
def pipeline_execute_command (self , * args , ** options ):
@@ -1544,28 +1555,24 @@ async def load_scripts(self):
1544
1555
if not exist :
1545
1556
s .sha = await immediate ("SCRIPT LOAD" , s .script )
1546
1557
1547
- async def _disconnect_raise_reset (self , conn : Connection , error : Exception ):
1558
+ async def _disconnect_raise_on_watching (self , conn : Connection , error : Exception ):
1548
1559
"""
1549
- Close the connection, raise an exception if we were watching,
1550
- and raise an exception if retry_on_error is not set or the
1551
- error is not one of the specified error types.
1560
+ Close the connection, raise an exception if we were watching.
1561
+
1562
+ The supported exceptions are already checked in the
1563
+ retry object so we don't need to do it here.
1564
+
1565
+ After we disconnect the connection, it will try to reconnect and
1566
+ do a health check as part of the send_command logic(on connection level).
1552
1567
"""
1553
1568
await conn .disconnect ()
1554
1569
# if we were watching a variable, the watch is no longer valid
1555
1570
# since this connection has died. raise a WatchError, which
1556
1571
# indicates the user should retry this transaction.
1557
1572
if self .watching :
1558
1573
raise WatchError (
1559
- "A ConnectionError occurred on while watching one or more keys"
1574
+ f "A { type ( error ). __name__ } occurred while watching one or more keys"
1560
1575
)
1561
- # if retry_on_error is not set or the error is not one
1562
- # of the specified error types, raise it
1563
- if (
1564
- conn .retry_on_error is None
1565
- or isinstance (error , tuple (conn .retry_on_error )) is False
1566
- ):
1567
- await self .reset ()
1568
- raise
1569
1576
1570
1577
async def execute (self , raise_on_error : bool = True ) -> List [Any ]:
1571
1578
"""Execute all the commands in the current pipeline"""
@@ -1590,7 +1597,7 @@ async def execute(self, raise_on_error: bool = True) -> List[Any]:
1590
1597
try :
1591
1598
return await conn .retry .call_with_retry (
1592
1599
lambda : execute (conn , stack , raise_on_error ),
1593
- lambda error : self ._disconnect_raise_reset (conn , error ),
1600
+ lambda error : self ._disconnect_raise_on_watching (conn , error ),
1594
1601
)
1595
1602
finally :
1596
1603
await self .reset ()
0 commit comments