
comp_task requests are sequentially committed for each node in the project #2043

Merged · 13 commits · Dec 11, 2020
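
The problem being addressed: the upsert first deletes a project's comp_tasks rows and then re-inserts them, with awaits in between, so two concurrent upserts for the same project can interleave. A toy illustration of that interleaving (illustrative code, not from the repo):

import asyncio

rows: list = []

async def naive_upsert(tag: str) -> None:
    rows.clear()                 # "remove the old tasks"
    await asyncio.sleep(0)       # suspension point, like the real DB awaits
    rows.append(f"{tag}-task")   # "create the tasks"

async def main() -> None:
    # both calls interleave: each clears, then each inserts,
    # so rows from *both* upserts survive a single "upsert"
    await asyncio.gather(naive_upsert("A"), naive_upsert("B"))
    print(rows)  # ['A-task', 'B-task']

asyncio.run(main())

Serializing all upserts that share a project UUID, as this PR does, rules out that interleaving.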
@@ -19,6 +19,7 @@
 from ....models.schemas.services import NodeRequirement, ServiceExtras
 from ....utils.computations import to_node_class
 from ....utils.logging_utils import log_decorator
+from ....utils.async_utils import run_sequentially_in_context
 from ...director_v0 import DirectorV0Client
 from ..tables import NodeClass, StateType, comp_tasks
 from ._base import BaseRepository
@@ -73,19 +74,23 @@ async def get_comp_tasks(

         return tasks
 
-    @log_decorator(logger=logger)
-    async def upsert_tasks_from_project(
-        self, project: ProjectAtDB, director_client: DirectorV0Client, publish: bool
+    # pylint: disable=unused-argument
+    @run_sequentially_in_context(target_args=["str_project_uuid"])
+    async def _sequentially_upsert_tasks_from_project(
+        self,
+        project: ProjectAtDB,
+        director_client: DirectorV0Client,
+        publish: bool,
+        str_project_uuid: str,
     ) -> None:
         # start by removing the old tasks if they exist
         await self.connection.execute(
             sa.delete(comp_tasks).where(comp_tasks.c.project_id == str(project.uuid))
         )
         # create the tasks
-        workbench = project.workbench
-        internal_id = 1
-        for node_id in workbench:
-            node: Node = workbench[node_id]
+
+        for internal_id, node_id in enumerate(project.workbench, 1):
+            node: Node = project.workbench[node_id]
 
             service_key_version = ServiceKeyVersion(
                 key=node.key,
@@ -129,20 +134,42 @@ async def upsert_tasks_from_project(
                 outputs=node.outputs,
                 image=image,
                 submit=datetime.utcnow(),
-                state=comp_state
-                if node_class == NodeClass.COMPUTATIONAL
-                else RunningState.NOT_STARTED,
+                state=(
+                    comp_state
+                    if node_class == NodeClass.COMPUTATIONAL
+                    else RunningState.NOT_STARTED
+                ),
                 internal_id=internal_id,
                 node_class=node_class,
             )
-            internal_id = internal_id + 1
 
             await self.connection.execute(
                 insert(comp_tasks).values(
                     **task_db.dict(by_alias=True, exclude_unset=True)
                 )
             )
+
+    @log_decorator(logger=logger)
+    async def upsert_tasks_from_project(
+        self, project: ProjectAtDB, director_client: DirectorV0Client, publish: bool
+    ) -> None:
+
+        # used only by the decorator on "_sequentially_upsert_tasks_from_project"
+        str_project_uuid: str = str(project.uuid)
+
+        # It is guaranteed that, in the context of this application, no two
+        # updates will run in parallel.
+        #
+        # If this service needs to scale, or the same comp_tasks entry is
+        # used by a different service, "run_sequentially_in_context" needs
+        # to be reimplemented on top of Redis queues.
+        await self._sequentially_upsert_tasks_from_project(
+            project=project,
+            director_client=director_client,
+            publish=publish,
+            str_project_uuid=str_project_uuid,
+        )
 
     @log_decorator(logger=logger)
     async def mark_project_tasks_as_aborted(self, project: ProjectAtDB) -> None:
         # block all pending tasks, so the sidecars stop taking them
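
The guarantee referenced in the comment is per-project serialization: all upserts that share a project UUID queue behind one another, while different projects proceed concurrently. A minimal in-process sketch of the same idea using one asyncio.Lock per key (names here are illustrative, not code from this PR):

import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict

# hypothetical per-key locks, one per project UUID
_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

async def run_serialized(
    project_uuid: str, upsert: Callable[[], Awaitable[None]]
) -> None:
    # calls sharing a project_uuid run one after another;
    # calls for different projects are not blocked
    async with _locks[project_uuid]:
        await upsert()

As the comment above notes, any such scheme only holds inside a single process; scaling out to several replicas would require moving the lock or queue into shared infrastructure such as Redis.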
@@ -0,0 +1,113 @@
import asyncio
from functools import wraps
from collections import deque
import attr
from typing import Dict, List, Optional


@attr.s(auto_attribs=True)
class Context:
    in_queue: asyncio.Queue
    out_queue: asyncio.Queue
    initialized: bool


def run_sequentially_in_context(target_args: Optional[List[str]] = None):
"""All request to function with same calling context will be run sequentially.

Example:

Given the following decorated function

@run_sequentially_in_context(target_args=["param3", "param1"])
async def func(param1, param2, param3):
await asyncio.sleep(1)

The context will be formed by the values of the arguments "param3" and "param1".
The values must be serializable as they will be converted to string
and put together as storage key for the context.

The below calls will all run in a sequence:

functions = [
func(1, "something", 3),
func(1, "else", 3),
func(1, "here", 3),
]
await asyncio.gather(*functions)

The following calls will run in parallel, because they have different contexts:

functions = [
func(1, "something", 3),
func(2, "else", 3),
func(3, "here", 3),
]
await asyncio.gather(*functions)

"""
    target_args = [] if target_args is None else target_args

    def internal(decorated_function):
        # one Context (a pair of queues plus a worker task) per distinct key
        contexts: Dict[str, Context] = {}

        def get_context(args, kwargs: Dict) -> Context:
            # map the positional arguments to their parameter names
            arg_names = decorated_function.__code__.co_varnames[
                : decorated_function.__code__.co_argcount
            ]
            search_args = dict(zip(arg_names, args))
            search_args.update(kwargs)

            key_parts = deque()
            for arg in target_args:
                if arg not in search_args:
                    message = (
                        f"Expected '{arg}' in '{decorated_function.__name__}'"
                        f" arguments. Got '{search_args}'"
                    )
                    raise ValueError(message)
                key_parts.append(search_args[arg])

            key = ":".join(map(str, key_parts))

            if key not in contexts:
                contexts[key] = Context(
                    in_queue=asyncio.Queue(),
                    out_queue=asyncio.Queue(),
                    initialized=False,
                )

            return contexts[key]

        @wraps(decorated_function)
        async def wrapper(*args, **kwargs):
            context: Context = get_context(args, kwargs)

            if not context.initialized:
                context.initialized = True

                # one worker task per context awaits the queued coroutines
                # one at a time; this is what serializes the calls
                async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue):
                    while True:
                        awaitable = await in_q.get()
                        in_q.task_done()
                        try:
                            result = await awaitable
                        except Exception as e:  # pylint: disable=broad-except
                            # hand errors to the caller instead of letting
                            # them kill the worker task
                            result = e
                        await out_q.put(result)

                asyncio.get_event_loop().create_task(
                    worker(context.in_queue, context.out_queue)
                )

            await context.in_queue.put(decorated_function(*args, **kwargs))

            wrapped_result = await context.out_queue.get()
            if isinstance(wrapped_result, Exception):
                raise wrapped_result

            return wrapped_result

        return wrapper

    return internal
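
For illustration, a small end-to-end usage sketch of the decorator (the import path and the demo function are assumptions for this example, not part of the PR):

import asyncio
import time

# assumed importable module name; in the PR the decorator lives in the
# service's utils/async_utils.py
from async_utils import run_sequentially_in_context

@run_sequentially_in_context(target_args=["project_uuid"])
async def touch(project_uuid: str, label: str) -> str:
    await asyncio.sleep(0.1)
    return f"{time.monotonic():.2f} {project_uuid} {label}"

async def main() -> None:
    # all three calls share one context ("uuid-1"), so they run back to
    # back and the gather takes ~0.3s instead of ~0.1s
    results = await asyncio.gather(
        touch("uuid-1", "first"),
        touch("uuid-1", "second"),
        touch("uuid-1", "third"),
    )
    print("\n".join(results))

asyncio.run(main())

Calls with distinct project_uuid values get their own queue and worker, so they would still overlap in time, matching the docstring's second example.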