diff --git a/services/director/src/simcore_service_director/api/v0/schemas/project-v0.0.1.json b/services/director/src/simcore_service_director/api/v0/schemas/project-v0.0.1.json index 253d6ce7c47..d40e6ceb1a4 100644 --- a/services/director/src/simcore_service_director/api/v0/schemas/project-v0.0.1.json +++ b/services/director/src/simcore_service_director/api/v0/schemas/project-v0.0.1.json @@ -41,7 +41,8 @@ }, "prjOwner": { "type": "string", - "description": "user uuid" + "format": "idn-email", + "description": "user email" }, "accessRights": { "type": "object", @@ -106,7 +107,7 @@ "type": "string" }, "progress": { - "type":"number", + "type": "number", "maximum": 100, "minimum": 0, "description": "the node progress value" @@ -158,7 +159,10 @@ ], "properties": { "store": { - "type": ["string", "integer"] + "type": [ + "string", + "integer" + ] }, "dataset": { "type": "string" @@ -181,7 +185,11 @@ "patternProperties": { "^[-_a-zA-Z0-9]+$": { "type": "string", - "enum": ["Invisible", "ReadOnly", "ReadAndWrite"], + "enum": [ + "Invisible", + "ReadOnly", + "ReadAndWrite" + ], "default": "ReadAndWrite", "examples": [ "ReadOnly" @@ -224,7 +232,10 @@ ], "properties": { "store": { - "type": ["string", "integer"] + "type": [ + "string", + "integer" + ] }, "dataset": { "type": "string" @@ -258,7 +269,10 @@ ] }, "parent": { - "type": ["string", "null"], + "type": [ + "string", + "null" + ], "pattern": "^\\S+$", "description": "Parent's (group-nodes') node ID s.", "examples": [ @@ -293,4 +307,4 @@ } } } -} +} \ No newline at end of file diff --git a/services/director/src/simcore_service_director/cache_request_decorator.py b/services/director/src/simcore_service_director/cache_request_decorator.py index 5ca18fcb17e..d4da7644d29 100644 --- a/services/director/src/simcore_service_director/cache_request_decorator.py +++ b/services/director/src/simcore_service_director/cache_request_decorator.py @@ -1,30 +1,32 @@ from functools import wraps -from typing import Callable, Dict, Tuple +from typing import Coroutine, Dict, Tuple from aiohttp import web + from simcore_service_director import config -from yarl import URL -def cache_requests(http_request: Callable): - @wraps(http_request) - async def wrapped(app: web.Application, url: URL, method: str ="GET") -> Tuple[Dict, Dict]: +def cache_requests(func: Coroutine, no_cache: bool = False): + @wraps(func) + async def wrapped( + app: web.Application, url: str, method: str, *args, **kwargs + ) -> Tuple[Dict, Dict]: is_cache_enabled = config.DIRECTOR_REGISTRY_CACHING and method == "GET" - if is_cache_enabled: + cache_key = f"{url}:{method}" + if is_cache_enabled and not no_cache: cache_data = app[config.APP_REGISTRY_CACHE_DATA_KEY] - cache_key = "{}_{}".format(url, method) if cache_key in cache_data: return cache_data[cache_key] - resp_data, resp_headers = await http_request(app, url, method) + resp_data, resp_headers = await func(app, url, method, *args, **kwargs) if is_cache_enabled: + cache_data = app[config.APP_REGISTRY_CACHE_DATA_KEY] cache_data[cache_key] = (resp_data, resp_headers) return (resp_data, resp_headers) return wrapped -__all__ = [ - "cache_requests" -] + +__all__ = ["cache_requests"] diff --git a/services/director/src/simcore_service_director/registry_cache_task.py b/services/director/src/simcore_service_director/registry_cache_task.py index e5e25d9018c..2545fdbd483 100644 --- a/services/director/src/simcore_service_director/registry_cache_task.py +++ b/services/director/src/simcore_service_director/registry_cache_task.py @@ -5,35 +5,60 @@ from simcore_service_director import config, exceptions, registry_proxy from simcore_service_director.config import APP_REGISTRY_CACHE_DATA_KEY +from servicelib.utils import logged_gather _logger = logging.getLogger(__name__) TASK_NAME: str = __name__ + "_registry_caching_task" + + async def registry_caching_task(app: web.Application) -> None: try: - _logger.info("%s: initializing...", TASK_NAME) + _logger.info("%s: initializing cache...", TASK_NAME) app[APP_REGISTRY_CACHE_DATA_KEY].clear() + await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL) _logger.info("%s: initialisation completed", TASK_NAME) while True: - _logger.info("%s: waking up, cleaning registry cache...", TASK_NAME) - app[APP_REGISTRY_CACHE_DATA_KEY].clear() - _logger.info("%s: caching services list...", TASK_NAME) + _logger.info("%s: waking up, refreshing cache...", TASK_NAME) try: - await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL) + keys = [] + refresh_tasks = [] + for key in app[APP_REGISTRY_CACHE_DATA_KEY]: + path, method = key.split(":") + _logger.debug("refresh %s:%s", method, path) + refresh_tasks.append( + registry_proxy.registry_request( + app, path, method, no_cache=True + ) + ) + keys = list(app[APP_REGISTRY_CACHE_DATA_KEY].keys()) + results = await logged_gather(*refresh_tasks) + + for key, result in zip(keys, results): + app[APP_REGISTRY_CACHE_DATA_KEY][key] = result + except exceptions.DirectorException: # if the registry is temporarily not available this might happen + _logger.exception( + "%s: exception while refreshing cache, clean cache...", TASK_NAME + ) app[APP_REGISTRY_CACHE_DATA_KEY].clear() - _logger.info("%s: sleeping for %ss...", TASK_NAME, config.DIRECTOR_REGISTRY_CACHING_TTL) + _logger.info( + "cache refreshed %s: sleeping for %ss...", + TASK_NAME, + config.DIRECTOR_REGISTRY_CACHING_TTL, + ) await asyncio.sleep(config.DIRECTOR_REGISTRY_CACHING_TTL) except asyncio.CancelledError: _logger.info("%s: cancelling task...", TASK_NAME) - except Exception: #pylint: disable=broad-except - _logger.exception("%s: exception while retrieving list of services in cache", TASK_NAME) + except Exception: # pylint: disable=broad-except + _logger.exception("%s: Unhandled exception while refreshing cache", TASK_NAME) finally: _logger.info("%s: finished task...clearing cache...", TASK_NAME) app[APP_REGISTRY_CACHE_DATA_KEY].clear() + async def setup_registry_caching_task(app: web.Application) -> None: app[APP_REGISTRY_CACHE_DATA_KEY] = {} app[TASK_NAME] = asyncio.get_event_loop().create_task(registry_caching_task(app)) @@ -44,13 +69,10 @@ async def setup_registry_caching_task(app: web.Application) -> None: task.cancel() await task + def setup(app: web.Application) -> None: if config.DIRECTOR_REGISTRY_CACHING: app.cleanup_ctx.append(setup_registry_caching_task) - -__all__ = [ - "setup", - "APP_REGISTRY_CACHE_DATA_KEY" -] +__all__ = ["setup", "APP_REGISTRY_CACHE_DATA_KEY"] diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py index 746014c80fd..b01e18e4afa 100644 --- a/services/director/src/simcore_service_director/registry_proxy.py +++ b/services/director/src/simcore_service_director/registry_proxy.py @@ -1,4 +1,4 @@ -#pylint: disable=C0111 +# pylint: disable=C0111 import asyncio import enum import json @@ -12,142 +12,195 @@ from .config import APP_CLIENT_SESSION_KEY -DEPENDENCIES_LABEL_KEY: str = 'simcore.service.dependencies' +DEPENDENCIES_LABEL_KEY: str = "simcore.service.dependencies" NUMBER_OF_RETRIEVED_REPOS: int = 50 NUMBER_OF_RETRIEVED_TAGS: int = 50 _logger = logging.getLogger(__name__) + class ServiceType(enum.Enum): ALL: str = "" COMPUTATIONAL: str = "comp" DYNAMIC: str = "dynamic" -async def _auth_registry_request(url: URL, method: str, auth_headers: Dict, session: ClientSession) -> Tuple[Dict, Dict]: + +async def _basic_auth_registry_request( + app: web.Application, path: str, method: str +) -> Tuple[Dict, Dict]: + if not config.REGISTRY_URL: + raise exceptions.DirectorException("URL to registry is not defined") + + url = URL( + f"{'https' if config.REGISTRY_SSL else 'http'}://{config.REGISTRY_URL}{path}" + ) + # try the registry with basic authentication first, spare 1 call + resp_data: Dict = {} + resp_headers: Dict = {} + auth = ( + BasicAuth(login=config.REGISTRY_USER, password=config.REGISTRY_PW) + if config.REGISTRY_AUTH and config.REGISTRY_USER and config.REGISTRY_PW + else None + ) + + session = app[APP_CLIENT_SESSION_KEY] + try: + async with getattr(session, method.lower())(url, auth=auth) as response: + if response.status == 404: + _logger.exception("path to registry not found: %s", url) + raise exceptions.ServiceNotAvailableError(str(path)) + if response.status == 401: + # basic mode failed, test with other auth mode + resp_data, resp_headers = await _auth_registry_request( + url, method, response.headers, session + ) + elif response.status > 399: + _logger.exception( + "Unknown error while accessing registry: %s", str(response) + ) + raise exceptions.RegistryConnectionError(str(response)) + else: + # registry that does not need an auth + resp_data = await response.json(content_type=None) + resp_headers = response.headers + + return (resp_data, resp_headers) + except client_exceptions.ClientError as exc: + _logger.exception("Unknown error while accessing registry: %s", str(exc)) + raise exceptions.DirectorException( + f"Unknown error while accessing registry: {str(exc)}" + ) + + +async def _auth_registry_request( + url: URL, method: str, auth_headers: Dict, session: ClientSession +) -> Tuple[Dict, Dict]: if not config.REGISTRY_AUTH or not config.REGISTRY_USER or not config.REGISTRY_PW: - raise exceptions.RegistryConnectionError("Wrong configuration: Authentication to registry is needed!") + raise exceptions.RegistryConnectionError( + "Wrong configuration: Authentication to registry is needed!" + ) # auth issue let's try some authentication get the auth type auth_type = None auth_details: Dict[str, str] = {} for key in auth_headers: if str(key).lower() == "www-authenticate": auth_type, auth_value = str(auth_headers[key]).split(" ", 1) - auth_details = {x.split("=")[0]:x.split("=")[1].strip('"') for x in auth_value.split(",")} + auth_details = { + x.split("=")[0]: x.split("=")[1].strip('"') + for x in auth_value.split(",") + } break if not auth_type: - raise exceptions.RegistryConnectionError("Unknown registry type: cannot deduce authentication method!") + raise exceptions.RegistryConnectionError( + "Unknown registry type: cannot deduce authentication method!" + ) auth = BasicAuth(login=config.REGISTRY_USER, password=config.REGISTRY_PW) # bearer type, it needs a token with all communications if auth_type == "Bearer": # get the token - token_url = URL(auth_details["realm"]).with_query(service=auth_details["service"], scope=auth_details["scope"]) + token_url = URL(auth_details["realm"]).with_query( + service=auth_details["service"], scope=auth_details["scope"] + ) async with session.get(token_url, auth=auth) as token_resp: if not token_resp.status == 200: - raise exceptions.RegistryConnectionError("Unknown error while authentifying with registry: {}".format(str(token_resp))) + raise exceptions.RegistryConnectionError( + "Unknown error while authentifying with registry: {}".format( + str(token_resp) + ) + ) bearer_code = (await token_resp.json())["token"] headers = {"Authorization": "Bearer {}".format(bearer_code)} - async with getattr(session, method.lower())(url, headers=headers) as resp_wtoken: + async with getattr(session, method.lower())( + url, headers=headers + ) as resp_wtoken: if resp_wtoken.status == 404: _logger.exception("path to registry not found: %s", url) raise exceptions.ServiceNotAvailableError(str(url)) if resp_wtoken.status > 399: - _logger.exception("Unknown error while accessing with token authorized registry: %s", str(resp_wtoken)) + _logger.exception( + "Unknown error while accessing with token authorized registry: %s", + str(resp_wtoken), + ) raise exceptions.RegistryConnectionError(str(resp_wtoken)) resp_data = await resp_wtoken.json(content_type=None) resp_headers = resp_wtoken.headers return (resp_data, resp_headers) elif auth_type == "Basic": - # basic authentication + # basic authentication should not be since we tried already... async with getattr(session, method.lower())(url, auth=auth) as resp_wbasic: if resp_wbasic.status == 404: _logger.exception("path to registry not found: %s", url) raise exceptions.ServiceNotAvailableError(str(url)) if resp_wbasic.status > 399: - _logger.exception("Unknown error while accessing with token authorized registry: %s", str(resp_wbasic)) + _logger.exception( + "Unknown error while accessing with token authorized registry: %s", + str(resp_wbasic), + ) raise exceptions.RegistryConnectionError(str(resp_wbasic)) resp_data = await resp_wbasic.json(content_type=None) resp_headers = resp_wbasic.headers return (resp_data, resp_headers) - raise exceptions.RegistryConnectionError("Unknown registry authentification type: {}".format(url)) - - -@cache_requests -async def _registry_request(app: web.Application, path: URL, method: str ="GET") -> Tuple[Dict, Dict]: - if not config.REGISTRY_URL: - raise exceptions.DirectorException("URL to registry is not defined") + raise exceptions.RegistryConnectionError( + f"Unknown registry authentification type: {url}" + ) - url = URL("{scheme}://{url}".format(scheme="https" if config.REGISTRY_SSL else "http", - url=config.REGISTRY_URL)) - url = url.join(path) - # try the registry with basic authentication first, spare 1 call - resp_data: Dict = {} - resp_headers: Dict = {} - auth = BasicAuth(login=config.REGISTRY_USER, password=config.REGISTRY_PW) \ - if config.REGISTRY_AUTH and config.REGISTRY_USER and config.REGISTRY_PW \ - else None - session = app[APP_CLIENT_SESSION_KEY] - try: - async with getattr(session, method.lower())(url, auth=auth) as response: - if response.status == 404: - _logger.exception("path to registry not found: %s", url) - raise exceptions.ServiceNotAvailableError(str(path)) - if response.status == 401: - resp_data, resp_headers = await _auth_registry_request(url, method, response.headers, session) - elif response.status > 399: - _logger.exception("Unknown error while accessing registry: %s", str(response)) - raise exceptions.RegistryConnectionError(str(response)) - else: - # registry that does not need an auth - resp_data = await response.json(content_type=None) - resp_headers = response.headers +async def registry_request( + app: web.Application, path: str, method: str = "GET", no_cache: bool = False +) -> Tuple[Dict, Dict]: + return await cache_requests(_basic_auth_registry_request, no_cache)( + app, path, method + ) - return (resp_data, resp_headers) - except client_exceptions.ClientError as exc: - _logger.exception("Unknown error while accessing registry: %s", str(exc)) - raise exceptions.RegistryConnectionError(f"Unknown error while accessing registry: {str(exc)}") async def _list_repositories(app: web.Application) -> List[str]: _logger.debug("listing repositories") # if there are more repos, the Link will be available in the response headers until none available - path = URL("v2/_catalog?n={}".format(NUMBER_OF_RETRIEVED_REPOS)) + path = f"/v2/_catalog?n={NUMBER_OF_RETRIEVED_REPOS}" repos_list: List = [] while True: - result, headers = await _registry_request(app, path) + result, headers = await registry_request(app, path) if result["repositories"]: repos_list.extend(result["repositories"]) if "Link" not in headers: break - path = URL(str(headers["Link"]).split(";")[0].strip("<>")) + path = str(headers["Link"]).split(";")[0].strip("<>") _logger.debug("listed %s repositories", len(repos_list)) return repos_list + async def list_image_tags(app: web.Application, image_key: str) -> List[str]: _logger.debug("listing image tags in %s", image_key) image_tags: List = [] # get list of image tags - path = URL("v2/{}/tags/list?n={}".format(image_key, NUMBER_OF_RETRIEVED_TAGS)) + path = f"/v2/{image_key}/tags/list?n={NUMBER_OF_RETRIEVED_TAGS}" while True: - tags, headers = await _registry_request(app, path) + tags, headers = await registry_request(app, path) if tags["tags"]: image_tags.extend(tags["tags"]) if "Link" not in headers: break - path = URL(str(headers["Link"]).split(";")[0].strip("<>")) + path = str(headers["Link"]).split(";")[0].strip("<>") _logger.debug("Found %s image tags in %s", len(image_tags), image_key) return image_tags + async def get_image_labels(app: web.Application, image: str, tag: str) -> Dict: _logger.debug("getting image labels of %s:%s", image, tag) - path = URL("v2/{}/manifests/{}".format(image, tag)) - request_result, _ = await _registry_request(app, path) - labels = json.loads(request_result["history"][0]["v1Compatibility"])["container_config"]["Labels"] + path = f"/v2/{image}/manifests/{tag}" + request_result, _ = await registry_request(app, path) + labels = json.loads(request_result["history"][0]["v1Compatibility"])[ + "container_config" + ]["Labels"] _logger.debug("retrieved labels of image %s:%s: %s", image, tag, request_result) return labels -async def get_image_details(app: web.Application, image_key: str, image_tag: str) -> Dict: + +async def get_image_details( + app: web.Application, image_key: str, image_tag: str +) -> Dict: image_tags: Dict = {} labels = await get_image_labels(app, image_key, image_tag) if not labels: @@ -160,12 +213,17 @@ async def get_image_details(app: web.Application, image_key: str, image_tag: str for label_key in label_data.keys(): image_tags[label_key] = label_data[label_key] except json.decoder.JSONDecodeError: - logging.exception("Error while decoding json formatted data from %s:%s", image_key, image_tag) + logging.exception( + "Error while decoding json formatted data from %s:%s", + image_key, + image_tag, + ) # silently skip this repo return {} return image_tags + async def get_repo_details(app: web.Application, image_key: str) -> List[Dict]: repo_details = [] image_tags = await list_image_tags(app, image_key) @@ -176,6 +234,7 @@ async def get_repo_details(app: web.Application, image_key: str) -> List[Dict]: repo_details.append(image_details) return repo_details + async def list_services(app: web.Application, service_type: ServiceType) -> List[Dict]: _logger.debug("getting list of services") repos = await _list_repositories(app) @@ -199,40 +258,59 @@ async def list_services(app: web.Application, service_type: ServiceType) -> List _logger.error("Exception occured while listing services %s", repo_details) return services -async def list_interactive_service_dependencies(app: web.Application, service_key: str, service_tag: str) -> List[Dict]: + +async def list_interactive_service_dependencies( + app: web.Application, service_key: str, service_tag: str +) -> List[Dict]: image_labels = await get_image_labels(app, service_key, service_tag) dependency_keys = [] if DEPENDENCIES_LABEL_KEY in image_labels: try: dependencies = json.loads(image_labels[DEPENDENCIES_LABEL_KEY]) for dependency in dependencies: - dependency_keys.append({"key":dependency['key'], "tag":dependency['tag']}) + dependency_keys.append( + {"key": dependency["key"], "tag": dependency["tag"]} + ) except json.decoder.JSONDecodeError: - logging.exception("Incorrect json formatting in %s, skipping...", image_labels[DEPENDENCIES_LABEL_KEY]) + logging.exception( + "Incorrect json formatting in %s, skipping...", + image_labels[DEPENDENCIES_LABEL_KEY], + ) return dependency_keys + def _get_prefix(service_type: ServiceType) -> str: return "{}/{}/".format(config.SIMCORE_SERVICES_PREFIX, service_type.value) + def get_service_first_name(image_key: str) -> str: if str(image_key).startswith(_get_prefix(ServiceType.DYNAMIC)): - service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.DYNAMIC)):] + service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.DYNAMIC)) :] elif str(image_key).startswith(_get_prefix(ServiceType.COMPUTATIONAL)): - service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.COMPUTATIONAL)):] + service_name_suffixes = str(image_key)[ + len(_get_prefix(ServiceType.COMPUTATIONAL)) : + ] else: return "invalid service" - _logger.debug("retrieved service name from repo %s : %s", image_key, service_name_suffixes) - return service_name_suffixes.split('/')[0] + _logger.debug( + "retrieved service name from repo %s : %s", image_key, service_name_suffixes + ) + return service_name_suffixes.split("/")[0] + def get_service_last_names(image_key: str) -> str: if str(image_key).startswith(_get_prefix(ServiceType.DYNAMIC)): - service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.DYNAMIC)):] + service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.DYNAMIC)) :] elif str(image_key).startswith(_get_prefix(ServiceType.COMPUTATIONAL)): - service_name_suffixes = str(image_key)[len(_get_prefix(ServiceType.COMPUTATIONAL)):] + service_name_suffixes = str(image_key)[ + len(_get_prefix(ServiceType.COMPUTATIONAL)) : + ] else: return "invalid service" service_last_name = str(service_name_suffixes).replace("/", "_") - _logger.debug("retrieved service last name from repo %s : %s", image_key, service_last_name) + _logger.debug( + "retrieved service last name from repo %s : %s", image_key, service_last_name + ) return service_last_name diff --git a/services/director/tests/test_registry_cache_task.py b/services/director/tests/test_registry_cache_task.py index ced8f50e479..e14db5fb914 100644 --- a/services/director/tests/test_registry_cache_task.py +++ b/services/director/tests/test_registry_cache_task.py @@ -26,6 +26,7 @@ def client( yield loop.run_until_complete(aiohttp_client(app, server_kwargs=server_kwargs)) + async def test_registry_caching_task(loop, client, push_services): app = client.app assert app @@ -46,7 +47,7 @@ async def test_registry_caching_task(loop, client, push_services): number_of_computational_services=1, number_of_interactive_services=1 ) # the services shall be updated - await sleep(config.DIRECTOR_REGISTRY_CACHING_TTL) + await sleep(config.DIRECTOR_REGISTRY_CACHING_TTL * 1.1) # NOTE: this can take some time. Sleep increased by 10%. list_of_services = await registry_proxy.list_services( app, registry_proxy.ServiceType.ALL )