Skip to content

🐛Redis locks disappearing and fixup weird usage #7020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
158a548
unskip test
sanderegg Jan 8, 2025
b83a4ed
add function to create distributed locks
sanderegg Jan 8, 2025
9367e12
prepare exclusive decorator instead of context manager
sanderegg Jan 8, 2025
15b302a
ongoing
sanderegg Jan 8, 2025
7c714b4
ongoing
sanderegg Jan 8, 2025
820d4fc
adapted test
sanderegg Jan 8, 2025
a994725
moved async_delay to async_utils and renamed
sanderegg Jan 9, 2025
58f5c56
cleanup
sanderegg Jan 9, 2025
bb527e2
refactor
sanderegg Jan 9, 2025
6939e3b
cleanup
sanderegg Jan 9, 2025
5fe0ab6
typing
sanderegg Jan 9, 2025
0ab6b96
remove lock_context
sanderegg Jan 9, 2025
51c0ee6
renaming
sanderegg Jan 9, 2025
d412df8
renaming
sanderegg Jan 9, 2025
b69e128
ongoing
sanderegg Jan 9, 2025
ca859f8
fix wrong name
sanderegg Jan 9, 2025
c900e52
add redis commander
sanderegg Jan 9, 2025
026b7b1
better names
sanderegg Jan 9, 2025
2877f4b
exclusive expects coroutines
sanderegg Jan 9, 2025
eea85df
syntax
sanderegg Jan 9, 2025
d0fd5a6
raise timeout error
sanderegg Jan 9, 2025
f1a3a9f
re-write exclusive task starter
sanderegg Jan 9, 2025
74f1b2d
added periodic function decorator
sanderegg Jan 9, 2025
ff5fc8b
test passing
sanderegg Jan 9, 2025
8f148b2
test decorator
sanderegg Jan 9, 2025
974ac7a
@pcrespov review: rename
sanderegg Jan 9, 2025
ebce6a4
removed useless code
sanderegg Jan 9, 2025
964d7c5
add default
sanderegg Jan 9, 2025
7bd12ab
rename
sanderegg Jan 9, 2025
613c51a
cleanup
sanderegg Jan 9, 2025
ea67118
clean
sanderegg Jan 9, 2025
561019c
refactor
sanderegg Jan 9, 2025
f3ca43c
refactor
sanderegg Jan 9, 2025
fba8d53
more cleanups
sanderegg Jan 9, 2025
fec85df
ongoing
sanderegg Jan 9, 2025
0291b7f
almost fully tested
sanderegg Jan 9, 2025
6e07765
added the blocking behavior
sanderegg Jan 9, 2025
6f0661e
rename
sanderegg Jan 9, 2025
789371b
remove duplicate of duplicate
sanderegg Jan 9, 2025
c73d34f
clean
sanderegg Jan 9, 2025
e1d78ca
remove usage of lock_context
sanderegg Jan 9, 2025
d7bae0e
mypy
sanderegg Jan 9, 2025
0be583e
fixed call
sanderegg Jan 10, 2025
167cb5a
renaming
sanderegg Jan 10, 2025
ae59f0b
move out
sanderegg Jan 10, 2025
8f89854
minor
sanderegg Jan 10, 2025
34c7d46
name the asyncio task
sanderegg Jan 10, 2025
9d83bb1
fix name
sanderegg Jan 10, 2025
6aae220
minor
sanderegg Jan 10, 2025
7628fbc
ruff
sanderegg Jan 10, 2025
563896f
refactor
sanderegg Jan 10, 2025
e6eab3e
missing
sanderegg Jan 10, 2025
4542b98
improved lock value
sanderegg Jan 10, 2025
a61a220
fixed usage
sanderegg Jan 10, 2025
18774e8
improve docs
sanderegg Jan 13, 2025
e8dff45
improve fixture
sanderegg Jan 13, 2025
184d927
docs
sanderegg Jan 13, 2025
1333a86
set envs correctly
sanderegg Jan 13, 2025
1f5d893
add waiting for calls
sanderegg Jan 13, 2025
5775b01
doc
sanderegg Jan 13, 2025
dc276f3
ongoing
sanderegg Jan 13, 2025
d4b26d3
test passes
sanderegg Jan 13, 2025
846c637
test passes
sanderegg Jan 13, 2025
14831d0
moved exclusive periodic
sanderegg Jan 13, 2025
1799991
moved exclusive start to its own module
sanderegg Jan 14, 2025
a781aa3
change usage
sanderegg Jan 14, 2025
f6f012f
renamed
sanderegg Jan 14, 2025
90365ce
doc
sanderegg Jan 14, 2025
6d9379c
fix weird issue
sanderegg Jan 14, 2025
5c7ba02
fixed test
sanderegg Jan 14, 2025
66b22a9
much faster tests
sanderegg Jan 14, 2025
2c49b08
clean
sanderegg Jan 14, 2025
d02a891
ensure healthcheck task started
sanderegg Jan 14, 2025
fb6c2bd
fixed shutdown
sanderegg Jan 14, 2025
3a41a12
name
sanderegg Jan 14, 2025
cd3e071
no use
sanderegg Jan 14, 2025
8f8c7c5
added utils to complete testing
sanderegg Jan 14, 2025
0c72d44
remove use of setup
sanderegg Jan 14, 2025
746fffb
no setup
sanderegg Jan 14, 2025
f0c0a6d
@GitHK review
sanderegg Jan 14, 2025
b30e1d6
@pcrespov review: renaming things
sanderegg Jan 14, 2025
b3c6b34
@pcrespov reviews
sanderegg Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import asyncio
import logging
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import suppress

