Skip to content

Commit ccefa73

Browse files
committed
ensure exclusive task does not raise
1 parent f3c64d9 commit ccefa73

File tree

2 files changed

+25
-10
lines changed

2 files changed

+25
-10
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313

1414
import asyncio
15+
import contextlib
1516
import datetime
1617
import functools
1718
import logging
@@ -34,7 +35,7 @@
3435
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
3536
from servicelib.logging_utils import log_context
3637
from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
37-
from servicelib.redis import RedisClientSDK
38+
from servicelib.redis import CouldNotAcquireLockError, RedisClientSDK
3839
from servicelib.redis_utils import exclusive
3940

4041
from ...constants import UNDEFINED_STR_METADATA
@@ -281,16 +282,29 @@ def _start_scheduling(
281282
project_id: ProjectID,
282283
iteration: Iteration,
283284
) -> None:
284-
# create a new schedule task
285-
p = functools.partial(
286-
self._schedule_pipeline,
287-
user_id=user_id,
288-
project_id=project_id,
289-
iteration=iteration,
290-
pipeline_params=pipeline_params,
291-
)
285+
async def _exclusive_safe_schedule_pipeline(
286+
*,
287+
user_id: UserID,
288+
project_id: ProjectID,
289+
iteration: Iteration,
290+
pipeline_params: ScheduledPipelineParams,
291+
) -> None:
292+
with contextlib.suppress(CouldNotAcquireLockError):
293+
await self._schedule_pipeline(
294+
user_id=user_id,
295+
project_id=project_id,
296+
iteration=iteration,
297+
pipeline_params=pipeline_params,
298+
)
299+
292300
pipeline_params.scheduler_task = start_periodic_task(
293-
p,
301+
functools.partial(
302+
_exclusive_safe_schedule_pipeline,
303+
user_id=user_id,
304+
project_id=project_id,
305+
iteration=iteration,
306+
pipeline_params=pipeline_params,
307+
),
294308
interval=_SCHEDULER_INTERVAL,
295309
task_name=_TASK_NAME_TEMPLATE.format(
296310
user_id=user_id, project_id=project_id, iteration=iteration

services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ async def schedule_all_pipelines(scheduler: BaseCompScheduler) -> None:
190190
total_results_count = len(results)
191191

192192
# Check if 2/3 of the results are CouldNotAcquireLockError
193+
# checks that scheduling is done exclusively
193194
assert could_not_acquire_lock_count == (2 / 3) * total_results_count
194195

195196

0 commit comments

Comments
 (0)