Skip to content

Bugfix/comp services see all inputs #1734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion services/sidecar/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

urllib3>=1.25.8 # Vulnerability
aiofile
aio-pika
aiodocker
aiofiles
Expand All @@ -16,4 +17,4 @@ networkx
packaging
pydantic
tenacity
aioredlock
aioredlock
2 changes: 2 additions & 0 deletions services/sidecar/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#
aio-pika==6.6.1 # via -r requirements/_base.in
aiodocker==0.19.1 # via -r requirements/_base.in
aiofile==3.0.0 # via -r requirements/_base.in
aiofiles==0.5.0 # via -r requirements/_base.in
aiohttp==3.6.2 # via aiodocker
aiopg==1.0.0 # via -r requirements/_base.in
Expand All @@ -16,6 +17,7 @@ amqp==2.6.1 # via kombu
async-timeout==3.0.1 # via aiohttp, aioredis
attrs==19.3.0 # via aiohttp, aioredlock
billiard==3.6.3.0 # via celery
caio==0.5.3 # via aiofile
celery==4.4.7 # via -r requirements/_base.in
chardet==3.0.4 # via aiohttp
click==7.1.2 # via -r requirements/_base.in
Expand Down
2 changes: 2 additions & 0 deletions services/sidecar/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#
aio-pika==6.6.1 # via -r requirements/_base.txt
aiodocker==0.19.1 # via -r requirements/_base.txt
aiofile==3.0.0 # via -r requirements/_base.txt
aiofiles==0.5.0 # via -r requirements/_base.txt
aiohttp==3.6.2 # via -r requirements/_base.txt, aiodocker, pytest-aiohttp
aiopg==1.0.0 # via -r requirements/_base.txt, -r requirements/_test.in
Expand All @@ -17,6 +18,7 @@ astroid==2.4.2 # via pylint
async-timeout==3.0.1 # via -r requirements/_base.txt, aiohttp, aioredis
attrs==19.3.0 # via -r requirements/_base.txt, aiohttp, aioredlock, pytest
billiard==3.6.3.0 # via -r requirements/_base.txt, celery
caio==0.5.3 # via -r requirements/_base.txt, aiofile
celery==4.4.7 # via -r requirements/_base.txt
certifi==2020.6.20 # via requests
chardet==3.0.4 # via -r requirements/_base.txt, aiohttp, requests
Expand Down
5 changes: 3 additions & 2 deletions services/sidecar/src/simcore_service_sidecar/boot_mode.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# pylint: disable=global-statement

from enum import Enum
from typing import Optional


class BootMode(Enum):
Expand All @@ -9,10 +10,10 @@ class BootMode(Enum):
MPI = "MPI"


_sidecar_boot_mode: BootMode = None
_sidecar_boot_mode: Optional[BootMode] = None


def get_boot_mode() -> BootMode:
def get_boot_mode() -> Optional[BootMode]:
global _sidecar_boot_mode
return _sidecar_boot_mode