import aio_pika
import pytest
Expand Down Expand Up @@ -141,12 +140,11 @@ async def ensure_parametrized_queue_is_empty(
rabbitmq_client = create_rabbitmq_client("pytest-purger")

async def _queue_messages_purger() -> None:
with suppress(aio_pika.exceptions.ChannelClosed):
assert rabbitmq_client._channel_pool # noqa: SLF001
async with rabbitmq_client._channel_pool.acquire() as channel: # noqa: SLF001
assert isinstance(channel, aio_pika.RobustChannel)
queue = await channel.get_queue(queue_name)
await queue.purge()
assert rabbitmq_client._channel_pool # noqa: SLF001
async with rabbitmq_client._channel_pool.acquire() as channel: # noqa: SLF001
assert isinstance(channel, aio_pika.RobustChannel)
queue = await channel.get_queue(queue_name)
await queue.purge()

await _queue_messages_purger()
yield
Expand Down
5 changes: 3 additions & 2 deletions packages/pytest-simcore/src/pytest_simcore/redis_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import tenacity
from pytest_mock import MockerFixture
from redis.asyncio import Redis, from_url
from servicelib.redis import _constants as redis_constants
from settings_library.basic_types import PortInt
from settings_library.redis import RedisDatabase, RedisSettings
from tenacity.before_sleep import before_sleep_log
Expand Down Expand Up @@ -119,4 +118,6 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
@pytest.fixture
def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
# lowered to allow CI to properly shutdown RedisClientSDK instances
mocker.patch.object(redis_constants, "DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
mocker.patch(
"servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=0.25)
)
64 changes: 54 additions & 10 deletions packages/service-library/src/servicelib/async_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import asyncio
import contextlib
import datetime
import logging
from collections import deque
from collections.abc import Awaitable, Callable, Coroutine
from contextlib import suppress
from dataclasses import dataclass
from functools import wraps
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Deque
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar

from . import tracing
from .utils_profiling_middleware import dont_profile, is_profiling, profile_context

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)

P = ParamSpec("P")
R = TypeVar("R")


if TYPE_CHECKING:
Queue = asyncio.Queue
Expand Down Expand Up @@ -54,7 +61,7 @@ async def _safe_cancel(context: Context) -> None:
await context.task
except RuntimeError as e:
if "Event loop is closed" in f"{e}":
logger.warning("event loop is closed and could not cancel %s", context)
_logger.warning("event loop is closed and could not cancel %s", context)
else:
raise

Expand All @@ -65,7 +72,7 @@ async def cancel_sequential_workers() -> None:
await _safe_cancel(context)

_sequential_jobs_contexts.clear()
logger.info("All run_sequentially_in_context pending workers stopped")
_logger.info("All run_sequentially_in_context pending workers stopped")


# NOTE: If you get funny mismatches with mypy in returned values it might be due to this decorator.
Expand Down Expand Up @@ -118,25 +125,25 @@ def _get_context(args: Any, kwargs: dict) -> Context:
arg_names = decorated_function.__code__.co_varnames[
: decorated_function.__code__.co_argcount
]
search_args = dict(zip(arg_names, args))
search_args = dict(zip(arg_names, args, strict=False))
search_args.update(kwargs)

key_parts: Deque[str] = deque()
key_parts: deque[str] = deque()
for arg in target_args:
sub_args = arg.split(".")
main_arg = sub_args[0]
if main_arg not in search_args:
raise ValueError(
msg = (
f"Expected '{main_arg}' in '{decorated_function.__name__}'"
f" arguments. Got '{search_args}'"
)
raise ValueError(msg)
context_key = search_args[main_arg]
for attribute in sub_args[1:]:
potential_key = getattr(context_key, attribute)
if not potential_key:
raise ValueError(
f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments."
)
msg = f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments."
raise ValueError(msg)
context_key = potential_key

key_parts.append(f"{decorated_function.__name__}_{context_key}")
Expand Down Expand Up @@ -205,3 +212,40 @@ async def worker(in_q: Queue[QueueElement], out_q: Queue) -> None:
return wrapper

return decorator


def delayed_start(
    delay: datetime.timedelta,
) -> Callable[
    [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
]:
    """Create a decorator that postpones every call of a coroutine function.

    Each invocation of the decorated function first sleeps for ``delay``
    and then awaits the wrapped function, returning its result.

    :param delay: how long to wait before the wrapped function is awaited
    :return: a decorator for coroutine functions that keeps their signature
    """

    def _decorate(
        func: Callable[P, Coroutine[Any, Any, R]],
    ) -> Callable[P, Coroutine[Any, Any, R]]:
        async def _delayed(*args: P.args, **kwargs: P.kwargs) -> R:
            # the pause is applied on every call, not once at decoration time
            await asyncio.sleep(delay.total_seconds())
            result = await func(*args, **kwargs)
            return result

        # copy name, docstring, etc. of the wrapped function onto the wrapper
        return wraps(func)(_delayed)

    return _decorate


async def cancel_wait_task(
task: asyncio.Task,
*,
max_delay: float | None = None,
) -> None:
"""Cancel a asyncio.Task and waits for it to finish.

:param task: task to be canceled
:param max_delay: duration (in seconds) to wait before giving
up the cancellation. If None it waits forever.
:raises TimeoutError: raised if cannot cancel the task.
"""

task.cancel()
async with asyncio.timeout(max_delay):
with contextlib.suppress(asyncio.CancelledError):
await task
176 changes: 80 additions & 96 deletions packages/service-library/src/servicelib/background_task.py
Original file line number Diff line number Diff line change
@@ -1,136 +1,115 @@
import asyncio
import contextlib
import datetime
import functools
import logging
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Final
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
from typing import Any, Final, ParamSpec, TypeVar

from common_library.errors_classes import OsparcErrorMixin
from tenacity import TryAgain
from tenacity.asyncio import AsyncRetrying
from tenacity.stop import stop_after_attempt
from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type
from tenacity.wait import wait_fixed

from .decorators import async_delayed
from .logging_utils import log_catch, log_context
from .async_utils import cancel_wait_task, delayed_start
from .logging_utils import log_context

_logger = logging.getLogger(__name__)


_DEFAULT_STOP_TIMEOUT_S: Final[int] = 5
_MAX_TASK_CANCELLATION_ATTEMPTS: Final[int] = 3


class PeriodicTaskCancellationError(OsparcErrorMixin, Exception):
msg_template: str = "Could not cancel task '{task_name}'"


class SleepUsingAsyncioEvent:
"""Sleep strategy that waits on an event to be set."""
"""Sleep strategy that waits on an event to be set or sleeps."""

def __init__(self, event: "asyncio.Event") -> None:
self.event = event

async def __call__(self, timeout: float | None) -> None:
async def __call__(self, delay: float | None) -> None:
with contextlib.suppress(TimeoutError):
await asyncio.wait_for(self.event.wait(), timeout=timeout)
await asyncio.wait_for(self.event.wait(), timeout=delay)
self.event.clear()


async def _periodic_scheduled_task(
task: Callable[..., Awaitable[None]],
P = ParamSpec("P")
R = TypeVar("R")


def periodic(
*,
interval: datetime.timedelta,
task_name: str,
early_wake_up_event: asyncio.Event | None,
**task_kwargs,
) -> None:
# NOTE: This retries forever unless cancelled
nap = (
asyncio.sleep
if early_wake_up_event is None
else SleepUsingAsyncioEvent(early_wake_up_event)
)
async for attempt in AsyncRetrying(
sleep=nap,
wait=wait_fixed(interval.total_seconds()),
):
with attempt:
with log_context(
_logger,
logging.DEBUG,
msg=f"iteration {attempt.retry_state.attempt_number} of '{task_name}'",
), log_catch(_logger):
await task(**task_kwargs)
raise_on_error: bool = False,
early_wake_up_event: asyncio.Event | None = None,
) -> Callable[
[Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
]:
"""Calls the function periodically with a given interval.

Arguments:
interval -- the interval between calls

Keyword Arguments:
raise_on_error -- If False the function will be retried indefinitely unless cancelled.
If True the function will be retried indefinitely unless cancelled
or an exception is raised. (default: {False})
early_wake_up_event -- allows to awaken the function before the interval has passed. (default: {None})

Returns:
coroutine that will be called periodically (runs forever)
"""

def _decorator(
func: Callable[P, Coroutine[Any, Any, None]],
) -> Callable[P, Coroutine[Any, Any, None]]:
nap = (
asyncio.sleep
if early_wake_up_event is None
else SleepUsingAsyncioEvent(early_wake_up_event)
)

@retry(
sleep=nap,
wait=wait_fixed(interval.total_seconds()),
reraise=True,
retry=(
retry_if_exception_type(TryAgain)
if raise_on_error
else retry_if_exception_type()
),
before_sleep=before_sleep_log(_logger, logging.DEBUG),
)
@functools.wraps(func)
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
await func(*args, **kwargs)
raise TryAgain

return _wrapper

def start_periodic_task(
return _decorator


def create_periodic_task(
task: Callable[..., Awaitable[None]],
*,
interval: datetime.timedelta,
task_name: str,
raise_on_error: bool = False,
wait_before_running: datetime.timedelta = datetime.timedelta(0),
early_wake_up_event: asyncio.Event | None = None,
**kwargs,
) -> asyncio.Task:
with log_context(
_logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
):
delayed_periodic_scheduled_task = async_delayed(wait_before_running)(
_periodic_scheduled_task
)
return asyncio.create_task(
delayed_periodic_scheduled_task(
task,
interval=interval,
task_name=task_name,
early_wake_up_event=early_wake_up_event,
**kwargs,
),
name=task_name,
)

@delayed_start(wait_before_running)
@periodic(
interval=interval,
raise_on_error=raise_on_error,
early_wake_up_event=early_wake_up_event,
)
async def _() -> None:
await task(**kwargs)

async def cancel_task(
task: asyncio.Task,
*,
timeout: float | None,
cancellation_attempts: int = _MAX_TASK_CANCELLATION_ATTEMPTS,
) -> None:
"""Reliable task cancellation. Some libraries will just hang without
cancelling the task. It is important to retry the operation to provide
a timeout in that situation to avoid forever pending tasks.

:param task: task to be canceled
:param timeout: total duration (in seconds) to wait before giving
up the cancellation. If None it waits forever.
:raises TryAgain: raised if cannot cancel the task.
"""
async for attempt in AsyncRetrying(
stop=stop_after_attempt(cancellation_attempts), reraise=True
):
with attempt:
task.cancel()
_, pending = await asyncio.wait((task,), timeout=timeout)
if pending:
task_name = task.get_name()
_logger.info(
"tried to cancel '%s' but timed-out! %s", task_name, pending
)
raise PeriodicTaskCancellationError(task_name=task_name)


async def stop_periodic_task(
asyncio_task: asyncio.Task, *, timeout: float | None = None
) -> None:
with log_context(
_logger,
logging.DEBUG,
msg=f"cancel periodic background task '{asyncio_task.get_name()}'",
_logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
):
await cancel_task(asyncio_task, timeout=timeout)
return asyncio.create_task(_(), name=task_name)


@contextlib.asynccontextmanager
Expand All @@ -140,16 +119,21 @@ async def periodic_task(
interval: datetime.timedelta,
task_name: str,
stop_timeout: float = _DEFAULT_STOP_TIMEOUT_S,
raise_on_error: bool = False,
**kwargs,
) -> AsyncIterator[asyncio.Task]:
asyncio_task: asyncio.Task | None = None
try:
asyncio_task = start_periodic_task(
task, interval=interval, task_name=task_name, **kwargs
asyncio_task = create_periodic_task(
task,
interval=interval,
task_name=task_name,
raise_on_error=raise_on_error,
**kwargs,
)
yield asyncio_task
finally:
if asyncio_task is not None:
# NOTE: this stopping is shielded to prevent the cancellation to propagate
# into the stopping procedure
await asyncio.shield(stop_periodic_task(asyncio_task, timeout=stop_timeout))
await asyncio.shield(cancel_wait_task(asyncio_task, max_delay=stop_timeout))
Loading
Loading