
Commit 8bcfa6b

GitHK and Andrei Neagu authored
comp_task requests are sequentially committed for each node in the project (#2043)
* implementing upsert pattern
* was inserting null keys
* refactoring
* reverting to previous implementation
* added a copy of the run_sequentially_in_context
* refactored to work with the new decorator
* refactoring and getting codeclimate unstuck
* moving the deletion of the nodes into the same sequential context
* add piece lost in merge
* adding whitespace back

Co-authored-by: Andrei Neagu <[email protected]>
1 parent 78b2e8c commit 8bcfa6b

File tree

2 files changed: +151 -11 lines changed


services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py

+38-11
@@ -19,6 +19,7 @@
 from ....models.schemas.services import NodeRequirement, ServiceExtras
 from ....utils.computations import to_node_class
 from ....utils.logging_utils import log_decorator
+from ....utils.async_utils import run_sequentially_in_context
 from ...director_v0 import DirectorV0Client
 from ..tables import NodeClass, StateType, comp_tasks
 from ._base import BaseRepository
@@ -73,19 +74,23 @@ async def get_comp_tasks(
 
         return tasks
 
-    @log_decorator(logger=logger)
-    async def upsert_tasks_from_project(
-        self, project: ProjectAtDB, director_client: DirectorV0Client, publish: bool
+    # pylint: disable=unused-argument
+    @run_sequentially_in_context(target_args=["str_project_uuid"])
+    async def _sequentially_upsert_tasks_from_project(
+        self,
+        project: ProjectAtDB,
+        director_client: DirectorV0Client,
+        publish: bool,
+        str_project_uuid: str,
     ) -> None:
         # start by removing the old tasks if they exist
         await self.connection.execute(
             sa.delete(comp_tasks).where(comp_tasks.c.project_id == str(project.uuid))
         )
         # create the tasks
-        workbench = project.workbench
-        internal_id = 1
-        for node_id in workbench:
-            node: Node = workbench[node_id]
+
+        for internal_id, node_id in enumerate(project.workbench, 1):
+            node: Node = project.workbench[node_id]
 
             service_key_version = ServiceKeyVersion(
                 key=node.key,
@@ -129,20 +134,42 @@ async def upsert_tasks_from_project(
                 outputs=node.outputs,
                 image=image,
                 submit=datetime.utcnow(),
-                state=comp_state
-                if node_class == NodeClass.COMPUTATIONAL
-                else RunningState.NOT_STARTED,
+                state=(
+                    comp_state
+                    if node_class == NodeClass.COMPUTATIONAL
+                    else RunningState.NOT_STARTED
+                ),
                 internal_id=internal_id,
                 node_class=node_class,
             )
-            internal_id = internal_id + 1
 
             await self.connection.execute(
                 insert(comp_tasks).values(
                     **task_db.dict(by_alias=True, exclude_unset=True)
                 )
             )
 
+    @log_decorator(logger=logger)
+    async def upsert_tasks_from_project(
+        self, project: ProjectAtDB, director_client: DirectorV0Client, publish: bool
+    ) -> None:
+
+        # only used by the decorator on "_sequentially_upsert_tasks_from_project"
+        str_project_uuid: str = str(project.uuid)
+
+        # It is guaranteed that in the context of this application no 2 updates
+        # will run in parallel.
+        #
+        # If we need to scale this service, or the same comp_tasks entry is used in
+        # a different service, an implementation of "run_sequentially_in_context"
+        # based on redis queues needs to be put in place.
+        await self._sequentially_upsert_tasks_from_project(
+            project=project,
+            director_client=director_client,
+            publish=publish,
+            str_project_uuid=str_project_uuid,
+        )
+
     @log_decorator(logger=logger)
     async def mark_project_tasks_as_aborted(self, project: ProjectAtDB) -> None:
         # block all pending tasks, so the sidecars stop taking them
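
For illustration, a minimal sketch of how the decorated method is expected to behave, assuming the module added below is importable as simcore_service_director_v2.utils.async_utils. FakeRepo, its upsert method, and the events list are hypothetical stand-ins for the real repository and its database round-trips; only run_sequentially_in_context comes from this commit.

import asyncio
from typing import List

from simcore_service_director_v2.utils.async_utils import run_sequentially_in_context

events: List[str] = []


class FakeRepo:
    # hypothetical stand-in for the repository decorated in this commit
    @run_sequentially_in_context(target_args=["str_project_uuid"])
    async def upsert(self, str_project_uuid: str, label: str) -> None:
        events.append(f"start {str_project_uuid}:{label}")
        await asyncio.sleep(0.1)  # simulates the delete + insert round-trips
        events.append(f"end {str_project_uuid}:{label}")


async def main() -> None:
    repo = FakeRepo()
    await asyncio.gather(
        repo.upsert(str_project_uuid="proj-a", label="first"),
        repo.upsert(str_project_uuid="proj-a", label="second"),
        repo.upsert(str_project_uuid="proj-b", label="other"),
    )
    # the two "proj-a" calls never overlap: their start/end entries appear
    # back to back, while "proj-b" runs in its own context and may interleave
    print(events)


asyncio.run(main())

Note that the serialization is per project UUID and per process: as the in-diff comment says, scaling beyond one instance would require a shared (e.g. redis-based) queue instead.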
services/director-v2/src/simcore_service_director_v2/utils/async_utils.py

+113-0

@@ -0,0 +1,113 @@
+import asyncio
+from functools import wraps
+from collections import deque
+import attr
+from typing import List, Dict
+
+
+@attr.s(auto_attribs=True)
+class Context:
+    in_queue: asyncio.Queue
+    out_queue: asyncio.Queue
+    initialized: bool
+
+
+def run_sequentially_in_context(target_args: List[str] = None):
+    """All requests to a function with the same calling context run sequentially.
+
+    Example:
+
+    Given the following decorated function
+
+        @run_sequentially_in_context(target_args=["param3", "param1"])
+        async def func(param1, param2, param3):
+            await asyncio.sleep(1)
+
+    the context is formed by the values of the arguments "param3" and "param1".
+    The values must be serializable, as they are converted to strings
+    and joined to form the storage key for the context.
+
+    The calls below all run in sequence:
+
+        functions = [
+            func(1, "something", 3),
+            func(1, "else", 3),
+            func(1, "here", 3),
+        ]
+        await asyncio.gather(*functions)
+
+    The calls below run in parallel, because they have different contexts:
+
+        functions = [
+            func(1, "something", 3),
+            func(2, "else", 3),
+            func(3, "here", 3),
+        ]
+        await asyncio.gather(*functions)
+    """
+    target_args = [] if target_args is None else target_args
+
+    def internal(decorated_function):
+        contexts = {}
+
+        def get_context(args, kwargs: Dict) -> Context:
+            arg_names = decorated_function.__code__.co_varnames[
+                : decorated_function.__code__.co_argcount
+            ]
+            search_args = dict(zip(arg_names, args))
+            search_args.update(kwargs)
+
+            key_parts = deque()
+            for arg in target_args:
+                if arg not in search_args:
+                    message = (
+                        f"Expected '{arg}' in '{decorated_function.__name__}'"
+                        f" arguments. Got '{search_args}'"
+                    )
+                    raise ValueError(message)
+                key_parts.append(search_args[arg])
+
+            key = ":".join(map(str, key_parts))
+
+            if key not in contexts:
+                contexts[key] = Context(
+                    in_queue=asyncio.Queue(),
+                    out_queue=asyncio.Queue(),
+                    initialized=False,
+                )
+
+            return contexts[key]
+
+        @wraps(decorated_function)
+        async def wrapper(*args, **kwargs):
+            context: Context = get_context(args, kwargs)
+
+            if not context.initialized:
+                context.initialized = True
+
+                async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue):
+                    while True:
+                        awaitable = await in_q.get()
+                        in_q.task_done()
+                        try:
+                            result = await awaitable
+                        except Exception as e:  # pylint: disable=broad-except
+                            result = e
+                        await out_q.put(result)
+
+                asyncio.get_event_loop().create_task(
+                    worker(context.in_queue, context.out_queue)
+                )
+
+            await context.in_queue.put(decorated_function(*args, **kwargs))
+
+            wrapped_result = await context.out_queue.get()
+            if isinstance(wrapped_result, Exception):
+                raise wrapped_result
+
+            return wrapped_result
+
+        return wrapper
+
+    return internal
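
As a quick smoke test for the helper itself (hypothetical, not part of the commit), the sketch below exercises the two behaviours visible in the implementation: results travel back through the context's out_queue, and exceptions caught by the worker are re-raised in the caller. The greet function and its arguments are invented for the example.

import asyncio

from simcore_service_director_v2.utils.async_utils import run_sequentially_in_context


@run_sequentially_in_context(target_args=["name"])
async def greet(name: str, fail: bool = False) -> str:
    if fail:
        raise RuntimeError(f"boom for {name}")
    return f"hello {name}"


async def main() -> None:
    print(await greet(name="alice"))  # -> hello alice

    # the worker catches the exception, ships it over the out_queue,
    # and the wrapper re-raises it via the isinstance(..., Exception) check
    try:
        await greet(name="alice", fail=True)
    except RuntimeError as err:
        print(f"re-raised: {err}")

    # positional calls work too: argument names are recovered from
    # decorated_function.__code__ to build the "name"-based context key
    print(await greet("bob"))  # -> hello bob


asyncio.run(main())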
