Skip to content

Commit 60c6035

Browse files
authored
Bugfix/comp services see all inputs (#1734)
Sidecar mounts subfolder in computational service
* fix issue #1723
* fix issue #1473
* diverse fixes and type annotations
2 parents e7e232f + 0b62bdf commit 60c6035

File tree

14 files changed

+207
-100
lines changed

14 files changed

+207
-100
lines changed

services/sidecar/requirements/_base.in

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
urllib3>=1.25.8 # Vulnerability
6+
aiofile
67
aio-pika
78
aiodocker
89
aiofiles
@@ -16,4 +17,4 @@ networkx
1617
packaging
1718
pydantic
1819
tenacity
19-
aioredlock
20+
aioredlock

services/sidecar/requirements/_base.txt

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#
77
aio-pika==6.6.1 # via -r requirements/_base.in
88
aiodocker==0.19.1 # via -r requirements/_base.in
9+
aiofile==3.0.0 # via -r requirements/_base.in
910
aiofiles==0.5.0 # via -r requirements/_base.in
1011
aiohttp==3.6.2 # via aiodocker
1112
aiopg==1.0.0 # via -r requirements/_base.in
@@ -16,6 +17,7 @@ amqp==2.6.1 # via kombu
1617
async-timeout==3.0.1 # via aiohttp, aioredis
1718
attrs==19.3.0 # via aiohttp, aioredlock
1819
billiard==3.6.3.0 # via celery
20+
caio==0.5.3 # via aiofile
1921
celery==4.4.7 # via -r requirements/_base.in
2022
chardet==3.0.4 # via aiohttp
2123
click==7.1.2 # via -r requirements/_base.in

services/sidecar/requirements/_test.txt

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#
77
aio-pika==6.6.1 # via -r requirements/_base.txt
88
aiodocker==0.19.1 # via -r requirements/_base.txt
9+
aiofile==3.0.0 # via -r requirements/_base.txt
910
aiofiles==0.5.0 # via -r requirements/_base.txt
1011
aiohttp==3.6.2 # via -r requirements/_base.txt, aiodocker, pytest-aiohttp
1112
aiopg==1.0.0 # via -r requirements/_base.txt, -r requirements/_test.in
@@ -17,6 +18,7 @@ astroid==2.4.2 # via pylint
1718
async-timeout==3.0.1 # via -r requirements/_base.txt, aiohttp, aioredis
1819
attrs==19.3.0 # via -r requirements/_base.txt, aiohttp, aioredlock, pytest
1920
billiard==3.6.3.0 # via -r requirements/_base.txt, celery
21+
caio==0.5.3 # via -r requirements/_base.txt, aiofile
2022
celery==4.4.7 # via -r requirements/_base.txt
2123
certifi==2020.6.20 # via requests
2224
chardet==3.0.4 # via -r requirements/_base.txt, aiohttp, requests

services/sidecar/src/simcore_service_sidecar/boot_mode.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# pylint: disable=global-statement
22

33
from enum import Enum
4+
from typing import Optional
45

56

67
class BootMode(Enum):
@@ -9,10 +10,10 @@ class BootMode(Enum):
910
MPI = "MPI"
1011

1112

12-
_sidecar_boot_mode: BootMode = None
13+
_sidecar_boot_mode: Optional[BootMode] = None
1314

1415

15-
def get_boot_mode() -> BootMode:
16+
def get_boot_mode() -> Optional[BootMode]:
1617
global _sidecar_boot_mode
1718
return _sidecar_boot_mode
1819

services/sidecar/src/simcore_service_sidecar/celery_configurator.py

+19-10
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@
77
use a look ahead function to check the type of upcoming task and
88
schedule it accordingly.
99
"""
10-
from typing import Tuple
10+
from typing import Optional, Tuple
11+
1112
from celery import Celery, states
1213
from simcore_sdk.config.rabbit import Config as RabbitConfig
14+
1315
from . import config
14-
from .cli import run_sidecar
15-
from .utils import wrap_async_call, is_gpu_node, start_as_mpi_node
16+
from .boot_mode import BootMode, get_boot_mode, set_boot_mode
1617
from .celery_log_setup import get_task_logger
17-
from .utils import assemble_celery_app
18+
from .cli import run_sidecar
1819
from .core import task_required_resources
19-
from .boot_mode import BootMode, set_boot_mode, get_boot_mode
20+
from .utils import assemble_celery_app, is_gpu_node, start_as_mpi_node, wrap_async_call
2021

2122
log = get_task_logger(__name__)
2223

@@ -71,7 +72,7 @@ def _dispatch_to_mpi_queue(user_id: str, project_id: str, node_id: str) -> None:
7172

7273

7374
def shared_task_dispatch(
74-
celery_request, user_id: str, project_id: str, node_id: str = None
75+
celery_request, user_id: str, project_id: str, node_id: Optional[str] = None
7576
) -> None:
7677
"""This is the original task which is run by either MPI, GPU or CPU node"""
7778
try:
@@ -106,11 +107,15 @@ def configure_cpu_mode() -> Tuple[RabbitConfig, Celery]:
106107

107108
# pylint: disable=unused-variable,unused-argument
108109
@app.task(name="comp.task", bind=True, ignore_result=True)
109-
def entrypoint(self, user_id: str, project_id: str, node_id: str = None) -> None:
110+
def entrypoint(
111+
self, user_id: str, project_id: str, node_id: Optional[str] = None
112+
) -> None:
110113
shared_task_dispatch(self, user_id, project_id, node_id)
111114

112115
@app.task(name="comp.task.cpu", bind=True)
113-
def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None:
116+
def pipeline(
117+
self, user_id: str, project_id: str, node_id: Optional[str] = None
118+
) -> None:
114119
shared_task_dispatch(self, user_id, project_id, node_id)
115120

116121
set_boot_mode(BootMode.CPU)
@@ -125,7 +130,9 @@ def configure_gpu_mode() -> Tuple[RabbitConfig, Celery]:
125130

126131
# pylint: disable=unused-variable
127132
@app.task(name="comp.task.gpu", bind=True)
128-
def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None:
133+
def pipeline(
134+
self, user_id: str, project_id: str, node_id: Optional[str] = None
135+
) -> None:
129136
shared_task_dispatch(self, user_id, project_id, node_id)
130137

131138
set_boot_mode(BootMode.GPU)
@@ -140,7 +147,9 @@ def configure_mpi_node() -> Tuple[RabbitConfig, Celery]:
140147

141148
# pylint: disable=unused-variable
142149
@app.task(name="comp.task.mpi", bind=True)
143-
def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None:
150+
def pipeline(
151+
self, user_id: str, project_id: str, node_id: Optional[str] = None
152+
) -> None:
144153
shared_task_dispatch(self, user_id, project_id, node_id)
145154

146155
set_boot_mode(BootMode.MPI)

services/sidecar/src/simcore_service_sidecar/cli.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import List, Tuple
2+
from typing import List, Optional, Tuple
33

44
import click
55

@@ -17,7 +17,9 @@
1717
@click.option("--user_id", default=0, type=int, help="The user ID")
1818
@click.option("--project_id", default="0", help="The project ID")
1919
@click.option("--node_id", default=None, help="The node ID or nothing")
20-
def main(job_id: str, user_id: str, project_id: str, node_id: str) -> List[str]:
20+
def main(
21+
job_id: str, user_id: str, project_id: str, node_id: str
22+
) -> Optional[List[str]]:
2123

2224
log.info(
2325
"STARTING task processing for user %s, project %s, node %s",
@@ -41,12 +43,12 @@ def main(job_id: str, user_id: str, project_id: str, node_id: str) -> List[str]:
4143

4244

4345
async def run_sidecar(
44-
job_id: str, user_id: str, project_id: str, node_id: str
45-
) -> Tuple[List[str], str]:
46+
job_id: str, user_id: str, project_id: str, node_id: Optional[str]
47+
) -> Tuple[Optional[List[str]], Optional[str]]:
4648
try:
4749
async with DBContextManager() as db_engine:
4850
async with RabbitMQ(config=RABBIT_CONFIG) as rabbit_mq:
49-
next_task_nodes = await inspect(
51+
next_task_nodes: Optional[List[str]] = await inspect(
5052
db_engine, rabbit_mq, job_id, user_id, project_id, node_id=node_id
5153
)
5254
log.info(

services/sidecar/src/simcore_service_sidecar/config.py

+15-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import logging
22
import multiprocessing
33
import os
4+
from pathlib import Path
5+
from typing import Optional
6+
47
from simcore_sdk.config.rabbit import Config as RabbitConfig
58

69
SERVICES_MAX_NANO_CPUS: int = min(
@@ -15,6 +18,16 @@
1518
)
1619
SWARM_STACK_NAME: str = os.environ.get("SWARM_STACK_NAME", "simcore")
1720

21+
SIDECAR_INPUT_FOLDER: Path = Path(
22+
os.environ.get("SIDECAR_INPUT_FOLDER", Path.home() / "input")
23+
)
24+
SIDECAR_OUTPUT_FOLDER: Path = Path(
25+
os.environ.get("SIDECAR_OUTPUT_FOLDER", Path.home() / "output")
26+
)
27+
SIDECAR_LOG_FOLDER: Path = Path(
28+
os.environ.get("SIDECAR_LOG_FOLDER", Path.home() / "log")
29+
)
30+
1831
SIDECAR_DOCKER_VOLUME_INPUT: str = os.environ.get(
1932
"SIDECAR_DOCKER_VOLUME_INPUT", f"{SWARM_STACK_NAME}_input"
2033
)
@@ -42,10 +55,9 @@
4255
logging.getLogger("sqlalchemy.pool").setLevel(SIDECAR_LOGLEVEL)
4356

4457
RABBIT_CONFIG = RabbitConfig()
45-
4658
# sidecar celery starting mode overwrite
47-
FORCE_START_CPU_MODE: str = os.environ.get("START_AS_MODE_CPU")
48-
FORCE_START_GPU_MODE: str = os.environ.get("START_AS_MODE_GPU")
59+
FORCE_START_CPU_MODE: Optional[str] = os.environ.get("START_AS_MODE_CPU")
60+
FORCE_START_GPU_MODE: Optional[str] = os.environ.get("START_AS_MODE_GPU")
4961

5062
# if a node has this amount of CPUs it will be an MPI candidate
5163
TARGET_MPI_NODE_CPU_COUNT: int = int(os.environ.get("TARGET_MPI_NODE_CPU_COUNT", "-1"))
@@ -59,4 +71,3 @@
5971
REDLOCK_REFRESH_INTERVAL_SECONDS: float = max(
6072
float(os.environ.get("REDLOCK_REFRESH_INTERVAL_SECONDS", "5.0")), 1.0
6173
) # enforce at least 1 second
62-

services/sidecar/src/simcore_service_sidecar/core.py

+19-21
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
from datetime import datetime
2-
from typing import List, Optional, Union, Dict
31
import traceback
2+
from datetime import datetime
3+
from typing import Dict, List, Optional, Union
44

55
import aiodocker
6-
import aiopg
76
import networkx as nx
8-
from celery.utils.log import get_task_logger
7+
from aiopg.sa import Engine, SAConnection
8+
from aiopg.sa.result import RowProxy
99
from sqlalchemy import and_, literal_column
1010

11+
from celery.utils.log import get_task_logger
1112
from simcore_postgres_database.sidecar_models import ( # PENDING,
1213
FAILED,
1314
RUNNING,
@@ -20,10 +21,10 @@
2021
from simcore_sdk.node_ports import log as node_port_log
2122

2223
from . import config, exceptions
24+
from .db import DBContextManager
2325
from .executor import Executor
2426
from .rabbitmq import RabbitMQ
2527
from .utils import execution_graph, find_entry_point, is_node_ready
26-
from .db import DBContextManager
2728

2829
log = get_task_logger(__name__)
2930
log.setLevel(config.SIDECAR_LOGLEVEL)
@@ -61,13 +62,12 @@ async def task_required_resources(node_id: str) -> Union[Dict[str, bool], None]:
6162

6263

6364
async def _try_get_task_from_db(
64-
db_connection: aiopg.sa.SAConnection,
65+
db_connection: SAConnection,
6566
graph: nx.DiGraph,
66-
job_request_id: int,
67+
job_request_id: str,
6768
project_id: str,
6869
node_id: str,
69-
) -> Optional[aiopg.sa.result.RowProxy]:
70-
task: aiopg.sa.result.RowProxy = None
70+
) -> Optional[RowProxy]:
7171
# Use SELECT FOR UPDATE to lock the row
7272
result = await db_connection.execute(
7373
query=comp_tasks.select(for_update=True).where(
@@ -79,7 +79,7 @@ async def _try_get_task_from_db(
7979
)
8080
)
8181
)
82-
task = await result.fetchone()
82+
task: RowProxy = await result.fetchone()
8383

8484
if not task:
8585
log.debug("No task found")
@@ -114,9 +114,8 @@ async def _try_get_task_from_db(
114114

115115

116116
async def _get_pipeline_from_db(
117-
db_connection: aiopg.sa.SAConnection, project_id: str,
118-
) -> aiopg.sa.result.RowProxy:
119-
pipeline: aiopg.sa.result.RowProxy = None
117+
db_connection: SAConnection, project_id: str,
118+
) -> RowProxy:
120119
# get the pipeline
121120
result = await db_connection.execute(
122121
comp_pipeline.select().where(comp_pipeline.c.project_id == project_id)
@@ -126,7 +125,7 @@ async def _get_pipeline_from_db(
126125
f"Pipeline {result.rowcount} found instead of only one for project_id {project_id}"
127126
)
128127

129-
pipeline = await result.first()
128+
pipeline: RowProxy = await result.first()
130129
if not pipeline:
131130
raise exceptions.DatabaseError(f"Pipeline {project_id} not found")
132131
log.debug("found pipeline %s", pipeline)
@@ -135,12 +134,12 @@ async def _get_pipeline_from_db(
135134

136135
async def inspect(
137136
# pylint: disable=too-many-arguments
138-
db_engine: aiopg.sa.Engine,
137+
db_engine: Engine,
139138
rabbit_mq: RabbitMQ,
140-
job_request_id: int,
139+
job_request_id: str,
141140
user_id: str,
142141
project_id: str,
143-
node_id: str,
142+
node_id: Optional[str],
144143
) -> Optional[List[str]]:
145144
log.debug(
146145
"ENTERING inspect with user %s pipeline:node %s: %s",
@@ -149,11 +148,10 @@ async def inspect(
149148
node_id,
150149
)
151150

152-
pipeline: aiopg.sa.result.RowProxy = None
153-
task: aiopg.sa.result.RowProxy = None
154-
graph: nx.DiGraph = None
151+
task: Optional[RowProxy] = None
152+
graph: Optional[nx.DiGraph] = None
155153
async with db_engine.acquire() as connection:
156-
pipeline = await _get_pipeline_from_db(connection, project_id)
154+
pipeline: RowProxy = await _get_pipeline_from_db(connection, project_id)
157155
graph = execution_graph(pipeline)
158156
if not node_id:
159157
log.debug("NODE id was zero, this was the entry node id")

0 commit comments

Comments
 (0)