Skip to content

director: renew cache in the background instead of clearing cache #1496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
},
"prjOwner": {
"type": "string",
"description": "user uuid"
"format": "idn-email",
"description": "user email"
},
"accessRights": {
"type": "object",
Expand Down Expand Up @@ -106,7 +107,7 @@
"type": "string"
},
"progress": {
"type":"number",
"type": "number",
"maximum": 100,
"minimum": 0,
"description": "the node progress value"
Expand Down Expand Up @@ -158,7 +159,10 @@
],
"properties": {
"store": {
"type": ["string", "integer"]
"type": [
"string",
"integer"
]
},
"dataset": {
"type": "string"
Expand All @@ -181,7 +185,11 @@
"patternProperties": {
"^[-_a-zA-Z0-9]+$": {
"type": "string",
"enum": ["Invisible", "ReadOnly", "ReadAndWrite"],
"enum": [
"Invisible",
"ReadOnly",
"ReadAndWrite"
],
"default": "ReadAndWrite",
"examples": [
"ReadOnly"
Expand Down Expand Up @@ -224,7 +232,10 @@
],
"properties": {
"store": {
"type": ["string", "integer"]
"type": [
"string",
"integer"
]
},
"dataset": {
"type": "string"
Expand Down Expand Up @@ -258,7 +269,10 @@
]
},
"parent": {
"type": ["string", "null"],
"type": [
"string",
"null"
],
"pattern": "^\\S+$",
"description": "Parent's (group-nodes') node ID s.",
"examples": [
Expand Down Expand Up @@ -293,4 +307,4 @@
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
from functools import wraps
from typing import Callable, Dict, Tuple
from typing import Coroutine, Dict, Tuple

from aiohttp import web

from simcore_service_director import config
from yarl import URL


def cache_requests(http_request: Callable):
@wraps(http_request)
async def wrapped(app: web.Application, url: URL, method: str ="GET") -> Tuple[Dict, Dict]:
def cache_requests(func: Coroutine, no_cache: bool = False):
@wraps(func)
async def wrapped(
app: web.Application, url: str, method: str, *args, **kwargs
) -> Tuple[Dict, Dict]:
is_cache_enabled = config.DIRECTOR_REGISTRY_CACHING and method == "GET"
if is_cache_enabled:
cache_key = f"{url}:{method}"
if is_cache_enabled and not no_cache:
cache_data = app[config.APP_REGISTRY_CACHE_DATA_KEY]
cache_key = "{}_{}".format(url, method)
if cache_key in cache_data:
return cache_data[cache_key]

resp_data, resp_headers = await http_request(app, url, method)
resp_data, resp_headers = await func(app, url, method, *args, **kwargs)

if is_cache_enabled:
cache_data = app[config.APP_REGISTRY_CACHE_DATA_KEY]
cache_data[cache_key] = (resp_data, resp_headers)

return (resp_data, resp_headers)

return wrapped

__all__ = [
"cache_requests"
]

__all__ = ["cache_requests"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,60 @@

from simcore_service_director import config, exceptions, registry_proxy
from simcore_service_director.config import APP_REGISTRY_CACHE_DATA_KEY
from servicelib.utils import logged_gather

_logger = logging.getLogger(__name__)

TASK_NAME: str = __name__ + "_registry_caching_task"


async def registry_caching_task(app: web.Application) -> None:
try:
_logger.info("%s: initializing...", TASK_NAME)
_logger.info("%s: initializing cache...", TASK_NAME)
app[APP_REGISTRY_CACHE_DATA_KEY].clear()
await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL)
_logger.info("%s: initialisation completed", TASK_NAME)
while True:
_logger.info("%s: waking up, cleaning registry cache...", TASK_NAME)
app[APP_REGISTRY_CACHE_DATA_KEY].clear()
_logger.info("%s: caching services list...", TASK_NAME)
_logger.info("%s: waking up, refreshing cache...", TASK_NAME)
try:
await registry_proxy.list_services(app, registry_proxy.ServiceType.ALL)
keys = []
refresh_tasks = []
for key in app[APP_REGISTRY_CACHE_DATA_KEY]:
path, method = key.split(":")
_logger.debug("refresh %s:%s", method, path)
refresh_tasks.append(
registry_proxy.registry_request(
app, path, method, no_cache=True
)
)
keys = list(app[APP_REGISTRY_CACHE_DATA_KEY].keys())
results = await logged_gather(*refresh_tasks)

for key, result in zip(keys, results):
app[APP_REGISTRY_CACHE_DATA_KEY][key] = result

except exceptions.DirectorException:
# if the registry is temporarily not available this might happen
_logger.exception(
"%s: exception while refreshing cache, clean cache...", TASK_NAME
)
app[APP_REGISTRY_CACHE_DATA_KEY].clear()

_logger.info("%s: sleeping for %ss...", TASK_NAME, config.DIRECTOR_REGISTRY_CACHING_TTL)
_logger.info(
"cache refreshed %s: sleeping for %ss...",
TASK_NAME,
config.DIRECTOR_REGISTRY_CACHING_TTL,
)
await asyncio.sleep(config.DIRECTOR_REGISTRY_CACHING_TTL)
except asyncio.CancelledError:
_logger.info("%s: cancelling task...", TASK_NAME)
except Exception: #pylint: disable=broad-except
_logger.exception("%s: exception while retrieving list of services in cache", TASK_NAME)
except Exception: # pylint: disable=broad-except
_logger.exception("%s: Unhandled exception while refreshing cache", TASK_NAME)
finally:
_logger.info("%s: finished task...clearing cache...", TASK_NAME)
app[APP_REGISTRY_CACHE_DATA_KEY].clear()


async def setup_registry_caching_task(app: web.Application) -> None:
app[APP_REGISTRY_CACHE_DATA_KEY] = {}
app[TASK_NAME] = asyncio.get_event_loop().create_task(registry_caching_task(app))
Expand All @@ -44,13 +69,10 @@ async def setup_registry_caching_task(app: web.Application) -> None:
task.cancel()
await task


def setup(app: web.Application) -> None:
if config.DIRECTOR_REGISTRY_CACHING:
app.cleanup_ctx.append(setup_registry_caching_task)



__all__ = [
"setup",
"APP_REGISTRY_CACHE_DATA_KEY"
]
__all__ = ["setup", "APP_REGISTRY_CACHE_DATA_KEY"]
Loading