diff --git a/services/sidecar/requirements/_base.in b/services/sidecar/requirements/_base.in
index d86e3320e6a..18fcf293c0f 100644
--- a/services/sidecar/requirements/_base.in
+++ b/services/sidecar/requirements/_base.in
@@ -3,6 +3,7 @@
 # urllib3>=1.25.8 # Vulnerability
+aiofile
 aio-pika
 aiodocker
 aiofiles
@@ -16,4 +17,4 @@ networkx
 packaging
 pydantic
 tenacity
-aioredlock
\ No newline at end of file
+aioredlock
diff --git a/services/sidecar/requirements/_base.txt b/services/sidecar/requirements/_base.txt
index b1c2d731949..0ed72136c27 100644
--- a/services/sidecar/requirements/_base.txt
+++ b/services/sidecar/requirements/_base.txt
@@ -6,6 +6,7 @@
 #
 aio-pika==6.6.1 # via -r requirements/_base.in
 aiodocker==0.19.1 # via -r requirements/_base.in
+aiofile==3.0.0 # via -r requirements/_base.in
 aiofiles==0.5.0 # via -r requirements/_base.in
 aiohttp==3.6.2 # via aiodocker
 aiopg==1.0.0 # via -r requirements/_base.in
@@ -16,6 +17,7 @@ amqp==2.6.1 # via kombu
 async-timeout==3.0.1 # via aiohttp, aioredis
 attrs==19.3.0 # via aiohttp, aioredlock
 billiard==3.6.3.0 # via celery
+caio==0.5.3 # via aiofile
 celery==4.4.7 # via -r requirements/_base.in
 chardet==3.0.4 # via aiohttp
 click==7.1.2 # via -r requirements/_base.in
diff --git a/services/sidecar/requirements/_test.txt b/services/sidecar/requirements/_test.txt
index 53c54bc1cda..67ed3be549e 100644
--- a/services/sidecar/requirements/_test.txt
+++ b/services/sidecar/requirements/_test.txt
@@ -6,6 +6,7 @@
 #
 aio-pika==6.6.1 # via -r requirements/_base.txt
 aiodocker==0.19.1 # via -r requirements/_base.txt
+aiofile==3.0.0 # via -r requirements/_base.txt
 aiofiles==0.5.0 # via -r requirements/_base.txt
 aiohttp==3.6.2 # via -r requirements/_base.txt, aiodocker, pytest-aiohttp
 aiopg==1.0.0 # via -r requirements/_base.txt, -r requirements/_test.in
@@ -17,6 +18,7 @@ astroid==2.4.2 # via pylint
 async-timeout==3.0.1 # via -r requirements/_base.txt, aiohttp, aioredis
 attrs==19.3.0 # via -r requirements/_base.txt, aiohttp, aioredlock, pytest
 billiard==3.6.3.0 # via -r requirements/_base.txt, celery
+caio==0.5.3 # via -r requirements/_base.txt, aiofile
 celery==4.4.7 # via -r requirements/_base.txt
 certifi==2020.6.20 # via requests
 chardet==3.0.4 # via -r requirements/_base.txt, aiohttp, requests
diff --git a/services/sidecar/src/simcore_service_sidecar/boot_mode.py b/services/sidecar/src/simcore_service_sidecar/boot_mode.py
index a715c58341d..cfaf75d36ba 100644
--- a/services/sidecar/src/simcore_service_sidecar/boot_mode.py
+++ b/services/sidecar/src/simcore_service_sidecar/boot_mode.py
@@ -1,6 +1,7 @@
 # pylint: disable=global-statement
 
 from enum import Enum
+from typing import Optional
 
 
 class BootMode(Enum):
@@ -9,10 +10,10 @@ class BootMode(Enum):
     MPI = "MPI"
 
 
-_sidecar_boot_mode: BootMode = None
+_sidecar_boot_mode: Optional[BootMode] = None
 
 
-def get_boot_mode() -> BootMode:
+def get_boot_mode() -> Optional[BootMode]:
     global _sidecar_boot_mode
     return _sidecar_boot_mode
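
The boot-mode singleton is now Optional, so anything that runs after one of the configure_*_mode() functions has to narrow the value back down. A minimal caller-side sketch (hypothetical helper, not part of this diff):

# Hypothetical helper, not in this PR: since get_boot_mode() now returns
# Optional[BootMode], post-startup code can narrow it explicitly.
from simcore_service_sidecar.boot_mode import BootMode, get_boot_mode

def required_boot_mode() -> BootMode:
    mode = get_boot_mode()
    if mode is None:
        # None means set_boot_mode() was never called during startup
        raise RuntimeError("sidecar boot mode is not configured yet")
    return mode
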
""" -from typing import Tuple +from typing import Optional, Tuple + from celery import Celery, states from simcore_sdk.config.rabbit import Config as RabbitConfig + from . import config -from .cli import run_sidecar -from .utils import wrap_async_call, is_gpu_node, start_as_mpi_node +from .boot_mode import BootMode, get_boot_mode, set_boot_mode from .celery_log_setup import get_task_logger -from .utils import assemble_celery_app +from .cli import run_sidecar from .core import task_required_resources -from .boot_mode import BootMode, set_boot_mode, get_boot_mode +from .utils import assemble_celery_app, is_gpu_node, start_as_mpi_node, wrap_async_call log = get_task_logger(__name__) @@ -71,7 +72,7 @@ def _dispatch_to_mpi_queue(user_id: str, project_id: str, node_id: str) -> None: def shared_task_dispatch( - celery_request, user_id: str, project_id: str, node_id: str = None + celery_request, user_id: str, project_id: str, node_id: Optional[str] = None ) -> None: """This is the original task which is run by either MPI, GPU or CPU node""" try: @@ -106,11 +107,15 @@ def configure_cpu_mode() -> Tuple[RabbitConfig, Celery]: # pylint: disable=unused-variable,unused-argument @app.task(name="comp.task", bind=True, ignore_result=True) - def entrypoint(self, user_id: str, project_id: str, node_id: str = None) -> None: + def entrypoint( + self, user_id: str, project_id: str, node_id: Optional[str] = None + ) -> None: shared_task_dispatch(self, user_id, project_id, node_id) @app.task(name="comp.task.cpu", bind=True) - def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None: + def pipeline( + self, user_id: str, project_id: str, node_id: Optional[str] = None + ) -> None: shared_task_dispatch(self, user_id, project_id, node_id) set_boot_mode(BootMode.CPU) @@ -125,7 +130,9 @@ def configure_gpu_mode() -> Tuple[RabbitConfig, Celery]: # pylint: disable=unused-variable @app.task(name="comp.task.gpu", bind=True) - def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None: + def pipeline( + self, user_id: str, project_id: str, node_id: Optional[str] = None + ) -> None: shared_task_dispatch(self, user_id, project_id, node_id) set_boot_mode(BootMode.GPU) @@ -140,7 +147,9 @@ def configure_mpi_node() -> Tuple[RabbitConfig, Celery]: # pylint: disable=unused-variable @app.task(name="comp.task.mpi", bind=True) - def pipeline(self, user_id: str, project_id: str, node_id: str = None) -> None: + def pipeline( + self, user_id: str, project_id: str, node_id: Optional[str] = None + ) -> None: shared_task_dispatch(self, user_id, project_id, node_id) set_boot_mode(BootMode.MPI) diff --git a/services/sidecar/src/simcore_service_sidecar/cli.py b/services/sidecar/src/simcore_service_sidecar/cli.py index 1c3bf676890..5c4c128f6c2 100644 --- a/services/sidecar/src/simcore_service_sidecar/cli.py +++ b/services/sidecar/src/simcore_service_sidecar/cli.py @@ -1,5 +1,5 @@ import logging -from typing import List, Tuple +from typing import List, Optional, Tuple import click @@ -17,7 +17,9 @@ @click.option("--user_id", default=0, type=int, help="The user ID") @click.option("--project_id", default="0", help="The project ID") @click.option("--node_id", default=None, help="The node ID or nothing") -def main(job_id: str, user_id: str, project_id: str, node_id: str) -> List[str]: +def main( + job_id: str, user_id: str, project_id: str, node_id: str +) -> Optional[List[str]]: log.info( "STARTING task processing for user %s, project %s, node %s", @@ -41,12 +43,12 @@ def main(job_id: str, user_id: str, 
diff --git a/services/sidecar/src/simcore_service_sidecar/cli.py b/services/sidecar/src/simcore_service_sidecar/cli.py
index 1c3bf676890..5c4c128f6c2 100644
--- a/services/sidecar/src/simcore_service_sidecar/cli.py
+++ b/services/sidecar/src/simcore_service_sidecar/cli.py
@@ -1,5 +1,5 @@
 import logging
-from typing import List, Tuple
+from typing import List, Optional, Tuple
 
 import click
 
@@ -17,7 +17,9 @@
 @click.option("--user_id", default=0, type=int, help="The user ID")
 @click.option("--project_id", default="0", help="The project ID")
 @click.option("--node_id", default=None, help="The node ID or nothing")
-def main(job_id: str, user_id: str, project_id: str, node_id: str) -> List[str]:
+def main(
+    job_id: str, user_id: str, project_id: str, node_id: str
+) -> Optional[List[str]]:
 
     log.info(
         "STARTING task processing for user %s, project %s, node %s",
@@ -41,12 +43,12 @@ def main(job_id: str, user_id: str, project_id: str, node_id: str) -> List[str]:
 
 
 async def run_sidecar(
-    job_id: str, user_id: str, project_id: str, node_id: str
-) -> Tuple[List[str], str]:
+    job_id: str, user_id: str, project_id: str, node_id: Optional[str]
+) -> Tuple[Optional[List[str]], Optional[str]]:
     try:
         async with DBContextManager() as db_engine:
             async with RabbitMQ(config=RABBIT_CONFIG) as rabbit_mq:
-                next_task_nodes = await inspect(
+                next_task_nodes: Optional[List[str]] = await inspect(
                     db_engine, rabbit_mq, job_id, user_id, project_id, node_id=node_id
                 )
                 log.info(
diff --git a/services/sidecar/src/simcore_service_sidecar/config.py b/services/sidecar/src/simcore_service_sidecar/config.py
index cbba23132c7..b2bb0ae9da8 100644
--- a/services/sidecar/src/simcore_service_sidecar/config.py
+++ b/services/sidecar/src/simcore_service_sidecar/config.py
@@ -1,6 +1,9 @@
 import logging
 import multiprocessing
 import os
+from pathlib import Path
+from typing import Optional
+
 from simcore_sdk.config.rabbit import Config as RabbitConfig
 
 SERVICES_MAX_NANO_CPUS: int = min(
@@ -15,6 +18,16 @@
 )
 SWARM_STACK_NAME: str = os.environ.get("SWARM_STACK_NAME", "simcore")
 
+SIDECAR_INPUT_FOLDER: Path = Path(
+    os.environ.get("SIDECAR_INPUT_FOLDER", Path.home() / "input")
+)
+SIDECAR_OUTPUT_FOLDER: Path = Path(
+    os.environ.get("SIDECAR_OUTPUT_FOLDER", Path.home() / "output")
+)
+SIDECAR_LOG_FOLDER: Path = Path(
+    os.environ.get("SIDECAR_LOG_FOLDER", Path.home() / "log")
+)
+
 SIDECAR_DOCKER_VOLUME_INPUT: str = os.environ.get(
     "SIDECAR_DOCKER_VOLUME_INPUT", f"{SWARM_STACK_NAME}_input"
 )
@@ -42,10 +55,9 @@
 logging.getLogger("sqlalchemy.pool").setLevel(SIDECAR_LOGLEVEL)
 
 RABBIT_CONFIG = RabbitConfig()
-
 # sidecar celery starting mode overwrite
-FORCE_START_CPU_MODE: str = os.environ.get("START_AS_MODE_CPU")
-FORCE_START_GPU_MODE: str = os.environ.get("START_AS_MODE_GPU")
+FORCE_START_CPU_MODE: Optional[str] = os.environ.get("START_AS_MODE_CPU")
+FORCE_START_GPU_MODE: Optional[str] = os.environ.get("START_AS_MODE_GPU")
 
 # if a node has this amount of CPUs it will be an MPI candidate
 TARGET_MPI_NODE_CPU_COUNT: int = int(os.environ.get("TARGET_MPI_NODE_CPU_COUNT", "-1"))
@@ -59,4 +71,3 @@
 REDLOCK_REFRESH_INTERVAL_SECONDS: float = max(
     float(os.environ.get("REDLOCK_REFRESH_INTERVAL_SECONDS", "5.0")), 1.0
 )  # enforce at least 1 second
-
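
The new SIDECAR_*_FOLDER settings all follow one pattern: an environment variable wins when set, and Path() normalizes both branches (a str from os.environ, a Path from the fallback). A minimal sketch of that pattern:

# Pattern sketch: the env var (a str) overrides the Path default, and the
# outer Path() makes the resulting type uniform.
import os
from pathlib import Path

SIDECAR_INPUT_FOLDER: Path = Path(
    os.environ.get("SIDECAR_INPUT_FOLDER", Path.home() / "input")
)
assert isinstance(SIDECAR_INPUT_FOLDER, Path)
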
diff --git a/services/sidecar/src/simcore_service_sidecar/core.py b/services/sidecar/src/simcore_service_sidecar/core.py
index 829a500e0fb..bd392f0f71e 100644
--- a/services/sidecar/src/simcore_service_sidecar/core.py
+++ b/services/sidecar/src/simcore_service_sidecar/core.py
@@ -1,13 +1,14 @@
-from datetime import datetime
-from typing import List, Optional, Union, Dict
 import traceback
+from datetime import datetime
+from typing import Dict, List, Optional, Union
 
 import aiodocker
-import aiopg
 import networkx as nx
-from celery.utils.log import get_task_logger
+from aiopg.sa import Engine, SAConnection
+from aiopg.sa.result import RowProxy
 from sqlalchemy import and_, literal_column
 
+from celery.utils.log import get_task_logger
 from simcore_postgres_database.sidecar_models import (  # PENDING,
     FAILED,
     RUNNING,
@@ -20,10 +21,10 @@ from simcore_sdk.node_ports import log as node_port_log
 
 from . import config, exceptions
+from .db import DBContextManager
 from .executor import Executor
 from .rabbitmq import RabbitMQ
 from .utils import execution_graph, find_entry_point, is_node_ready
-from .db import DBContextManager
 
 log = get_task_logger(__name__)
 log.setLevel(config.SIDECAR_LOGLEVEL)
@@ -61,13 +62,12 @@ async def task_required_resources(node_id: str) -> Union[Dict[str, bool], None]:
 
 
 async def _try_get_task_from_db(
-    db_connection: aiopg.sa.SAConnection,
+    db_connection: SAConnection,
     graph: nx.DiGraph,
-    job_request_id: int,
+    job_request_id: str,
     project_id: str,
     node_id: str,
-) -> Optional[aiopg.sa.result.RowProxy]:
-    task: aiopg.sa.result.RowProxy = None
+) -> Optional[RowProxy]:
     # Use SELECT FOR UPDATE TO lock the row
     result = await db_connection.execute(
         query=comp_tasks.select(for_update=True).where(
@@ -79,7 +79,7 @@ async def _try_get_task_from_db(
             )
         )
     )
-    task = await result.fetchone()
+    task: RowProxy = await result.fetchone()
 
     if not task:
         log.debug("No task found")
@@ -114,9 +114,8 @@ async def _try_get_task_from_db(
 
 
 async def _get_pipeline_from_db(
-    db_connection: aiopg.sa.SAConnection, project_id: str,
-) -> aiopg.sa.result.RowProxy:
-    pipeline: aiopg.sa.result.RowProxy = None
+    db_connection: SAConnection, project_id: str,
+) -> RowProxy:
     # get the pipeline
     result = await db_connection.execute(
         comp_pipeline.select().where(comp_pipeline.c.project_id == project_id)
@@ -126,7 +125,7 @@ async def _get_pipeline_from_db(
             f"Pipeline {result.rowcount} found instead of only one for project_id {project_id}"
         )
 
-    pipeline = await result.first()
+    pipeline: RowProxy = await result.first()
     if not pipeline:
         raise exceptions.DatabaseError(f"Pipeline {project_id} not found")
     log.debug("found pipeline %s", pipeline)
@@ -135,12 +134,12 @@ async def _get_pipeline_from_db(
 
 
 async def inspect(  # pylint: disable=too-many-arguments
-    db_engine: aiopg.sa.Engine,
+    db_engine: Engine,
     rabbit_mq: RabbitMQ,
-    job_request_id: int,
+    job_request_id: str,
     user_id: str,
     project_id: str,
-    node_id: str,
+    node_id: Optional[str],
 ) -> Optional[List[str]]:
     log.debug(
         "ENTERING inspect with user %s pipeline:node %s: %s",
@@ -149,11 +148,10 @@ async def inspect(
         node_id,
     )
 
-    pipeline: aiopg.sa.result.RowProxy = None
-    task: aiopg.sa.result.RowProxy = None
-    graph: nx.DiGraph = None
+    task: Optional[RowProxy] = None
+    graph: Optional[nx.DiGraph] = None
    async with db_engine.acquire() as connection:
-        pipeline = await _get_pipeline_from_db(connection, project_id)
+        pipeline: RowProxy = await _get_pipeline_from_db(connection, project_id)
         graph = execution_graph(pipeline)
         if not node_id:
             log.debug("NODE id was zero, this was the entry node id")
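
The task-claiming query above leans on SELECT ... FOR UPDATE so that two sidecars cannot grab the same comp_tasks row. A stripped-down sketch of the idiom, with the transaction handling and extra filters simplified away:

# Sketch of the row-locking idiom in _try_get_task_from_db; column names
# follow the diff, everything else is simplified.
from aiopg.sa import SAConnection
from sqlalchemy import and_

from simcore_postgres_database.sidecar_models import comp_tasks

async def try_lock_task(connection: SAConnection, project_id: str, node_id: str):
    result = await connection.execute(
        comp_tasks.select(for_update=True).where(
            and_(
                comp_tasks.c.project_id == project_id,
                comp_tasks.c.node_id == node_id,
            )
        )
    )
    # RowProxy, or None if no matching row exists
    return await result.fetchone()
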
diff --git a/services/sidecar/src/simcore_service_sidecar/executor.py b/services/sidecar/src/simcore_service_sidecar/executor.py
index e1bb50bd340..98851e22cbf 100644
--- a/services/sidecar/src/simcore_service_sidecar/executor.py
+++ b/services/sidecar/src/simcore_service_sidecar/executor.py
@@ -3,39 +3,40 @@
 import shutil
 import time
 from pathlib import Path
-from typing import Dict
+from typing import Dict, Optional
 
 import aiodocker
 import aiopg
 import attr
-from celery.utils.log import get_task_logger
 from packaging import version
 from tenacity import retry, stop_after_attempt
 
+from celery.utils.log import get_task_logger
 from servicelib.utils import fire_and_forget_task, logged_gather
 from simcore_sdk import node_data, node_ports
 from simcore_sdk.node_ports.dbmanager import DBManager
 
 from . import config, exceptions
+from .boot_mode import get_boot_mode
 from .log_parser import LogType, monitor_logs_task
 from .rabbitmq import RabbitMQ
-from .boot_mode import get_boot_mode
+from .utils import get_volume_mount_point
 
 log = get_task_logger(__name__)
 
 
 @attr.s(auto_attribs=True)
 class TaskSharedVolumes:
-    input_folder: Path = None
-    output_folder: Path = None
-    log_folder: Path = None
+    input_folder: Optional[Path] = None
+    output_folder: Optional[Path] = None
+    log_folder: Optional[Path] = None
 
     @classmethod
     def from_task(cls, task: aiopg.sa.result.RowProxy):
         return cls(
-            Path.home() / f"input/{task.job_id}",
-            Path.home() / f"output/{task.job_id}",
-            Path.home() / f"log/{task.job_id}",
+            config.SIDECAR_INPUT_FOLDER / f"{task.job_id}",
+            config.SIDECAR_OUTPUT_FOLDER / f"{task.job_id}",
+            config.SIDECAR_LOG_FOLDER / f"{task.job_id}",
         )
 
     def create(self) -> None:
@@ -45,7 +46,7 @@ def create(self) -> None:
             self.log_folder,
         ]:
             if folder.exists():
-                shutil.rmtree(folder)
+                shutil.rmtree(str(folder))
             folder.mkdir(parents=True, exist_ok=True)
 
@@ -68,6 +69,7 @@ async def run(self):
             self.task.node_id,
             self.task.internal_id,
         )
+
         try:
             await self.preprocess()
             await self.process()
@@ -116,7 +118,7 @@ async def _process_task_input(self, port: node_ports.Port, input_ports: Dict):
             # the filename is not necessarily the name of the port, might be mapped
             mapped_filename = Path(path).name
             input_ports[port.key] = str(port_value)
-            final_path = Path(self.shared_folders.input_folder, mapped_filename)
+            final_path = self.shared_folders.input_folder / mapped_filename
             shutil.copy(str(path), str(final_path))
             log.debug(
                 "DOWNLOAD successful from %s to %s via %s",
@@ -132,7 +134,7 @@ async def _process_task_input(self, port: node_ports.Port, input_ports: Dict):
 
     async def _process_task_inputs(self) -> Dict:
         log.debug("Inputs parsing...")
-        input_ports = dict()
+        input_ports: Dict = {}
         try:
             PORTS = await self._get_node_ports()
         except node_ports.exceptions.NodeNotFound:
@@ -204,6 +206,7 @@ async def _pull_image(self):
             raise
 
     async def _run_container(self):
+        # pylint: disable=too-many-statements
         start_time = time.perf_counter()
         container = None
         docker_image = f"{config.DOCKER_REGISTRY}/{self.task.image['name']}:{self.task.image['tag']}"
@@ -215,6 +218,14 @@ async def _run_container(self):
         ]
         env_vars.append(f"SC_COMP_SERVICES_SCHEDULED_AS={get_boot_mode().value}")
 
+        host_input_path = await get_volume_mount_point(
+            config.SIDECAR_DOCKER_VOLUME_INPUT
+        )
+        host_output_path = await get_volume_mount_point(
+            config.SIDECAR_DOCKER_VOLUME_OUTPUT
+        )
+        host_log_path = await get_volume_mount_point(config.SIDECAR_DOCKER_VOLUME_LOG)
+
         docker_container_config = {
             "Env": env_vars,
             "Cmd": "run",
@@ -232,9 +243,11 @@ async def _run_container(self):
                 "Init": True,
                 "AutoRemove": False,
                 "Binds": [
-                    f"{config.SIDECAR_DOCKER_VOLUME_INPUT}:/input",
-                    f"{config.SIDECAR_DOCKER_VOLUME_OUTPUT}:/output",
-                    f"{config.SIDECAR_DOCKER_VOLUME_LOG}:/log",
+                    # NOTE: the docker engine is mounted, so only named volumes are usable. Therefore, for a selective
+                    # subfolder mount we need to get the path as seen from the host computer (see https://github.com/ITISFoundation/osparc-simcore/issues/1723)
+                    f"{host_input_path}/{self.task.job_id}:/input/{self.task.job_id}",
+                    f"{host_output_path}/{self.task.job_id}:/output/{self.task.job_id}",
+                    f"{host_log_path}/{self.task.job_id}:/log/{self.task.job_id}",
                 ],
             },
         }
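
Because only named volumes are usable here, the three get_volume_mount_point() calls above translate each volume name into the host path that backs it, so a per-job subfolder can be bind-mounted selectively. Sketched with aiodocker directly (the helper itself is added to utils.py further down in this diff):

# Sketch of the volume-to-host-path resolution; error handling omitted.
import aiodocker
from aiodocker.volumes import DockerVolume

async def host_path_of(volume_name: str) -> str:
    docker_client = aiodocker.Docker()
    try:
        attrs = await DockerVolume(docker_client, volume_name).show()
        # e.g. /var/lib/docker/volumes/<name>/_data
        return attrs["Mountpoint"]
    finally:
        await docker_client.close()
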
@@ -243,6 +256,7 @@ async def _run_container(self):
         )  # volume paths for car container (w/o prefix)
 
         result = "FAILURE"
+        log_processor_task = None
         try:
             docker_client: aiodocker.Docker = aiodocker.Docker()
             await self._post_messages(
@@ -253,9 +267,9 @@ async def _run_container(self):
                 config=docker_container_config
             )
             # start monitoring logs
+            log_file = self.shared_folders.log_folder / "log.dat"
             if self.integration_version == version.parse("0.0.0"):
                 # touch output file, so it's ready for the container (v0)
-                log_file = self.shared_folders.log_folder / "log.dat"
                 log_file.touch()
 
                 log_processor_task = fire_and_forget_task(
@@ -263,7 +277,7 @@ async def _run_container(self):
                 )
             else:
                 log_processor_task = fire_and_forget_task(
-                    monitor_logs_task(container, self._post_messages)
+                    monitor_logs_task(container, self._post_messages, log_file)
                 )
             # start the container
             await container.start()
@@ -301,7 +315,7 @@ async def _run_container(self):
             container_data = await container.show()
             if container_data["State"]["ExitCode"] > 0:
                 raise exceptions.SidecarException(
-                    f"{docker_image} completed with error code {container_data['State']['ExitCode']}: {container_data['State']['Error']}"
+                    f"{docker_image} completed with error code {container_data['State']['ExitCode']}:\n {container_data['State']['Error']}\nLast logs:\n{await container.log(stdout=True, stderr=True, tail=10)}"
                 )
             # ensure progress 1.0 is sent
             await self._post_messages(LogType.PROGRESS, "1.0")
@@ -326,7 +340,8 @@ async def _run_container(self):
             # clean up the container
             await container.delete(force=True)
             # stop monitoring logs now
-            log_processor_task.cancel()
+            if log_processor_task:
+                log_processor_task.cancel()
             # instrumentation
             await self.rabbit_mq.post_instrumentation_message(
                 {
@@ -340,7 +355,8 @@ async def _run_container(self):
                     "result": result,
                 }
             )
-            await log_processor_task
+            if log_processor_task:
+                await log_processor_task
 
     async def _process_task_output(self):
         """ There will be some files in the /output
@@ -402,7 +418,7 @@ async def _process_task_log(self):
         await self._post_messages(
             LogType.LOG, "[sidecar]Uploading logs...",
         )
-        if self.shared_folders.log_folder.exists():
+        if self.shared_folders.log_folder and self.shared_folders.log_folder.exists():
             await node_data.data_manager.push(
                 self.shared_folders.log_folder, rename_to="logs"
             )
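
The None-guard introduced around log_processor_task generalizes: a background task created inside try may never have been started by the time the finally block runs. A generic sketch of the pattern, assuming fire_and_forget_task behaves like asyncio.ensure_future:

# Generic guard pattern; work/monitor stand in for the real coroutines.
import asyncio
from typing import Awaitable, Callable, Optional

async def run_guarded(
    work: Callable[[], Awaitable], monitor: Callable[[], Awaitable]
) -> None:
    log_processor_task: Optional[asyncio.Task] = None
    try:
        log_processor_task = asyncio.ensure_future(monitor())
        await work()
    finally:
        if log_processor_task:
            log_processor_task.cancel()
            # swallow the CancelledError so cleanup does not re-raise
            await asyncio.gather(log_processor_task, return_exceptions=True)
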
diff --git a/services/sidecar/src/simcore_service_sidecar/log_parser.py b/services/sidecar/src/simcore_service_sidecar/log_parser.py
index 0076e4260be..26036718e5c 100644
--- a/services/sidecar/src/simcore_service_sidecar/log_parser.py
+++ b/services/sidecar/src/simcore_service_sidecar/log_parser.py
@@ -1,13 +1,15 @@
 import asyncio
 import logging
 import re
+import tempfile
 from enum import Enum
 from pathlib import Path
-import aiodocker
-from typing import Awaitable, Callable, Tuple, Union
+from typing import Awaitable, Callable, Optional, Tuple, Union
 
+import aiodocker
 import aiofiles
 from aiodocker.containers import DockerContainer
+from aiofile import AIOFile, Writer
 
 from . import exceptions
@@ -58,16 +60,19 @@ async def parse_line(line: str) -> Tuple[LogType, str]:
 
 
 async def monitor_logs_task(
-    log_file_or_container: Union[Path, DockerContainer],
+    mon_log_file_or_container: Union[Path, DockerContainer],
     log_cb: Awaitable[Callable[[LogType, str], None]],
+    out_log_file: Optional[Path] = None,
 ) -> None:
     try:
-        if isinstance(log_file_or_container, Path):
-            log.debug("start monitoring log in %s", log_file_or_container)
-            await _monitor_log_file(log_file_or_container, log_cb)
-        elif isinstance(log_file_or_container, DockerContainer):
-            log.debug("start monitoring docker logs of %s", log_file_or_container)
-            await _monitor_docker_container(log_file_or_container, log_cb)
+        if isinstance(mon_log_file_or_container, Path):
+            log.debug("start monitoring log in %s", mon_log_file_or_container)
+            await _monitor_log_file(mon_log_file_or_container, log_cb)
+        elif isinstance(mon_log_file_or_container, DockerContainer):
+            log.debug("start monitoring docker logs of %s", mon_log_file_or_container)
+            await _monitor_docker_container(
+                mon_log_file_or_container, log_cb, out_log_file
+            )
         else:
             raise exceptions.SidecarException("Invalid log type")
 
@@ -77,19 +82,34 @@ async def monitor_logs_task(
 
 
 async def _monitor_docker_container(
-    container: DockerContainer, log_cb: Awaitable[Callable[[LogType, str], None]]
+    container: DockerContainer,
+    log_cb: Awaitable[Callable[[LogType, str], None]],
+    out_log_file: Optional[Path],
 ) -> None:
     # Avoids raising UnboundLocalError: local variable 'log_type' referenced before assignment
     log_type, parsed_line = LogType.INSTRUMENTATION, "Undefined"
 
+    log_file = out_log_file
+    if not out_log_file:
+        temporary_file = tempfile.NamedTemporaryFile(delete=False, suffix=".dat")
+        temporary_file.close()
+        log_file = Path(temporary_file.name)
+
     try:
-        async for line in container.log(stdout=True, stderr=True, follow=True):
-            log_type, parsed_line = await parse_line(line)
-            await log_cb(log_type, parsed_line)
+        async with AIOFile(str(log_file), "w+") as afp:
+            writer = Writer(afp)
+            async for line in container.log(stdout=True, stderr=True, follow=True):
+                log_type, parsed_line = await parse_line(line)
+                await log_cb(log_type, parsed_line)
+                await writer(f"{log_type.name}: {parsed_line}")
     except aiodocker.exceptions.DockerError as e:
         log_type, parsed_line = await parse_line(
             f"Could not recover logs because: {str(e)}"
         )
         await log_cb(log_type, parsed_line)
+    finally:
+        # clean up
+        if not out_log_file and log_file:
+            log_file.unlink()
 
 
 async def _monitor_log_file(
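
The rewritten _monitor_docker_container leans on the new aiofile dependency (AIOFile plus its Writer helper, backed by the caio package added to the requirements). A minimal standalone usage sketch; the path is illustrative:

# aiofile 3.x usage sketch: Writer appends sequentially to an open AIOFile.
import asyncio
from aiofile import AIOFile, Writer

async def demo(path: str = "/tmp/example_log.dat") -> None:
    async with AIOFile(path, "w+") as afp:
        writer = Writer(afp)
        await writer("LOG: container started\n")
        await writer("PROGRESS: 0.5\n")
        await afp.fsync()  # flush buffered writes to disk

asyncio.run(demo())
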
diff --git a/services/sidecar/src/simcore_service_sidecar/mpi_lock.py b/services/sidecar/src/simcore_service_sidecar/mpi_lock.py
index 31db5c5e97f..2355f5ca21d 100644
--- a/services/sidecar/src/simcore_service_sidecar/mpi_lock.py
+++ b/services/sidecar/src/simcore_service_sidecar/mpi_lock.py
@@ -12,13 +12,13 @@
 import asyncio
 import datetime
-from typing import Any, Callable
-from aioredlock import Aioredlock, LockError, Lock
+import logging
 from threading import Thread
-from typing import Tuple
-from simcore_service_sidecar import config
+from typing import Any, Callable, Optional, Tuple
 
-import logging
+from aioredlock import Aioredlock, Lock, LockError
+
+from simcore_service_sidecar import config
 
 logger = logging.getLogger(__name__)
 
@@ -46,7 +46,7 @@ async def retry_for_result(
 
 def start_background_lock_extender(
     lock_manager: Aioredlock, lock: Lock, loop: asyncio.BaseEventLoop
-) -> asyncio.Future:
+) -> None:
     """Will periodically extend the duration of the lock"""
 
     async def extender_worker(lock_manager: Aioredlock):
@@ -67,7 +67,7 @@ def thread_worker(
 
 async def try_to_acquire_lock(
     lock_manager: Aioredlock, resource_name: str
-) -> Tuple[bool, Lock]:
+) -> Optional[Tuple[bool, Lock]]:
     # Try to acquire the lock:
     try:
         return await lock_manager.lock(
diff --git a/services/sidecar/src/simcore_service_sidecar/rabbitmq.py b/services/sidecar/src/simcore_service_sidecar/rabbitmq.py
index 16b9fe5e263..960b3038025 100644
--- a/services/sidecar/src/simcore_service_sidecar/rabbitmq.py
+++ b/services/sidecar/src/simcore_service_sidecar/rabbitmq.py
@@ -1,7 +1,8 @@
 import json
 import logging
 import socket
-from typing import Dict, List, Optional, Union
+from asyncio.futures import CancelledError
+from typing import Any, Dict, List, Optional, Union
 
 import aio_pika
 import tenacity
@@ -13,22 +14,31 @@
 log = logging.getLogger(__file__)
 
 
-def _close_callback(exc: Optional[BaseException]):
+def _close_callback(sender: Any, exc: Optional[BaseException]):
     if exc:
-        log.error("Rabbit connection closed with exception: %s", exc)
+        if isinstance(exc, CancelledError):
+            log.info("Rabbit connection was cancelled", exc_info=True)
+        else:
+            log.error(
+                "Rabbit connection closed with exception from %s:",
+                sender,
+                exc_info=True,
+            )
     else:
-        log.info("Rabbit connection closed")
+        log.info("Rabbit connection closed from %s", sender)
 
 
 def _reconnect_callback():
     log.warning("Rabbit connection reconnected")
 
 
-def _channel_close_callback(exc: Optional[BaseException]):
+def _channel_close_callback(sender: Any, exc: Optional[BaseException]):
     if exc:
-        log.error("Rabbit channel closed with exception: %s", exc)
+        log.error(
+            "Rabbit channel closed with exception from %s:", sender, exc_info=True
+        )
     else:
-        log.info("Rabbit channel closed")
+        log.info("Rabbit channel closed from %s", sender)
 
 
 class RabbitMQ(BaseModel):
@@ -80,7 +90,9 @@ async def close(self):
         log.debug("Closing connection...")
         await self.connection.close()
 
-    async def _post_message(self, exchange: aio_pika.Exchange, data: Dict[str, str]):
+    async def _post_message(
+        self, exchange: aio_pika.Exchange, data: Dict[str, Union[str, Any]]
+    ):
         await exchange.publish(
             aio_pika.Message(body=json.dumps(data).encode()), routing_key=""
         )
@@ -118,7 +130,7 @@ async def post_progress_message(
         )
 
     async def post_instrumentation_message(
-        self, instrumentation_data: Dict[str, str],
+        self, instrumentation_data: Dict,
     ):
         await self._post_message(
             self.instrumentation_exchange, data=instrumentation_data,
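
The callbacks now take (sender, exc), matching aio-pika 6.x, which passes the closing object first and the exception (None on a clean close) second. A hedged sketch of where such a callback gets registered; the URL is a placeholder:

# Registration sketch, assuming aio-pika 6.x callback semantics.
import aio_pika

async def connect_with_callbacks():
    connection = await aio_pika.connect_robust("amqp://guest:guest@rabbit/")
    # invoked with (sender, exc); exc is None on a clean close
    connection.add_close_callback(_close_callback)
    return connection
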
diff --git a/services/sidecar/src/simcore_service_sidecar/utils.py b/services/sidecar/src/simcore_service_sidecar/utils.py
index d101d392b4a..ca1ae57a092 100644
--- a/services/sidecar/src/simcore_service_sidecar/utils.py
+++ b/services/sidecar/src/simcore_service_sidecar/utils.py
@@ -4,16 +4,19 @@
 from typing import Awaitable, List
 
 import aiodocker
-import aiopg
+from aiopg.sa.result import RowProxy
 import networkx as nx
+from aiodocker.volumes import DockerVolume
+from aiopg.sa import SAConnection
 from sqlalchemy import and_
 
 from celery import Celery
-from simcore_postgres_database.sidecar_models import SUCCESS, comp_pipeline, comp_tasks
+from simcore_postgres_database.sidecar_models import SUCCESS, comp_tasks
 from simcore_sdk.config.rabbit import Config as RabbitConfig
 from simcore_service_sidecar import config
 from simcore_service_sidecar.mpi_lock import acquire_mpi_lock
 
+from .exceptions import SidecarException
 
 logger = logging.getLogger(__name__)
@@ -31,9 +34,9 @@ def find_entry_point(g: nx.DiGraph) -> List:
 
 
 async def is_node_ready(
-    task: comp_tasks,
+    task: RowProxy,
     graph: nx.DiGraph,
-    db_connection: aiopg.sa.SAConnection,
+    db_connection: SAConnection,
     _logger: logging.Logger,
 ) -> bool:
     query = comp_tasks.select().where(
@@ -62,7 +65,7 @@ async def is_node_ready(
     return True
 
 
-def execution_graph(pipeline: comp_pipeline) -> nx.DiGraph:
+def execution_graph(pipeline: RowProxy) -> nx.DiGraph:
     d = pipeline.dag_adjacency_list
     G = nx.DiGraph()
 
@@ -108,7 +111,7 @@ async def async_is_gpu_node() -> bool:
 def start_as_mpi_node() -> bool:
     """
     Checks if this node can be a target to start as an MPI node.
-    If it can it will try to grab a Redlock, ensure it is the only service who can be 
+    If it can it will try to grab a Redlock, ensure it is the only service who can be
     started as MPI.
     """
     import subprocess
@@ -134,3 +137,19 @@ def assemble_celery_app(task_default_queue: str, rabbit_config: RabbitConfig) ->
     )
     app.conf.task_default_queue = task_default_queue
     return app
+
+
+async def get_volume_mount_point(volume_name: str) -> str:
+    try:
+        docker_client: aiodocker.Docker = aiodocker.Docker()
+        volume_attributes = await DockerVolume(docker_client, volume_name).show()
+        return volume_attributes["Mountpoint"]
+
+    except aiodocker.exceptions.DockerError as err:
+        raise SidecarException(
+            f"Error while retrieving docker volume {volume_name}"
+        ) from err
+    except KeyError as err:
+        raise SidecarException(
+            f"docker volume {volume_name} does not contain Mountpoint"
+        ) from err
diff --git a/services/sidecar/tests/integration/test_sidecar.py b/services/sidecar/tests/integration/test_sidecar.py
index ac32b8b36cc..3be1b29712f 100644
--- a/services/sidecar/tests/integration/test_sidecar.py
+++ b/services/sidecar/tests/integration/test_sidecar.py
@@ -15,7 +15,7 @@
 from yarl import URL
 
 from simcore_sdk.models.pipeline_models import ComputationalPipeline, ComputationalTask
-from simcore_service_sidecar import config
+from simcore_service_sidecar import config, utils
 
 SIMCORE_S3_ID = 0
 
@@ -41,11 +41,25 @@ def user_id() -> int:
     return 1
 
 
+@pytest.fixture
+async def mock_sidecar_get_volume_mount_point(monkeypatch):
+    async def mock_get_volume_mount_point(volume_name: str) -> str:
+        return volume_name
+
+    monkeypatch.setattr(utils, "get_volume_mount_point", mock_get_volume_mount_point)
+
+    # test the monkeypatching
+    fake_name = "blahblah"
+    x = await utils.get_volume_mount_point(fake_name)
+    assert x == fake_name
+
+
 @pytest.fixture
 def sidecar_config(
     postgres_dsn: Dict[str, str],
     docker_registry: str,
     rabbit_config: config.RabbitConfig,
+    mock_sidecar_get_volume_mount_point,
 ) -> None:
     # NOTE: in integration tests the sidecar runs bare-metal, which means docker volumes cannot be used.
     config.SIDECAR_DOCKER_VOLUME_INPUT = Path.home() / "input"