diff --git a/packages/models-library/src/models_library/settings/rabbit.py b/packages/models-library/src/models_library/settings/rabbit.py index 05cd4116988..480c010cb56 100644 --- a/packages/models-library/src/models_library/settings/rabbit.py +++ b/packages/models-library/src/models_library/settings/rabbit.py @@ -1,7 +1,7 @@ import warnings -from typing import Dict, Optional +from typing import Dict -from pydantic import BaseSettings, Extra, validator +from pydantic import BaseSettings, Extra from pydantic.networks import AnyUrl from pydantic.types import PositiveInt, SecretStr @@ -25,28 +25,21 @@ class RabbitConfig(BaseSettings): user: str = "simcore" password: SecretStr = SecretStr("simcore") - dsn: Optional[RabbitDsn] = None - # channels channels: Dict[str, str] = { "log": "comp.backend.channels.log", "instrumentation": "comp.backend.channels.instrumentation", } - @validator("dsn", pre=True) - @classmethod - def autofill_dsn(cls, v, values): - if not v and all( - key in values for key in cls.__fields__ if key not in ["dsn", "channels"] - ): - return RabbitDsn.build( - scheme="amqp", - user=values["user"], - password=values["password"].get_secret_value(), - host=values["host"], - port=f"{values['port']}", - ) - return v + @property + def dsn(self) -> str: + return RabbitDsn.build( + scheme="amqp", + user=self.user, + password=self.password.get_secret_value(), + host=self.host, + port=f"{self.port}", + ) class Config: env_prefix = "RABBIT_" diff --git a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py index 5b7a7f62077..7578328a3ae 100644 --- a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py @@ -6,12 +6,15 @@ import json import logging import socket -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Iterable, Optional, Tuple import aio_pika import pytest import tenacity from models_library.settings.rabbit import RabbitConfig +from tenacity.before_sleep import before_sleep_log +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_fixed from .helpers.utils_docker import get_service_published_port @@ -31,16 +34,12 @@ async def rabbit_config( ) -> RabbitConfig: prefix = testing_environ_vars["SWARM_STACK_NAME"] assert f"{prefix}_rabbit" in docker_stack["services"] - rabbit_config = RabbitConfig( user=testing_environ_vars["RABBIT_USER"], password=testing_environ_vars["RABBIT_PASSWORD"], host="127.0.0.1", port=get_service_published_port("rabbit", testing_environ_vars["RABBIT_PORT"]), - channels={ - "log": "logs_channel", - "instrumentation": "instrumentation_channel", - }, + channels=json.loads(testing_environ_vars["RABBIT_CHANNELS"]), ) url = rabbit_config.dsn @@ -57,7 +56,7 @@ async def rabbit_service(rabbit_config: RabbitConfig, monkeypatch) -> RabbitConf monkeypatch.setenv("RABBIT_PASSWORD", rabbit_config.password.get_secret_value()) monkeypatch.setenv("RABBIT_CHANNELS", json.dumps(rabbit_config.channels)) - return RabbitConfig + return rabbit_config @pytest.fixture(scope="function") @@ -126,7 +125,7 @@ async def rabbit_exchange( async def rabbit_queue( rabbit_channel: aio_pika.Channel, rabbit_exchange: Tuple[aio_pika.Exchange, aio_pika.Exchange], -) -> aio_pika.Queue: +) -> Iterable[aio_pika.Queue]: (logs_exchange, instrumentation_exchange) = rabbit_exchange # declare queue queue = await rabbit_channel.declare_queue(exclusive=True) @@ -141,9 +140,9 @@ async def rabbit_queue( @tenacity.retry( - wait=tenacity.wait_fixed(5), - stop=tenacity.stop_after_attempt(60), - before_sleep=tenacity.before_sleep_log(log, logging.INFO), + wait=wait_fixed(5), + stop=stop_after_attempt(60), + before_sleep=before_sleep_log(log, logging.INFO), reraise=True, ) async def wait_till_rabbit_responsive(url: str) -> None: diff --git a/packages/pytest-simcore/src/pytest_simcore/simcore_storage_service.py b/packages/pytest-simcore/src/pytest_simcore/simcore_storage_service.py index f7c06caa6da..d7aaca32b85 100644 --- a/packages/pytest-simcore/src/pytest_simcore/simcore_storage_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/simcore_storage_service.py @@ -3,7 +3,7 @@ # pylint:disable=redefined-outer-name import os from copy import deepcopy -from typing import Dict +from typing import Dict, Iterable import aiohttp import pytest @@ -16,7 +16,7 @@ @pytest.fixture(scope="module") -def storage_endpoint(docker_stack: Dict, testing_environ_vars: Dict) -> URL: +def storage_endpoint(docker_stack: Dict, testing_environ_vars: Dict) -> Iterable[URL]: prefix = testing_environ_vars["SWARM_STACK_NAME"] assert f"{prefix}_storage" in docker_stack["services"] @@ -39,7 +39,7 @@ async def storage_service( ) -> URL: await wait_till_storage_responsive(storage_endpoint) - yield storage_endpoint + return storage_endpoint # HELPERS -- diff --git a/packages/pytest-simcore/src/pytest_simcore/websocket_client.py b/packages/pytest-simcore/src/pytest_simcore/websocket_client.py index c54e26f7b53..202704fbbcf 100644 --- a/packages/pytest-simcore/src/pytest_simcore/websocket_client.py +++ b/packages/pytest-simcore/src/pytest_simcore/websocket_client.py @@ -2,7 +2,7 @@ # pylint:disable=unused-argument # pylint:disable=redefined-outer-name -from typing import Callable, Optional +from typing import AsyncIterable, Awaitable, Callable, Iterable, Optional from uuid import uuid4 import pytest @@ -23,7 +23,7 @@ def create() -> str: @pytest.fixture() -def socketio_url_factory(client) -> Callable: +def socketio_url_factory(client) -> Iterable[Callable[[Optional[TestClient]], str]]: def create_url(client_override: Optional[TestClient] = None) -> str: SOCKET_IO_PATH = "/socket.io/" return str((client_override or client).make_url(SOCKET_IO_PATH)) @@ -32,7 +32,9 @@ def create_url(client_override: Optional[TestClient] = None) -> str: @pytest.fixture() -async def security_cookie_factory(client: TestClient) -> Callable: +async def security_cookie_factory( + client: TestClient, +) -> AsyncIterable[Callable[[Optional[TestClient]], Awaitable[str]]]: async def creator(client_override: Optional[TestClient] = None) -> str: # get the cookie by calling the root entrypoint resp = await (client_override or client).get("/v0/") @@ -55,7 +57,9 @@ async def socketio_client_factory( socketio_url_factory: Callable, security_cookie_factory: Callable, client_session_id_factory: Callable, -) -> Callable[[str, Optional[TestClient]], socketio.AsyncClient]: +) -> AsyncIterable[ + Callable[[Optional[str], Optional[TestClient]], Awaitable[socketio.AsyncClient]] +]: clients = [] async def connect( @@ -67,6 +71,7 @@ async def connect( sio = socketio.AsyncClient(ssl_verify=False) # enginio 3.10.0 introduced ssl verification + assert client_session_id url = str( URL(socketio_url_factory(client)).with_query( {"client_session_id": client_session_id} diff --git a/packages/service-library/src/servicelib/archiving_utils.py b/packages/service-library/src/servicelib/archiving_utils.py index 22d07f57c4b..195ad5c52b7 100644 --- a/packages/service-library/src/servicelib/archiving_utils.py +++ b/packages/service-library/src/servicelib/archiving_utils.py @@ -22,8 +22,8 @@ def _full_file_path_from_dir_and_subdirs(dir_path: Path) -> Iterator[Path]: def _strip_directory_from_path(input_path: Path, to_strip: Path) -> Path: - to_strip = f"{str(to_strip)}/" - return Path(str(input_path).replace(to_strip, "")) + _to_strip = f"{str(to_strip)}/" + return Path(str(input_path).replace(_to_strip, "")) def _read_in_chunks(file_object, chunk_size=1024 * 8): diff --git a/packages/settings-library/src/settings_library/logging_utils.py b/packages/settings-library/src/settings_library/logging_utils.py index 9fbb22af5d9..4d925809e9f 100644 --- a/packages/settings-library/src/settings_library/logging_utils.py +++ b/packages/settings-library/src/settings_library/logging_utils.py @@ -1,10 +1,11 @@ import logging from functools import cached_property +from typing import Any class MixinLoggingSettings: @classmethod - def validate_log_level(cls, value) -> str: + def validate_log_level(cls, value: Any) -> str: try: getattr(logging, value.upper()) except AttributeError as err: diff --git a/packages/simcore-sdk/Makefile b/packages/simcore-sdk/Makefile index 64e3b015ef2..4826ab05739 100644 --- a/packages/simcore-sdk/Makefile +++ b/packages/simcore-sdk/Makefile @@ -8,6 +8,7 @@ # by sanderegg, pcrespov # include ../../scripts/common.Makefile +include ../../scripts/common-package.Makefile .PHONY: install-dev install-prod install-ci diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/links.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/links.py index 572ba9f210f..5f75b36e6b7 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/links.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/links.py @@ -3,7 +3,7 @@ from models_library.projects_nodes_io import UUID_REGEX, BaseFileLink, DownloadLink from models_library.projects_nodes_io import PortLink as BasePortLink -from pydantic import Extra, Field, StrictBool, StrictFloat, StrictInt, StrictStr +from pydantic import AnyUrl, Extra, Field, StrictBool, StrictFloat, StrictInt, StrictStr class PortLink(BasePortLink): @@ -11,7 +11,7 @@ class PortLink(BasePortLink): class FileLink(BaseFileLink): - """ allow all kind of file links """ + """allow all kind of file links""" class Config: extra = Extra.allow @@ -22,5 +22,6 @@ class Config: ] ItemConcreteValue = Union[int, float, bool, str, Path] +ItemValue = Union[int, float, bool, str, AnyUrl] __all__ = ["FileLink", "DownloadLink", "PortLink", "DataItemValue", "ItemConcreteValue"] diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index 761c225b1eb..5abb8ccaec1 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -1,14 +1,14 @@ import logging from collections import deque from pathlib import Path -from typing import Any, Callable, Coroutine, Dict, Type +from typing import Any, Callable, Coroutine, Dict, Optional, Type from pydantic import BaseModel, Field from servicelib.utils import logged_gather from ..node_ports_common.dbmanager import DBManager from ..node_ports_common.exceptions import PortNotFound, UnboundPortError -from .links import ItemConcreteValue +from .links import ItemConcreteValue, ItemValue from .port_utils import is_file_type from .ports_mapping import InputsList, OutputsList @@ -55,7 +55,16 @@ async def outputs(self) -> OutputsList: await self._auto_update_from_db() return self.internal_outputs - async def get(self, item_key: str) -> ItemConcreteValue: + async def get_value_link(self, item_key: str) -> Optional[ItemValue]: + try: + return await (await self.inputs)[item_key].get_value() + except UnboundPortError: + # not available try outputs + pass + # if this fails it will raise an exception + return await (await self.outputs)[item_key].get_value() + + async def get(self, item_key: str) -> Optional[ItemConcreteValue]: try: return await (await self.inputs)[item_key].get() except UnboundPortError: diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py index 99197d79ffa..1e52dc8f2af 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py @@ -1,14 +1,21 @@ import logging from pathlib import Path from pprint import pformat -from typing import Any, Dict, Optional, Tuple, Type +from typing import Any, Callable, Dict, Optional, Tuple, Type from models_library.services import PROPERTY_KEY_RE, ServiceProperty -from pydantic import Field, PrivateAttr, validator +from pydantic import AnyUrl, Field, PrivateAttr, validator from ..node_ports_common.exceptions import InvalidItemTypeError from . import port_utils -from .links import DataItemValue, DownloadLink, FileLink, ItemConcreteValue, PortLink +from .links import ( + DataItemValue, + DownloadLink, + FileLink, + ItemConcreteValue, + ItemValue, + PortLink, +) log = logging.getLogger(__name__) @@ -28,8 +35,8 @@ class Port(ServiceProperty): default_value: Optional[DataItemValue] = Field(None, alias="defaultValue") value: Optional[DataItemValue] - _py_value_type: Tuple[Type[ItemConcreteValue]] = PrivateAttr() - _py_value_converter: Type[ItemConcreteValue] = PrivateAttr() + _py_value_type: Tuple[Type[ItemConcreteValue], ...] = PrivateAttr() + _py_value_converter: Callable[[Any], ItemConcreteValue] = PrivateAttr() _node_ports = PrivateAttr() _used_default_value: bool = PrivateAttr(False) @@ -66,6 +73,35 @@ def __init__(self, **data: Any): self.value = self.default_value self._used_default_value = True + async def get_value(self) -> Optional[ItemValue]: + """returns the value of the link after resolving the port links""" + log.debug( + "getting value of %s[%s] containing '%s'", + self.key, + self.property_type, + pformat(self.value), + ) + + if isinstance(self.value, PortLink): + # this is a link to another node + return await port_utils.get_value_link_from_port_link( + # pylint: disable=protected-access + self.value, + self._node_ports._node_ports_creator_cb, + ) + if isinstance(self.value, FileLink): + # let's get the download/upload link from storage + return await port_utils.get_download_link_from_storage( + user_id=self._node_ports.user_id, + value=self.value, + ) + + if isinstance(self.value, DownloadLink): + # this is a downloadable link + return self.value.download_link + + return self.value + async def get(self) -> Optional[ItemConcreteValue]: log.debug( "getting %s[%s] with value %s", @@ -82,20 +118,25 @@ async def get(self) -> Optional[ItemConcreteValue]: # this is a link to another node value = await port_utils.get_value_from_link( # pylint: disable=protected-access - self.key, - self.value, - self.file_to_key_map, - self._node_ports._node_ports_creator_cb, + key=self.key, + value=self.value, + fileToKeyMap=self.file_to_key_map, + node_port_creator=self._node_ports._node_ports_creator_cb, ) elif isinstance(self.value, FileLink): # this is a link from storage value = await port_utils.pull_file_from_store( - self._node_ports.user_id, self.key, self.file_to_key_map, self.value + user_id=self._node_ports.user_id, + key=self.key, + fileToKeyMap=self.file_to_key_map, + value=self.value, ) elif isinstance(self.value, DownloadLink): # this is a downloadable link value = await port_utils.pull_file_from_download_link( - self.key, self.file_to_key_map, self.value + key=self.key, + fileToKeyMap=self.file_to_key_map, + value=self.value, ) else: # this is directly the value @@ -108,7 +149,10 @@ async def get(self) -> Optional[ItemConcreteValue]: async def _set(self, new_value: ItemConcreteValue) -> None: log.debug( - "setting %s[%s] with value %s", self.key, self.property_type, new_value + "setting %s[%s] with concrete value %s", + self.key, + self.property_type, + new_value, ) final_value: Optional[DataItemValue] = None if new_value is not None: @@ -119,10 +163,10 @@ async def _set(self, new_value: ItemConcreteValue) -> None: if not converted_value.exists() or not converted_value.is_file(): raise InvalidItemTypeError(self.property_type, str(new_value)) final_value = await port_utils.push_file_to_store( - converted_value, - self._node_ports.user_id, - self._node_ports.project_id, - self._node_ports.node_uuid, + file=converted_value, + user_id=self._node_ports.user_id, + project_id=self._node_ports.project_id, + node_id=self._node_ports.node_uuid, ) else: final_value = converted_value @@ -134,3 +178,25 @@ async def set(self, new_value: ItemConcreteValue) -> None: """sets a value to the port, by default it is also stored in the database""" await self._set(new_value) await self._node_ports.save_to_db_cb(self._node_ports) + + async def set_value(self, new_value: Optional[ItemValue]) -> None: + """set the value on the port using a value (e.g. link for the file)""" + log.debug( + "setting %s[%s] with value %s", self.key, self.property_type, new_value + ) + final_value: Optional[DataItemValue] = None + if port_utils.is_file_type(self.property_type) and new_value is not None: + if not isinstance(new_value, AnyUrl): + raise InvalidItemTypeError(self.property_type, f"{new_value}") + final_value = await port_utils.get_file_link_from_url( + new_value, + self._node_ports.user_id, + self._node_ports.project_id, + self._node_ports.node_uuid, + ) + else: + final_value = self._py_value_converter(new_value) + + self.value = final_value + self._used_default_value = False + await self._node_ports.save_to_db_cb(self._node_ports) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py index c0ce9e6db86..62567549bfe 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port_utils.py @@ -3,51 +3,101 @@ from pathlib import Path from typing import Any, Callable, Coroutine, Dict, Optional +from pydantic import AnyUrl +from pydantic.tools import parse_obj_as from yarl import URL from ..node_ports_common import config, data_items_utils, filemanager -from .links import DownloadLink, FileLink, ItemConcreteValue, PortLink +from .links import DownloadLink, FileLink, ItemConcreteValue, ItemValue, PortLink log = logging.getLogger(__name__) +async def get_value_link_from_port_link( + value: PortLink, + node_port_creator: Callable[[str], Coroutine[Any, Any, Any]], +) -> Optional[ItemValue]: + log.debug("Getting value link %s", value) + # create a node ports for the other node + other_nodeports = await node_port_creator(value.node_uuid) + # get the port value through that guy + log.debug("Received node from DB %s, now returning value link", other_nodeports) + + other_value: Optional[ItemValue] = await other_nodeports.get_value_link( + value.output + ) + return other_value + + async def get_value_from_link( key: str, value: PortLink, fileToKeyMap: Optional[Dict[str, str]], node_port_creator: Callable[[str], Coroutine[Any, Any, Any]], -) -> ItemConcreteValue: +) -> Optional[ItemConcreteValue]: log.debug("Getting value %s", value) # create a node ports for the other node other_nodeports = await node_port_creator(value.node_uuid) # get the port value through that guy log.debug("Received node from DB %s, now returning value", other_nodeports) - value = await other_nodeports.get(value.output) - if isinstance(value, Path): - file_name = value.name + other_value: Optional[ItemConcreteValue] = await other_nodeports.get(value.output) + if isinstance(other_value, Path): + file_name = other_value.name # move the file to the right final location # if a file alias is present use it if fileToKeyMap: file_name = next(iter(fileToKeyMap)) file_path = data_items_utils.create_file_path(key, file_name) - if value == file_path: + if other_value == file_path: # this is a corner case: in case the output key of the other node has the same name as the input key - return value + return other_value if file_path.exists(): file_path.unlink() file_path.parent.mkdir(parents=True, exist_ok=True) - shutil.move(str(value), str(file_path)) - value = file_path + shutil.move(f"{other_value}", file_path) + other_value = file_path - return value + return other_value + + +async def get_download_link_from_storage( + user_id: int, + value: FileLink, +) -> Optional[AnyUrl]: + log.debug("getting link to file from storage %s", value) + link = await filemanager.get_download_link_from_s3( + user_id=user_id, + store_id=f"{value.store}", + s3_object=value.path, + ) + return parse_obj_as(AnyUrl, f"{link}") if link else None + + +async def get_upload_link_from_storage( + user_id: int, + project_id: str, + node_id: str, + file_name: str, +) -> AnyUrl: + log.debug("getting link to file from storage for %s", file_name) + s3_object = data_items_utils.encode_file_id(Path(file_name), project_id, node_id) + _, link = await filemanager.get_upload_link_from_s3( + user_id=user_id, + store_name=config.STORE, + s3_object=s3_object, + ) + return parse_obj_as(AnyUrl, f"{link}") async def pull_file_from_store( - user_id: int, key: str, fileToKeyMap: Optional[Dict[str, str]], value: FileLink + user_id: int, + key: str, + fileToKeyMap: Optional[Dict[str, str]], + value: FileLink, ) -> Path: - log.debug("Getting value from storage %s", value) + log.debug("pulling file from storage %s", value) # do not make any assumption about s3_path, it is a str containing stuff that can be anything depending on the store local_path = data_items_utils.create_folder_path(key) downloaded_file = await filemanager.download_file_from_s3( @@ -69,7 +119,10 @@ async def pull_file_from_store( async def push_file_to_store( - file: Path, user_id: int, project_id: str, node_id: str + file: Path, + user_id: int, + project_id: str, + node_id: str, ) -> FileLink: log.debug("file path %s will be uploaded to s3", file) s3_object = data_items_utils.encode_file_id(file, project_id, node_id) @@ -84,7 +137,9 @@ async def push_file_to_store( async def pull_file_from_download_link( - key: str, fileToKeyMap: Optional[Dict[str, str]], value: DownloadLink + key: str, + fileToKeyMap: Optional[Dict[str, str]], + value: DownloadLink, ) -> Path: log.debug( "Getting value from download link [%s] with label %s", @@ -112,3 +167,22 @@ async def pull_file_from_download_link( def is_file_type(port_type: str) -> bool: return port_type.startswith("data:") + + +async def get_file_link_from_url( + new_value: AnyUrl, + user_id: int, + project_id: str, + node_id: str, +) -> FileLink: + log.debug("url %s will now be converted to a file link", new_value) + s3_object = data_items_utils.encode_file_id( + Path(new_value.path), project_id, node_id + ) + store_id, e_tag = await filemanager.get_file_metadata( + user_id=user_id, + store_id="0", + s3_object=s3_object, + ) + log.debug("file meta data for %s found, received ETag %s", new_value, e_tag) + return FileLink(store=store_id, path=s3_object, e_tag=e_tag) diff --git a/services/director/src/simcore_service_director/cache_request_decorator.py b/services/director/src/simcore_service_director/cache_request_decorator.py index d4da7644d29..431a7216e90 100644 --- a/services/director/src/simcore_service_director/cache_request_decorator.py +++ b/services/director/src/simcore_service_director/cache_request_decorator.py @@ -2,7 +2,6 @@ from typing import Coroutine, Dict, Tuple from aiohttp import web - from simcore_service_director import config @@ -20,7 +19,7 @@ async def wrapped( resp_data, resp_headers = await func(app, url, method, *args, **kwargs) - if is_cache_enabled: + if is_cache_enabled and not no_cache: cache_data = app[config.APP_REGISTRY_CACHE_DATA_KEY] cache_data[cache_key] = (resp_data, resp_headers) diff --git a/services/director/src/simcore_service_director/main.py b/services/director/src/simcore_service_director/main.py index 51e9314d8a8..0bf6edccc57 100644 --- a/services/director/src/simcore_service_director/main.py +++ b/services/director/src/simcore_service_director/main.py @@ -10,6 +10,8 @@ from simcore_service_director.monitoring import setup_app_monitoring from simcore_service_director.rest import routing +from .registry_proxy import setup_registry + log = logging.getLogger(__name__) @@ -19,6 +21,7 @@ def setup_app() -> web.Application: # NOTE: ensure client session is context is run first, then any further get_client_sesions will be correctly closed app.cleanup_ctx.append(persistent_client_session) + app.cleanup_ctx.append(setup_registry) registry_cache_task.setup(app) diff --git a/services/director/src/simcore_service_director/registry_cache_task.py b/services/director/src/simcore_service_director/registry_cache_task.py index 2545fdbd483..10eca38b2b7 100644 --- a/services/director/src/simcore_service_director/registry_cache_task.py +++ b/services/director/src/simcore_service_director/registry_cache_task.py @@ -1,11 +1,11 @@ import asyncio import logging +from typing import AsyncIterator from aiohttp import web - +from servicelib.utils import logged_gather from simcore_service_director import config, exceptions, registry_proxy from simcore_service_director.config import APP_REGISTRY_CACHE_DATA_KEY -from servicelib.utils import logged_gather _logger = logging.getLogger(__name__) @@ -14,6 +14,7 @@ async def registry_caching_task(app: web.Application) -> None: try: + _logger.info("%s: initializing cache...", TASK_NAME) app[APP_REGISTRY_CACHE_DATA_KEY].clear() await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL) @@ -59,7 +60,7 @@ async def registry_caching_task(app: web.Application) -> None: app[APP_REGISTRY_CACHE_DATA_KEY].clear() -async def setup_registry_caching_task(app: web.Application) -> None: +async def setup_registry_caching_task(app: web.Application) -> AsyncIterator[None]: app[APP_REGISTRY_CACHE_DATA_KEY] = {} app[TASK_NAME] = asyncio.get_event_loop().create_task(registry_caching_task(app)) diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index ce46b42bcd7..b1eb08bb9aa 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -6,11 +6,16 @@ import re from http import HTTPStatus from pprint import pformat -from typing import Any, Dict, List, Tuple +from typing import Any, AsyncIterator, Dict, List, Tuple from aiohttp import BasicAuth, ClientSession, client_exceptions, web +from aiohttp.client import ClientTimeout from simcore_service_director import config, exceptions from simcore_service_director.cache_request_decorator import cache_requests +from tenacity import retry +from tenacity.before_sleep import before_sleep_log +from tenacity.retry import retry_if_result +from tenacity.wait import wait_fixed from yarl import URL from .config import APP_CLIENT_SESSION_KEY @@ -28,13 +33,13 @@ class ServiceType(enum.Enum): - ALL: str = "" - COMPUTATIONAL: str = "comp" - DYNAMIC: str = "dynamic" + ALL = "" + COMPUTATIONAL = "comp" + DYNAMIC = "dynamic" async def _basic_auth_registry_request( - app: web.Application, path: str, method: str + app: web.Application, path: str, method: str, **session_kwargs ) -> Tuple[Dict, Dict]: if not config.REGISTRY_URL: raise exceptions.DirectorException("URL to registry is not defined") @@ -42,6 +47,7 @@ async def _basic_auth_registry_request( url = URL( f"{'https' if config.REGISTRY_SSL else 'http'}://{config.REGISTRY_URL}{path}" ) + logger.debug("Requesting registry using %s", url) # try the registry with basic authentication first, spare 1 call resp_data: Dict = {} resp_headers: Dict = {} @@ -53,12 +59,14 @@ async def _basic_auth_registry_request( session = app[APP_CLIENT_SESSION_KEY] try: - async with session.request(method.lower(), url, auth=auth) as response: + async with session.request( + method.lower(), url, auth=auth, **session_kwargs + ) as response: if response.status == HTTPStatus.UNAUTHORIZED: logger.debug("Registry unauthorized request: %s", await response.text()) # basic mode failed, test with other auth mode resp_data, resp_headers = await _auth_registry_request( - url, method, response.headers, session + url, method, response.headers, session, **session_kwargs ) elif response.status == HTTPStatus.NOT_FOUND: @@ -85,7 +93,7 @@ async def _basic_auth_registry_request( async def _auth_registry_request( - url: URL, method: str, auth_headers: Dict, session: ClientSession + url: URL, method: str, auth_headers: Dict, session: ClientSession, **kwargs ) -> Tuple[Dict, Dict]: if not config.REGISTRY_AUTH or not config.REGISTRY_USER or not config.REGISTRY_PW: raise exceptions.RegistryConnectionError( @@ -114,7 +122,7 @@ async def _auth_registry_request( token_url = URL(auth_details["realm"]).with_query( service=auth_details["service"], scope=auth_details["scope"] ) - async with session.get(token_url, auth=auth) as token_resp: + async with session.get(token_url, auth=auth, **kwargs) as token_resp: if not token_resp.status == HTTPStatus.OK: raise exceptions.RegistryConnectionError( "Unknown error while authentifying with registry: {}".format( @@ -124,7 +132,7 @@ async def _auth_registry_request( bearer_code = (await token_resp.json())["token"] headers = {"Authorization": "Bearer {}".format(bearer_code)} async with getattr(session, method.lower())( - url, headers=headers + url, headers=headers, **kwargs ) as resp_wtoken: if resp_wtoken.status == HTTPStatus.NOT_FOUND: logger.exception("path to registry not found: %s", url) @@ -140,7 +148,9 @@ async def _auth_registry_request( return (resp_data, resp_headers) elif auth_type == "Basic": # basic authentication should not be since we tried already... - async with getattr(session, method.lower())(url, auth=auth) as resp_wbasic: + async with getattr(session, method.lower())( + url, auth=auth, **kwargs + ) as resp_wbasic: if resp_wbasic.status == HTTPStatus.NOT_FOUND: logger.exception("path to registry not found: %s", url) raise exceptions.ServiceNotAvailableError(str(url)) @@ -159,16 +169,49 @@ async def _auth_registry_request( async def registry_request( - app: web.Application, path: str, method: str = "GET", no_cache: bool = False + app: web.Application, + path: str, + method: str = "GET", + no_cache: bool = False, + **session_kwargs, ) -> Tuple[Dict, Dict]: logger.debug( "Request to registry: path=%s, method=%s. no_cache=%s", path, method, no_cache ) return await cache_requests(_basic_auth_registry_request, no_cache)( - app, path, method + app, path, method, **session_kwargs ) +async def is_registry_responsive(app: web.Application) -> bool: + path = "/v2/" + try: + await registry_request( + app, path, no_cache=True, timeout=ClientTimeout(total=1.0) + ) + return True + except (exceptions.DirectorException, asyncio.TimeoutError) as exc: + logger.debug("Registry not responsive: %s", exc) + return False + + +async def setup_registry(app: web.Application) -> AsyncIterator[None]: + logger.debug("pinging registry...") + + @retry( + wait=wait_fixed(2), + before_sleep=before_sleep_log(logger, logging.WARNING), + retry=retry_if_result(lambda result: result == False), + reraise=True, + ) + async def wait_until_registry_responsive(app: web.Application) -> bool: + return await is_registry_responsive(app) + + await wait_until_registry_responsive(app) + logger.info("Connected to docker registry") + yield + + async def _list_repositories(app: web.Application) -> List[str]: logger.debug("listing repositories") # if there are more repos, the Link will be available in the response headers until none available diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index 0592f3aac7e..7e61f591447 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -29,6 +29,7 @@ ) from simcore_service_webserver.login.module_setup import setup_login from simcore_service_webserver.projects.module_setup import setup_projects +from simcore_service_webserver.resource_manager import garbage_collector from simcore_service_webserver.resource_manager.module_setup import ( setup_resource_manager, ) @@ -55,7 +56,7 @@ GARBAGE_COLLECTOR_INTERVAL = 1 SERVICE_DELETION_DELAY = 1 # ensure enough time has passed and GC was triggered -WAIT_FOR_COMPLETE_GC_CYCLE = GARBAGE_COLLECTOR_INTERVAL + SERVICE_DELETION_DELAY + 1 +WAIT_FOR_COMPLETE_GC_CYCLE = GARBAGE_COLLECTOR_INTERVAL + SERVICE_DELETION_DELAY + 2 @pytest.fixture(autouse=True) @@ -347,18 +348,31 @@ async def assert_one_owner_for_project( ################ end utils +@pytest.fixture +def mock_garbage_collector_task(mocker): + """patch the setup of the garbage collector so we can call it manually""" + mocker.patch( + "simcore_service_webserver.resource_manager.module_setup.setup_garbage_collector", + return_value="", + ) + + async def test_t1_while_guest_is_connected_no_resources_are_removed( - client, socketio_client_factory: Callable, aiopg_engine, redis_client + mock_garbage_collector_task, + client, + socketio_client_factory: Callable, + aiopg_engine, + redis_client, ): """while a GUEST user is connected GC will not remove none of its projects nor the user itself""" logged_guest_user = await login_guest_user(client) empty_guest_user_project = await new_project(client, logged_guest_user) - assert await assert_users_count(aiopg_engine, 1) is True assert await assert_projects_count(aiopg_engine, 1) is True await connect_to_socketio(client, logged_guest_user, socketio_client_factory) - await asyncio.sleep(WAIT_FOR_COMPLETE_GC_CYCLE) + await asyncio.sleep(SERVICE_DELETION_DELAY + 1) + await garbage_collector.collect_garbage(app=client.app) assert await assert_user_in_database(aiopg_engine, logged_guest_user) is True assert ( @@ -367,6 +381,7 @@ async def test_t1_while_guest_is_connected_no_resources_are_removed( async def test_t2_cleanup_resources_after_browser_is_closed( + mock_garbage_collector_task, simcore_services, client, socketio_client_factory: Callable, @@ -382,7 +397,8 @@ async def test_t2_cleanup_resources_after_browser_is_closed( sio_connection_data = await connect_to_socketio( client, logged_guest_user, socketio_client_factory ) - await asyncio.sleep(WAIT_FOR_COMPLETE_GC_CYCLE) + await asyncio.sleep(SERVICE_DELETION_DELAY + 1) + await garbage_collector.collect_garbage(app=client.app) # check user and project are still in the DB assert await assert_user_in_database(aiopg_engine, logged_guest_user) is True @@ -391,7 +407,8 @@ async def test_t2_cleanup_resources_after_browser_is_closed( ) await disconnect_user_from_socketio(client, sio_connection_data) - await asyncio.sleep(WAIT_FOR_COMPLETE_GC_CYCLE) + await asyncio.sleep(SERVICE_DELETION_DELAY + 1) + await garbage_collector.collect_garbage(app=client.app) # check user and project are no longer in the DB async with aiopg_engine.acquire() as conn: diff --git a/services/web/server/tests/integration/02/test_computation.py b/services/web/server/tests/integration/02/test_computation.py index 1902bc9ef91..e6b1b98d9d9 100644 --- a/services/web/server/tests/integration/02/test_computation.py +++ b/services/web/server/tests/integration/02/test_computation.py @@ -38,7 +38,8 @@ from simcore_service_webserver.socketio.module_setup import setup_socketio from simcore_service_webserver.users import setup_users from socketio.exceptions import ConnectionError as SocketConnectionError -from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed +from tenacity import retry, retry_if_exception_type, wait_fixed +from tenacity.stop import stop_after_delay from yarl import URL current_dir = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve().parent @@ -160,7 +161,7 @@ def _assert_sleeper_services_completed( @retry( reraise=True, - stop=stop_after_attempt(TIMEOUT_SECONDS / WAIT_TIME), + stop=stop_after_delay(TIMEOUT_SECONDS), wait=wait_fixed(WAIT_TIME), retry=retry_if_exception_type(AssertionError), ) @@ -259,18 +260,13 @@ async def test_start_pipeline( ) if not error: - # starting again should be disallowed + # starting again should be disallowed, since it's already running resp = await client.post(url_start) assert ( resp.status == web.HTTPForbidden.status_code if user_role == UserRole.GUEST else expected.forbidden.status_code ) - # FIXME: to PC: I have an issue here with how the webserver middleware transforms errors (see director_v2.py) - # await assert_status( - # resp, - # web.HTTPForbidden if user_role == UserRole.GUEST else expected.forbidden, - # ) assert "pipeline_id" in data assert data["pipeline_id"] == project_id @@ -282,9 +278,11 @@ async def test_start_pipeline( mock_workbench_adjacency_list, check_outputs=False, ) + # wait for the computation to stop _assert_sleeper_services_completed( project_id, postgres_session, StateType.SUCCESS, mock_workbench_payload ) + # restart the computation resp = await client.post(url_start) data, error = await assert_status( resp, web.HTTPCreated if user_role == UserRole.GUEST else expected.created @@ -302,6 +300,7 @@ async def test_start_pipeline( resp, web.HTTPNoContent if user_role == UserRole.GUEST else expected.no_content ) if not error: + # now wait for it to stop _assert_sleeper_services_completed( project_id, postgres_session, StateType.ABORTED, mock_workbench_payload ) diff --git a/services/web/server/tests/integration/conftest.py b/services/web/server/tests/integration/conftest.py index a9380f403b0..59ed6eb8493 100644 --- a/services/web/server/tests/integration/conftest.py +++ b/services/web/server/tests/integration/conftest.py @@ -18,10 +18,12 @@ from pathlib import Path from pprint import pprint from typing import Dict, List +from unittest import mock import pytest import trafaret_config import yaml +from pytest_mock import MockerFixture from pytest_simcore.helpers import FIXTURE_CONFIG_CORE_SERVICES_SELECTION from pytest_simcore.helpers.utils_docker import get_service_published_port from pytest_simcore.helpers.utils_login import NewUser @@ -167,7 +169,7 @@ def app_config(_webserver_dev_config: Dict, aiohttp_unused_port) -> Dict: @pytest.fixture -def mock_orphaned_services(mocker): +def mock_orphaned_services(mocker: MockerFixture) -> mock.Mock: remove_orphaned_services = mocker.patch( "simcore_service_webserver.resource_manager.garbage_collector.remove_orphaned_services", return_value="",