
Commit 9e0d6df

Authored by GitHK (Andrei Neagu), sanderegg, and pcrespov
bugfix: addressed issues when acquiring mpi_lock in sidecar (#2093)
* mpi locking got refacted to use multiprocessing * it should no longer be possible to start multiple MPI nodes in case of errors * minor refactor * added more comments and written in a more clear logic * Update mpi_lock.py updating comment * added tests for mpi_lock module * refactored fucntion names * migrated function to private * semplified mpi_lock acquisition * updated docstring and comment * final refactor to the mpi_locking using a proper * Update services/sidecar/src/simcore_service_sidecar/mpi_lock.py Co-authored-by: Sylvain <[email protected]> * inverted test oder * before trying to acquire lock, make sure redis is working * adding more debug * Git hk fix mpi sidecar locking (#2) * Minor cleanup in pytest_simcore/docker_swarm and redis_service * Minor * tests improvmentsd - adde dmore cases - patched redis config properly * improved mpi_lock for high concurrency * Update test_mpi_lock.py we do not have such high concurrency Co-authored-by: Andrei Neagu <[email protected]> Co-authored-by: Sylvain <[email protected]> Co-authored-by: Pedro Crespo-Valero <[email protected]>
1 parent d3882e3 commit 9e0d6df
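For orientation, the pattern this commit introduces is: the sidecar spawns a daemon process that tries to take a distributed lock and keeps extending it, while the parent process blocks on a multiprocessing.Queue for the acquisition result. The sketch below is illustrative only; it fakes the lock acquisition so it runs standalone, whereas the real mpi_lock.py (diffed further down) uses aioredlock against Redis.

import multiprocessing
import time


def _worker(reply_queue: multiprocessing.Queue, cpu_count: int) -> None:
    # in the real code, cpu_count selects the lock resource name
    acquired = False
    try:
        # placeholder for the real aioredlock acquisition + is_locked verification
        acquired = True
        reply_queue.put(acquired)
        while acquired:
            # placeholder for the periodic lock-extension loop
            time.sleep(1.0)
    finally:
        # guarantee the parent always receives an answer, even on failure
        reply_queue.put(False)


def acquire_mpi_lock_sketch(cpu_count: int) -> bool:
    reply_queue = multiprocessing.Queue()
    multiprocessing.Process(
        target=_worker, args=(reply_queue, cpu_count), daemon=True
    ).start()
    # block until the daemon process reports whether the lock was acquired
    return reply_queue.get()


if __name__ == "__main__":
    print("mpi lock acquired:", acquire_mpi_lock_sketch(cpu_count=2))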

File tree: 5 files changed, +166 −108 lines

packages/pytest-simcore/src/pytest_simcore/docker_swarm.py

Lines changed: 5 additions & 5 deletions
@@ -8,7 +8,7 @@
 from datetime import datetime
 from pathlib import Path
 from pprint import pprint
-from typing import Dict
+from typing import Dict, Iterator
 
 import docker
 import pytest
@@ -20,7 +20,7 @@
 
 
 @pytest.fixture(scope="session")
-def docker_client() -> docker.client.DockerClient:
+def docker_client() -> Iterator[docker.client.DockerClient]:
     client = docker.from_env()
     yield client
 
@@ -32,7 +32,7 @@ def keep_docker_up(request) -> bool:
 
 @pytest.fixture(scope="module")
 def docker_swarm(
-    docker_client: docker.client.DockerClient, keep_docker_up: bool
+    docker_client: docker.client.DockerClient, keep_docker_up: Iterator[bool]
 ) -> None:
     try:
         docker_client.swarm.reload()
@@ -56,7 +56,7 @@ def to_datetime(datetime_str: str) -> datetime:
     return datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M:%S.%f")
 
 
-def by_task_update(task: Dict) -> bool:
+def by_task_update(task: Dict) -> datetime:
     datetime_str = task["Status"]["Timestamp"]
     return to_datetime(datetime_str)
 
@@ -94,7 +94,7 @@ def docker_stack(
     core_docker_compose_file: Path,
     ops_docker_compose_file: Path,
     keep_docker_up: bool,
-) -> Dict:
+) -> Iterator[Dict]:
     stacks = {"simcore": core_docker_compose_file, "ops": ops_docker_compose_file}
 
     # make up-version
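The changes above only adjust type annotations: pytest fixtures that yield are generator functions, so they are typed as returning an Iterator of the yielded type rather than the type itself. A minimal illustration with a hypothetical fixture (not from the repo):

from typing import Iterator

import pytest


@pytest.fixture()
def resource() -> Iterator[str]:
    # a yield-fixture is a generator, so type checkers see Iterator[str],
    # even though the test that requests `resource` receives the plain str
    handle = "opened-resource"
    yield handle
    # teardown code after the yield runs once the dependent test finishes
    print(f"closing {handle}")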

packages/pytest-simcore/src/pytest_simcore/redis_service.py

Lines changed: 10 additions & 9 deletions
@@ -1,10 +1,10 @@
-import asyncio
-import logging
-
 # pylint:disable=unused-variable
 # pylint:disable=unused-argument
 # pylint:disable=redefined-outer-name
-from typing import Dict
+
+import asyncio
+import logging
+from typing import Dict, Iterator, Union
 
 import aioredis
 import pytest
@@ -18,7 +18,7 @@
 
 
 @pytest.fixture(scope="module")
-def loop(request) -> asyncio.AbstractEventLoop:
+def loop(request) -> Iterator[asyncio.AbstractEventLoop]:
     loop = asyncio.get_event_loop_policy().new_event_loop()
     yield loop
     loop.close()
@@ -27,13 +27,12 @@ def loop(request) -> asyncio.AbstractEventLoop:
 @pytest.fixture(scope="module")
 async def redis_config(loop, docker_stack: Dict, devel_environ: Dict) -> RedisConfig:
     assert "simcore_redis" in docker_stack["services"]
-
     # test runner is running on the host computer
     config = RedisConfig(
        host="127.0.0.1",
        port=get_service_published_port("simcore_redis", devel_environ["REDIS_PORT"]),
     )
-    await wait_till_redis_responsive(config.dsn)
+    await wait_till_redis_responsive(str(config.dsn))
     return config
 
 
@@ -45,7 +44,7 @@ async def redis_service(redis_config: RedisConfig, monkeypatch) -> RedisConfig:
 
 
 @pytest.fixture(scope="module")
-async def redis_client(loop, redis_config: RedisConfig) -> aioredis.Redis:
+async def redis_client(loop, redis_config: RedisConfig) -> Iterator[aioredis.Redis]:
     client = await aioredis.create_redis_pool(redis_config.dsn, encoding="utf-8")
 
     yield client
@@ -56,13 +55,15 @@ async def redis_client(loop, redis_config: RedisConfig) -> aioredis.Redis:
 
 
 # HELPERS --
+
+
 @tenacity.retry(
     wait=tenacity.wait_fixed(5),
     stop=tenacity.stop_after_attempt(60),
     before_sleep=tenacity.before_sleep_log(log, logging.INFO),
     reraise=True,
 )
-async def wait_till_redis_responsive(redis_url: URL) -> None:
+async def wait_till_redis_responsive(redis_url: Union[URL, str]) -> None:
     client = await aioredis.create_redis_pool(str(redis_url), encoding="utf-8")
     client.close()
     await client.wait_closed()
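With the widened Union[URL, str] signature, wait_till_redis_responsive accepts either a yarl URL or a plain DSN string, which is why redis_config now passes str(config.dsn). A hedged usage sketch, assuming the pytest_simcore package is importable and a Redis instance is reachable at the placeholder DSN:

import asyncio

from yarl import URL

# assumed import path, matching the file shown above
from pytest_simcore.redis_service import wait_till_redis_responsive


async def check_redis() -> None:
    dsn = "redis://127.0.0.1:6379"  # placeholder DSN
    # a plain string is now accepted ...
    await wait_till_redis_responsive(dsn)
    # ... and so is a yarl.URL, which the helper converts with str() internally
    await wait_till_redis_responsive(URL(dsn))


if __name__ == "__main__":
    asyncio.run(check_redis())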
services/sidecar/src/simcore_service_sidecar/mpi_lock.py

Lines changed: 99 additions & 94 deletions

@@ -1,131 +1,136 @@
 """
 Try to acquire a lock on the MPI resource.
 
-Due to pour non async implementation aioredlock will be used
+Due to pour non async implementation aioredlock will be used.
+All configuration is specified upfront
 
-How it works:
 
-- Try to acquire a lock the lock in a tight loop for about X seconds.
-- If it works start a task which updates the expiration every X second is spawned.
-- Ensures sleeper can be started as MPI sleeper again.
 """
 import asyncio
-import datetime
 import logging
-from threading import Thread
-from typing import Any, Callable, Optional, Tuple
+import multiprocessing
+import os
 
-from aioredlock import Aioredlock, Lock, LockError
+import aioredis
+import tenacity
+from aioredlock import Aioredlock, LockError
+from pydantic.networks import RedisDsn
 
 from . import config
 
-logger = logging.getLogger(__name__)
-
+# ptsvd cause issues with multiprocessing
+# SEE: https://github.com/microsoft/ptvsd/issues/1443
+if os.environ.get("SC_BOOT_MODE") == "debug-ptvsd":  # pragma: no cover
+    multiprocessing.set_start_method("spawn", True)
 
-async def retry_for_result(
-    result_validator: Callable[[Any], Any], coroutine_factory: Callable
-) -> Tuple[bool, Any]:
-    """
-    Will execute the given callback until the expected result is reached.
-    Between each retry it will wait 1/5 of REDLOCK_REFRESH_INTERVAL_SECONDS
-    """
-    sleep_interval = config.REDLOCK_REFRESH_INTERVAL_SECONDS / 5.0
-    elapsed = 0.0
-    start = datetime.datetime.utcnow()
-
-    while elapsed < config.REDLOCK_REFRESH_INTERVAL_SECONDS:
-        result = await coroutine_factory()
-        if result_validator(result):
-            return True, result
-        await asyncio.sleep(sleep_interval)
-        elapsed = (datetime.datetime.utcnow() - start).total_seconds()
-
-    return False, None
+logger = logging.getLogger(__name__)
 
 
-def start_background_lock_extender(
-    lock_manager: Aioredlock, lock: Lock, loop: asyncio.BaseEventLoop
+async def _wrapped_acquire_and_extend_lock_worker(
+    reply_queue: multiprocessing.Queue, cpu_count: int
 ) -> None:
-    """Will periodically extend the duration of the lock"""
-
-    async def extender_worker(lock_manager: Aioredlock):
-        sleep_interval = 0.9 * config.REDLOCK_REFRESH_INTERVAL_SECONDS
-        while True:
-            await lock_manager.extend(lock, config.REDLOCK_REFRESH_INTERVAL_SECONDS)
-
-            await asyncio.sleep(sleep_interval)
-
-    loop.run_until_complete(extender_worker(lock_manager))
-
-
-def thread_worker(
-    lock_manager: Aioredlock, lock: Lock, loop: asyncio.BaseEventLoop
+    try:
+        # if the lock is acquired the above function will block here
+        await _acquire_and_extend_lock_forever(reply_queue, cpu_count)
+    finally:
+        # if the _acquire_and_extend_lock_forever function returns
+        # the lock was not acquired, need to make sure the acquire_mpi_lock
+        # always has a result to avoid issues
+        reply_queue.put(False)
+
+
+@tenacity.retry(
+    wait=tenacity.wait_fixed(5),
+    stop=tenacity.stop_after_attempt(60),
+    before_sleep=tenacity.before_sleep_log(logger, logging.INFO),
+    reraise=True,
+)
+async def wait_till_redis_responsive(dsn: RedisDsn) -> None:
+    logger.info("Trying to connect to %s", dsn)
+    client = await aioredis.create_redis_pool(dsn, encoding="utf-8")
+    client.close()
+    await client.wait_closed()
+
+
+# trap lock_error
+async def _acquire_and_extend_lock_forever(
+    reply_queue: multiprocessing.Queue, cpu_count: int
 ) -> None:
-    start_background_lock_extender(lock_manager, lock, loop)
+    await wait_till_redis_responsive(config.CELERY_CONFIG.redis.dsn)
 
+    resource_name = f"aioredlock:mpi_lock:{cpu_count}"
+    endpoint = [
+        {
+            "host": config.CELERY_CONFIG.redis.host,
+            "port": config.CELERY_CONFIG.redis.port,
+            "db": int(config.CELERY_CONFIG.redis.db),
+        }
+    ]
+
+    logger.info("Will try to acquire an mpi_lock on %s", resource_name)
+    logger.info("Connecting to %s", endpoint)
+    lock_manager = Aioredlock(
+        redis_connections=endpoint,
+        retry_count=10,
+        internal_lock_timeout=config.REDLOCK_REFRESH_INTERVAL_SECONDS,
+    )
 
-async def try_to_acquire_lock(
-    lock_manager: Aioredlock, resource_name: str
-) -> Optional[Tuple[bool, Lock]]:
-    # Try to acquire the lock:
+    # Try to acquire the lock, it will retry it 5 times with
+    # a wait between 0.1 and 0.3 seconds between each try
+    # if the lock is not acquire a LockError is raised
     try:
-        return await lock_manager.lock(
-            resource_name, lock_timeout=config.REDLOCK_REFRESH_INTERVAL_SECONDS
-        )
+        lock = await lock_manager.lock(resource_name)
     except LockError:
-        pass
-
-    return None
+        logger.warning("Could not acquire lock on resource %s", resource_name)
+        await lock_manager.destroy()
+        return
 
+    # NOTE: in high concurrency situation you can have
+    # multiple instances acquire the same lock
+    # wait a tiny amount and read back the result of the lock acquisition
+    await asyncio.sleep(0.1)
+    # reed back result to make sure it was locked
+    is_locked = await lock_manager.is_locked(resource_name)
 
-async def acquire_lock(cpu_count: int) -> bool:
-    resource_name = f"aioredlock:mpi_lock:{cpu_count}"
-    lock_manager = Aioredlock([config.CELERY_CONFIG.redis.dsn])
-    logger.info("Will try to acquire an mpi_lock")
+    # the lock was successfully acquired, put the result in the queue
+    reply_queue.put(is_locked)
 
-    def is_locked_factory():
-        return lock_manager.is_locked(resource_name)
+    # continue renewing the lock at regular intervals
+    sleep_interval = 0.5 * config.REDLOCK_REFRESH_INTERVAL_SECONDS
+    logger.info("Starting lock extention at %s seconds interval", sleep_interval)
 
-    is_lock_free, _ = await retry_for_result(
-        result_validator=lambda x: x is False,
-        coroutine_factory=is_locked_factory,
-    )
+    try:
+        while True:
+            try:
+                await lock_manager.extend(lock)
+            except LockError:
+                logger.warning(
+                    "There was an error trying to extend the lock %s", resource_name
+                )
 
-    if not is_lock_free:
-        # it was not possible to acquire the lock
-        return False
+            await asyncio.sleep(sleep_interval)
+    finally:
+        # in case some other error occurs recycle all connections to redis
+        await lock_manager.destroy()
 
-    def try_to_acquire_lock_factory():
-        return try_to_acquire_lock(lock_manager, resource_name)
 
-    # lock is free try to acquire and start background extention
-    managed_to_acquire_lock, lock = await retry_for_result(
-        result_validator=lambda x: type(x) == Lock,
-        coroutine_factory=try_to_acquire_lock_factory,
+def _process_worker(queue: multiprocessing.Queue, cpu_count: int) -> None:
+    logger.info("Starting background process for mpi lock result")
+    asyncio.get_event_loop().run_until_complete(
+        _wrapped_acquire_and_extend_lock_worker(queue, cpu_count)
     )
-
-    if managed_to_acquire_lock:
-        Thread(
-            target=thread_worker,
-            args=(
-                lock_manager,
-                lock,
-                asyncio.get_event_loop(),
-            ),
-            daemon=True,
-        ).start()
-
-    logger.info("mpi_lock acquisition result %s", managed_to_acquire_lock)
-    return managed_to_acquire_lock
+    logger.info("Background asyncio task finished. Background process will despawn.")
 
 
 def acquire_mpi_lock(cpu_count: int) -> bool:
     """
     returns True if successfull
     Will try to acquire a distributed shared lock.
-    This operation will last up to 2 x config.REDLOCK_REFRESH_INTERVAL_SECONDS
     """
-    from .utils import wrap_async_call
+    reply_queue = multiprocessing.Queue()
+    multiprocessing.Process(
+        target=_process_worker, args=(reply_queue, cpu_count), daemon=True
+    ).start()
 
-    was_acquired = wrap_async_call(acquire_lock(cpu_count))
-    return was_acquired
+    lock_acquired = reply_queue.get()
+    return lock_acquired
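For context, a hedged sketch of how acquire_mpi_lock is meant to be consumed by the sidecar; the actual call site is not part of this diff, and running it requires the service's Redis configuration to be in place:

import multiprocessing

from simcore_service_sidecar.mpi_lock import acquire_mpi_lock

# acquire_mpi_lock blocks until the background daemon process reports
# the acquisition result through its multiprocessing.Queue
cpu_count = multiprocessing.cpu_count()
if acquire_mpi_lock(cpu_count):
    # this node now holds the distributed MPI lock; the daemon process keeps
    # extending it for as long as the service is alive
    print("starting as an MPI node")
else:
    print("another node already holds the MPI lock")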

services/sidecar/tests/integration/conftest.py

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,8 @@
     "pytest_simcore.rabbit_service",
     "pytest_simcore.postgres_service",
     "pytest_simcore.minio_service",
+    "pytest_simcore.simcore_services",
+    "pytest_simcore.redis_service",
     "pytest_simcore.simcore_storage_service",
 ]
 log = logging.getLogger(__name__)
