
Commit 2996617
Commit message: "ongoing"
1 parent: c73d56f
2 files changed: +114 −39 lines


packages/service-library/src/servicelib/redis/_decorators.py
Lines changed: 83 additions & 37 deletions

@@ -4,22 +4,52 @@
 import logging
 from collections.abc import Callable, Coroutine
 from datetime import timedelta
-from typing import Any, ParamSpec, TypeVar
+from typing import Any, Final, ParamSpec, TypeVar
 
 import redis.exceptions
+from redis.asyncio.lock import Lock
+from tenacity import retry
 
-from ..background_task import periodic_task
-from ..logging_utils import log_context
+from servicelib.async_utils import cancel_wait_task, with_delay
+from servicelib.logging_utils import log_context
+
+from ..background_task import periodic
 from ._client import RedisClientSDK
 from ._constants import DEFAULT_LOCK_TTL, SHUTDOWN_TIMEOUT_S
-from ._errors import CouldNotAcquireLockError
+from ._errors import CouldNotAcquireLockError, LockLostError
 from ._utils import auto_extend_lock
 
 _logger = logging.getLogger(__file__)
 
 P = ParamSpec("P")
 R = TypeVar("R")
 
+_EXCLUSIVE_TASK_NAME: Final[str] = "exclusive/{func_name}"
+_EXCLUSIVE_AUTO_EXTEND_TASK_NAME: Final[str] = (
+    "exclusive/autoextend_lock_{redis_lock_key}"
+)
+
+
+@periodic(interval=DEFAULT_LOCK_TTL / 2, raise_on_error=True)
+async def _periodic_auto_extender(lock: Lock, started_event: asyncio.Event) -> None:
+    started_event.set()
+    await auto_extend_lock(lock)
+    current_task = asyncio.tasks.current_task()
+    assert current_task is not None  # nosec
+    print(current_task.cancelling())
+
+
+def _cancel_auto_extender_task(
+    _: asyncio.Task, *, auto_extend_task: asyncio.Task
+) -> None:
+    with log_context(
+        _logger,
+        logging.DEBUG,
+        f"Cancelling auto-extend task {auto_extend_task.get_name()}",
+    ):
+        auto_extend_task.cancel()
+        assert auto_extend_task.cancelling()
+
 
 def exclusive(
     redis_client: RedisClientSDK | Callable[..., RedisClientSDK],
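
For orientation between the two hunks: judging from the old `periodic_task(..., interval=DEFAULT_LOCK_TTL / 2, raise_on_error=True)` usage, the new `@periodic(...)` decorator presumably re-invokes `_periodic_auto_extender` every half-TTL and lets any failure propagate. The following is a rough, self-contained sketch of that handshake, not the real `periodic` decorator; `LOCK_TTL` and `extend_lock` are hypothetical stand-ins for `DEFAULT_LOCK_TTL` and `auto_extend_lock(lock)`.

import asyncio
from datetime import timedelta

LOCK_TTL = timedelta(seconds=10)  # hypothetical stand-in for DEFAULT_LOCK_TTL


async def periodic_auto_extender(
    extend_lock,  # hypothetical async callable that re-applies the lock's TTL
    started_event: asyncio.Event,
) -> None:
    # rough equivalent of @periodic(interval=LOCK_TTL / 2, raise_on_error=True):
    # run forever, refresh the lock every half-TTL, let errors propagate
    while True:
        started_event.set()
        await extend_lock()
        await asyncio.sleep(LOCK_TTL.total_seconds() / 2)


async def _demo() -> None:
    async def extend_lock() -> None:
        print("lock extended")

    started = asyncio.Event()
    extender = asyncio.create_task(periodic_auto_extender(extend_lock, started))
    await started.wait()      # only start the real work once the extender runs
    await asyncio.sleep(0.1)  # ... the actual work would happen here ...
    extender.cancel()         # stop refreshing once the work is done


asyncio.run(_demo())

The demo shows the purpose of `started_event`: the caller waits until the extender has actually started before launching the work task, so a very short-lived work task cannot complete before the auto-extender even exists.
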
@@ -78,42 +108,58 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
                 raise CouldNotAcquireLockError(lock=lock)
 
             try:
-                async with periodic_task(
-                    auto_extend_lock,
-                    interval=DEFAULT_LOCK_TTL / 2,
-                    task_name=f"autoextend_exclusive_lock_{redis_lock_key}",
-                    raise_on_error=True,
-                    lock=lock,
-                ) as auto_extend_task:
-                    assert asyncio.iscoroutinefunction(func)  # nosec
-                    work_task = asyncio.create_task(
-                        func(*args, **kwargs), name=f"exclusive_{func.__name__}"
-                    )
-                    done, _pending = await asyncio.wait(
-                        [work_task, auto_extend_task],
-                        return_when=asyncio.FIRST_COMPLETED,
+                async with asyncio.TaskGroup() as tg:
+                    started_event = asyncio.Event()
+                    # first create a task that will auto-extend the lock
+                    auto_extend_lock_task = tg.create_task(
+                        _periodic_auto_extender(lock, started_event),
+                        name=_EXCLUSIVE_AUTO_EXTEND_TASK_NAME.format(
+                            redis_lock_key=redis_lock_key
+                        ),
                     )
-                    # the task finished, let's return its result whatever it is
-                    if work_task in done:
-                        return await work_task
-
-                    # the auto extend task can only finish if it raised an error, so it's bad
-                    _logger.error(
-                        "lock %s could not be auto-extended, cancelling work task! "
-                        "TIP: check connection to Redis DBs or look for Synchronous "
-                        "code that might block the auto-extender task.",
-                        lock.name,
+                    # NOTE: in case the work task is super short lived, then we might fail in cancelling it
+                    await started_event.wait()
+
+                    # then the task that runs the user code
+                    assert asyncio.iscoroutinefunction(func)  # nosec
+                    work_task = tg.create_task(
+                        func(*args, **kwargs),
+                        name=_EXCLUSIVE_TASK_NAME.format(func_name=func.__name__),
                     )
-                    with log_context(_logger, logging.DEBUG, msg="cancel work task"):
-                        work_task.cancel()
-                        with contextlib.suppress(asyncio.CancelledError, TimeoutError):
-                            # this will raise any other error that could have happened in the work task
-                            await asyncio.wait_for(
-                                work_task, timeout=SHUTDOWN_TIMEOUT_S
-                            )
-                    # return the extend task raised error
-                    return await auto_extend_task  # type: ignore[no-any-return] # will raise
 
+                    # work_task.add_done_callback(
+                    #     functools.partial(
+                    #         _cancel_auto_extender_task,
+                    #         auto_extend_task=auto_extend_lock_task,
+                    #     )
+                    # )
+
+                    res = await work_task
+                    auto_extend_lock_task.cancel()
+
+                    return res
+
+            except BaseExceptionGroup as eg:
+                breakpoint()
+                # Separate exceptions into LockLostError and others
+                lock_lost_errors, other_errors = eg.split(LockLostError)
+
+                # If there are any other errors, re-raise them
+                if other_errors:
+                    assert len(other_errors.exceptions) == 1  # nosec
+                    raise other_errors.exceptions[0] from eg
+
+                assert lock_lost_errors is not None  # nosec
+                assert len(lock_lost_errors.exceptions) == 1  # nosec
+                _logger.error(  # noqa: TRY400
+                    "lock %s could not be auto-extended! "
+                    "TIP: check connection to Redis DBs or look for Synchronous "
+                    "code that might block the auto-extender task. Somehow the distributed lock disappeared!",
+                    lock.name,
+                )
+                raise lock_lost_errors.exceptions[0] from eg
+            except Exception as exc:
+                breakpoint()
             finally:
                 with contextlib.suppress(redis.exceptions.LockNotOwnedError):
                     # in the case where the lock would have been lost,

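Because the lock-extension task and the user code now run inside one `asyncio.TaskGroup`, any failure surfaces as an exception group, which is why the handler splits on `LockLostError` using Python 3.11's `BaseExceptionGroup.split`. Below is a minimal, self-contained sketch of that splitting logic; the `LockLostError` defined here is a local stand-in, not the class from `._errors`.

import asyncio


class LockLostError(Exception):
    # local stand-in for servicelib.redis._errors.LockLostError
    ...


async def _auto_extender() -> None:
    raise LockLostError("the distributed lock disappeared")


async def _work() -> None:
    await asyncio.sleep(10)


async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(_auto_extender())
            tg.create_task(_work())
    except BaseExceptionGroup as eg:
        # split() returns (matching, non-matching) sub-groups; either may be None
        lock_lost, others = eg.split(LockLostError)
        if others:
            # a failure in the user code takes precedence over a lost lock
            raise others.exceptions[0] from eg
        assert lock_lost is not None
        print("lock was lost:", lock_lost.exceptions[0])


asyncio.run(main())

`split()` returning a `(matching, rest)` pair is what lets the decorator re-raise a genuine error from the user code with priority, and only report the lost-lock case when nothing else went wrong.
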
packages/service-library/tests/redis/test_decorators.py
Lines changed: 31 additions & 2 deletions

@@ -13,6 +13,7 @@
 from faker import Faker
 from servicelib.redis import CouldNotAcquireLockError, RedisClientSDK, exclusive
 from servicelib.redis._errors import LockLostError
+from servicelib.redis._decorators import _EXCLUSIVE_AUTO_EXTEND_TASK_NAME
 from servicelib.utils import limited_gather, logged_gather
 
 pytest_simcore_core_services_selection = [
@@ -213,6 +214,9 @@ async def test_exclusive_task_erroring_releases_lock(
 ):
     @exclusive(redis_client_sdk, lock_key=lock_name)
     async def _() -> None:
+        # await asyncio.sleep(0)
+
+        print("raising error")
         msg = "Expected error"
         raise RuntimeError(msg)
 
@@ -221,10 +225,10 @@ async def _() -> None:
     assert await redis_client_sdk.lock_value(lock_name) is None
 
     # run the exclusive task
-    exclusive_task = asyncio.create_task(_())
+    # exclusive_task = asyncio.create_task(_())
 
     with pytest.raises(RuntimeError):
-        await exclusive_task
+        await _()
 
     assert await redis_client_sdk.lock_value(lock_name) is None
 
@@ -267,3 +271,28 @@ async def _inc_counter() -> None:
         *(_inc_counter() for _ in range(INCREASE_OPERATIONS)), limit=15
     )
     assert counter.value == INCREASE_BY * INCREASE_OPERATIONS
+
+
+async def test_cancelling_exclusive_task_works(
+    redis_client_sdk: RedisClientSDK, lock_name: str
+):
+    started_event = asyncio.Event()
+
+    @exclusive(redis_client_sdk, lock_key=lock_name)
+    async def _(time_to_sleep: datetime.timedelta) -> datetime.timedelta:
+        started_event.set()
+        await asyncio.sleep(time_to_sleep.total_seconds())
+        return time_to_sleep
+
+    exclusive_task = asyncio.create_task(_(datetime.timedelta(seconds=10)))
+    await asyncio.wait_for(started_event.wait(), timeout=2)
+    exclusive_task.cancel()
+
+    with pytest.raises(asyncio.CancelledError):
+        await exclusive_task
+
+    assert not await _is_locked(redis_client_sdk, lock_name)
+
+    assert _EXCLUSIVE_AUTO_EXTEND_TASK_NAME.format(redis_lock_key=lock_name) not in [
+        t.get_name() for t in asyncio.tasks.all_tasks()
+    ], "The auto extend lock task was not properly stopped!"
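
The new cancellation test leans on `asyncio.TaskGroup` semantics: cancelling the outer task that is awaiting the group also cancels its children (here the auto-extend task and the user work task), so nothing is left refreshing the Redis lock. A minimal sketch of that behaviour with plain asyncio and no Redis, under the assumption that both children simply sleep:

import asyncio
import contextlib


async def _child(name: str) -> None:
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        print(f"{name} was cancelled")
        raise


async def _parent() -> None:
    # mirrors the decorator: auto-extender and user code share one TaskGroup
    async with asyncio.TaskGroup() as tg:
        tg.create_task(_child("auto-extender"))
        tg.create_task(_child("work"))


async def main() -> None:
    parent = asyncio.create_task(_parent())
    await asyncio.sleep(0.1)  # give the children time to start
    parent.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await parent
    # once the parent is cancelled, no unfinished child tasks remain
    assert asyncio.all_tasks() == {asyncio.current_task()}


asyncio.run(main())

This is the property the test's final assertion checks against the real task names: after cancellation, no task named after `_EXCLUSIVE_AUTO_EXTEND_TASK_NAME` is still alive.
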
