8
8
9
9
import redis .exceptions
10
10
from redis .asyncio .lock import Lock
11
- from tenacity import retry
12
-
13
- from servicelib .async_utils import cancel_wait_task , with_delay
14
- from servicelib .logging_utils import log_context
15
11
16
12
from ..background_task import periodic
17
13
from ._client import RedisClientSDK
18
- from ._constants import DEFAULT_LOCK_TTL , SHUTDOWN_TIMEOUT_S
14
+ from ._constants import DEFAULT_LOCK_TTL
19
15
from ._errors import CouldNotAcquireLockError , LockLostError
20
16
from ._utils import auto_extend_lock
21
17
25
21
R = TypeVar ("R" )
26
22
27
23
_EXCLUSIVE_TASK_NAME : Final [str ] = "exclusive/{func_name}"
28
- _EXCLUSIVE_AUTO_EXTEND_TASK_NAME : Final [str ] = (
29
- "exclusive/autoextend_lock_{redis_lock_key}"
30
- )
24
+ _EXCLUSIVE_AUTO_EXTEND_TASK_NAME : Final [
25
+ str
26
+ ] = "exclusive/autoextend_lock_{redis_lock_key}"
31
27
32
28
33
29
@periodic(interval=DEFAULT_LOCK_TTL / 2, raise_on_error=True)
async def _periodic_auto_extender(lock: Lock, started_event: asyncio.Event) -> None:
    """Keep *lock* alive by extending its TTL on every periodic iteration.

    The ``@periodic`` decorator re-runs this coroutine every
    ``DEFAULT_LOCK_TTL / 2`` so the lock is refreshed well before it expires;
    with ``raise_on_error=True`` a failed extension is not silently retried
    forever — presumably the error propagates to the awaiting task (TODO:
    confirm against ``periodic``'s implementation).

    :param lock: the acquired redis lock to auto-extend
    :param started_event: set only AFTER the first successful extension, so a
        caller awaiting it knows the auto-extend loop has completed at least
        one cycle before the guarded work starts (or finishes)
    """
    await auto_extend_lock(lock)
    # NOTE: set the event only after a successful extension so that even a
    # work task that raises immediately observes one completed extend cycle,
    # which also guarantees this task is running and therefore cancellable.
    started_event.set()
52
33
53
34
54
35
def exclusive (
@@ -101,9 +82,9 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
101
82
if not await lock .acquire (
102
83
token = lock_value ,
103
84
blocking = blocking ,
104
- blocking_timeout = blocking_timeout . total_seconds ()
105
- if blocking_timeout
106
- else None ,
85
+ blocking_timeout = (
86
+ blocking_timeout . total_seconds () if blocking_timeout else None
87
+ ) ,
107
88
):
108
89
raise CouldNotAcquireLockError (lock = lock )
109
90
@@ -117,7 +98,8 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
117
98
redis_lock_key = redis_lock_key
118
99
),
119
100
)
120
- # NOTE: in case the work task is super short lived, then we might fail in cancelling it
101
+ # NOTE: In case the work thread is raising right away,
102
+ # this ensures the extend task ran once and ensure cancellation works
121
103
await started_event .wait ()
122
104
123
105
# then the task that runs the user code
@@ -127,20 +109,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
127
109
name = _EXCLUSIVE_TASK_NAME .format (func_name = func .__name__ ),
128
110
)
129
111
130
- # work_task.add_done_callback(
131
- # functools.partial(
132
- # _cancel_auto_extender_task,
133
- # auto_extend_task=auto_extend_lock_task,
134
- # )
135
- # )
136
-
137
112
res = await work_task
138
113
auto_extend_lock_task .cancel ()
139
-
140
114
return res
141
115
142
116
except BaseExceptionGroup as eg :
143
- breakpoint ()
144
117
# Separate exceptions into LockLostError and others
145
118
lock_lost_errors , other_errors = eg .split (LockLostError )
146
119
@@ -158,8 +131,6 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
158
131
lock .name ,
159
132
)
160
133
raise lock_lost_errors .exceptions [0 ] from eg
161
- except Exception as exc :
162
- breakpoint ()
163
134
finally :
164
135
with contextlib .suppress (redis .exceptions .LockNotOwnedError ):
165
136
# in the case where the lock would have been lost,
0 commit comments