Expand Down
29 changes: 19 additions & 10 deletions services/sidecar/src/simcore_service_sidecar/celery_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
use a look ahead function to check the type of upcoming task and
schedule it accordingly.
"""
from typing import Tuple
from typing import Optional, Tuple

from celery import Celery, states
from simcore_sdk.config.rabbit import Config as RabbitConfig

from . import config
from .cli import run_sidecar
from .utils import wrap_async_call, is_gpu_node, start_as_mpi_node
from .boot_mode import BootMode, get_boot_mode, set_boot_mode
from .celery_log_setup import get_task_logger
from .utils import assemble_celery_app
from .cli import run_sidecar
from .core import task_required_resources
from .boot_mode import BootMode, set_boot_mode, get_boot_mode
from .utils import assemble_celery_app, is_gpu_node, start_as_mpi_node, wrap_async_call

log = get_task_logger(__name__)

Expand Down Expand Up @@ -71,7 +72,7 @@ def _dispatch_to_mpi_queue(user_id: str, project_id: str, node_id: str) -> None:


def shared_task_dispatch(
celery_request, user_id: str, project_id: str, node_id: str = None
celery_request, user_id: str, project_id: str, node_id: Optional[str] = None
) -> None:
"""This is the original task which is run by either MPI, GPU or CPU node"""
try:
Expand Down Expand Up @@ -106,11 +107,15 @@ def configure_cpu_mode() -> Tuple[RabbitConfig, Celery]:

# pylint: disable=unused-variable,unused-argument
@app.task(name="comp.task", bind=True, ignore_result=True)
def entrypoint(self, user_id: str, project_id: str, node_id: str = None) -> None:
def entrypoint(
self, user_id: str, project_id: str, node_id: Optional[str] = None
) -> None:
shared_task_dispatch(self, user_id, project_id, node_id)

@app.task(name="comp.task.cpu", bind=True)
def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None:
def pipeline(
self, user_id: str, project_id: str, node_id: Optional[str] = None
) -> None:
shared_task_dispatch(self, user_id, project_id, node_id)

set_boot_mode(BootMode.CPU)
Expand All @@ -125,7 +130,9 @@ def configure_gpu_mode() -> Tuple[RabbitConfig, Celery]:

# pylint: disable=unused-variable
@app.task(name="comp.task.gpu", bind=True)
def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None:
def pipeline(
self, user_id: str, project_id: str, node_id: Optional[str] = None
) -> None:
shared_task_dispatch(self, user_id, project_id, node_id)

set_boot_mode(BootMode.GPU)
Expand All @@ -140,7 +147,9 @@ def configure_mpi_node() -> Tuple[RabbitConfig, Celery]:

# pylint: disable=unused-variable
@app.task(name="comp.task.mpi", bind=True)
def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None:
def pipeline(
self, user_id: str, project_id: str, node_id: Optional[str] = None
) -> None:
shared_task_dispatch(self, user_id, project_id, node_id)

set_boot_mode(BootMode.MPI)
Expand Down
12 changes: 7 additions & 5 deletions services/sidecar/src/simcore_service_sidecar/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import List, Tuple
from typing import List, Optional, Tuple

import click

Expand All @@ -17,7 +17,9 @@
@click.option("--user_id", default=0, type=int, help="The user ID")
@click.option("--project_id", default="0", help="The project ID")
@click.option("--node_id", default=None, help="The node ID or nothing")
def main(job_id: str, user_id: str, project_id: str, node_id: str) -> List[str]:
def main(
job_id: str, user_id: str, project_id: str, node_id: str
) -> Optional[List[str]]:

log.info(
"STARTING task processing for user %s, project %s, node %s",
Expand All @@ -41,12 +43,12 @@ def main(job_id: str, user_id: str, project_id: str, node_id: str) -> List[str]:


async def run_sidecar(
job_id: str, user_id: str, project_id: str, node_id: str
) -> Tuple[List[str], str]:
job_id: str, user_id: str, project_id: str, node_id: Optional[str]
) -> Tuple[Optional[List[str]], Optional[str]]:
try:
async with DBContextManager() as db_engine:
async with RabbitMQ(config=RABBIT_CONFIG) as rabbit_mq:
next_task_nodes = await inspect(
next_task_nodes: Optional[List[str]] = await inspect(
db_engine, rabbit_mq, job_id, user_id, project_id, node_id=node_id
)
log.info(
Expand Down
19 changes: 15 additions & 4 deletions services/sidecar/src/simcore_service_sidecar/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
import multiprocessing
import os
from pathlib import Path
from typing import Optional

from simcore_sdk.config.rabbit import Config as RabbitConfig

SERVICES_MAX_NANO_CPUS: int = min(
Expand All @@ -15,6 +18,16 @@
)
SWARM_STACK_NAME: str = os.environ.get("SWARM_STACK_NAME", "simcore")

SIDECAR_INPUT_FOLDER: Path = Path(
os.environ.get("SIDECAR_INPUT_FOLDER", Path.home() / "input")
)
SIDECAR_OUTPUT_FOLDER: Path = Path(
os.environ.get("SIDECAR_OUTPUT_FOLDER", Path.home() / "output")
)
SIDECAR_LOG_FOLDER: Path = Path(
os.environ.get("SIDECAR_LOG_FOLDER", Path.home() / "log")
)

SIDECAR_DOCKER_VOLUME_INPUT: str = os.environ.get(
"SIDECAR_DOCKER_VOLUME_INPUT", f"{SWARM_STACK_NAME}_input"
)
Expand Down Expand Up @@ -42,10 +55,9 @@
logging.getLogger("sqlalchemy.pool").setLevel(SIDECAR_LOGLEVEL)

RABBIT_CONFIG = RabbitConfig()

# sidecar celery starting mode overwrite
FORCE_START_CPU_MODE: str = os.environ.get("START_AS_MODE_CPU")
FORCE_START_GPU_MODE: str = os.environ.get("START_AS_MODE_GPU")
FORCE_START_CPU_MODE: Optional[str] = os.environ.get("START_AS_MODE_CPU")
FORCE_START_GPU_MODE: Optional[str] = os.environ.get("START_AS_MODE_GPU")

# if a node has this number of CPUs it will be an MPI candidate
TARGET_MPI_NODE_CPU_COUNT: int = int(os.environ.get("TARGET_MPI_NODE_CPU_COUNT", "-1"))
Expand All @@ -59,4 +71,3 @@
REDLOCK_REFRESH_INTERVAL_SECONDS: float = max(
float(os.environ.get("REDLOCK_REFRESH_INTERVAL_SECONDS", "5.0")), 1.0
) # enforce at least 1 second

40 changes: 19 additions & 21 deletions services/sidecar/src/simcore_service_sidecar/core.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from datetime import datetime
from typing import List, Optional, Union, Dict
import traceback
from datetime import datetime
from typing import Dict, List, Optional, Union

import aiodocker
import aiopg
import networkx as nx
from celery.utils.log import get_task_logger
from aiopg.sa import Engine, SAConnection
from aiopg.sa.result import RowProxy
from sqlalchemy import and_, literal_column

from celery.utils.log import get_task_logger
from simcore_postgres_database.sidecar_models import ( # PENDING,
FAILED,
RUNNING,
Expand All @@ -20,10 +21,10 @@
from simcore_sdk.node_ports import log as node_port_log

from . import config, exceptions
from .db import DBContextManager
from .executor import Executor
from .rabbitmq import RabbitMQ
from .utils import execution_graph, find_entry_point, is_node_ready
from .db import DBContextManager

log = get_task_logger(__name__)
log.setLevel(config.SIDECAR_LOGLEVEL)
Expand Down Expand Up @@ -61,13 +62,12 @@ async def task_required_resources(node_id: str) -> Union[Dict[str, bool], None]:


async def _try_get_task_from_db(
db_connection: aiopg.sa.SAConnection,
db_connection: SAConnection,
graph: nx.DiGraph,
job_request_id: int,
job_request_id: str,
project_id: str,
node_id: str,
) -> Optional[aiopg.sa.result.RowProxy]:
task: aiopg.sa.result.RowProxy = None
) -> Optional[RowProxy]:
# Use SELECT FOR UPDATE to lock the row
result = await db_connection.execute(
query=comp_tasks.select(for_update=True).where(
Expand All @@ -79,7 +79,7 @@ async def _try_get_task_from_db(
)
)
)
task = await result.fetchone()
task: RowProxy = await result.fetchone()

if not task:
log.debug("No task found")
Expand Down Expand Up @@ -114,9 +114,8 @@ async def _try_get_task_from_db(


async def _get_pipeline_from_db(
db_connection: aiopg.sa.SAConnection, project_id: str,
) -> aiopg.sa.result.RowProxy:
pipeline: aiopg.sa.result.RowProxy = None
db_connection: SAConnection, project_id: str,
) -> RowProxy:
# get the pipeline
result = await db_connection.execute(
comp_pipeline.select().where(comp_pipeline.c.project_id == project_id)
Expand All @@ -126,7 +125,7 @@ async def _get_pipeline_from_db(
f"Pipeline {result.rowcount} found instead of only one for project_id {project_id}"
)

pipeline = await result.first()
pipeline: RowProxy = await result.first()
if not pipeline:
raise exceptions.DatabaseError(f"Pipeline {project_id} not found")
log.debug("found pipeline %s", pipeline)
Expand All @@ -135,12 +134,12 @@ async def _get_pipeline_from_db(

async def inspect(
# pylint: disable=too-many-arguments
db_engine: aiopg.sa.Engine,
db_engine: Engine,
rabbit_mq: RabbitMQ,
job_request_id: int,
job_request_id: str,
user_id: str,
project_id: str,
node_id: str,
node_id: Optional[str],
) -> Optional[List[str]]:
log.debug(
"ENTERING inspect with user %s pipeline:node %s: %s",
Expand All @@ -149,11 +148,10 @@ async def inspect(
node_id,
)

pipeline: aiopg.sa.result.RowProxy = None
task: aiopg.sa.result.RowProxy = None
graph: nx.DiGraph = None
task: Optional[RowProxy] = None
graph: Optional[nx.DiGraph] = None
async with db_engine.acquire() as connection:
pipeline = await _get_pipeline_from_db(connection, project_id)
pipeline: RowProxy = await _get_pipeline_from_db(connection, project_id)
graph = execution_graph(pipeline)
if not node_id:
log.debug("NODE id was zero, this was the entry node id")
Expand Down
Loading