5
5
6
6
from simcore_service_director import config , exceptions , registry_proxy
7
7
from simcore_service_director .config import APP_REGISTRY_CACHE_DATA_KEY
8
+ from servicelib .utils import logged_gather
8
9
9
10
_logger = logging .getLogger (__name__ )
10
11
11
12
TASK_NAME : str = __name__ + "_registry_caching_task"
13
+
14
+
12
15
async def registry_caching_task (app : web .Application ) -> None :
13
16
try :
14
- _logger .info ("%s: initializing..." , TASK_NAME )
17
+ _logger .info ("%s: initializing cache ..." , TASK_NAME )
15
18
app [APP_REGISTRY_CACHE_DATA_KEY ].clear ()
19
+ await registry_proxy .list_services (app , registry_proxy .ServiceType .ALL )
16
20
_logger .info ("%s: initialisation completed" , TASK_NAME )
17
21
while True :
18
- _logger .info ("%s: waking up, cleaning registry cache..." , TASK_NAME )
19
- app [APP_REGISTRY_CACHE_DATA_KEY ].clear ()
20
- _logger .info ("%s: caching services list..." , TASK_NAME )
22
+ _logger .info ("%s: waking up, refreshing cache..." , TASK_NAME )
21
23
try :
22
- await registry_proxy .list_services (app , registry_proxy .ServiceType .ALL )
24
+ keys = []
25
+ refresh_tasks = []
26
+ for key in app [APP_REGISTRY_CACHE_DATA_KEY ]:
27
+ path , method = key .split (":" )
28
+ _logger .debug ("refresh %s:%s" , method , path )
29
+ refresh_tasks .append (
30
+ registry_proxy .registry_request (
31
+ app , path , method , no_cache = True
32
+ )
33
+ )
34
+ keys = list (app [APP_REGISTRY_CACHE_DATA_KEY ].keys ())
35
+ results = await logged_gather (* refresh_tasks )
36
+
37
+ for key , result in zip (keys , results ):
38
+ app [APP_REGISTRY_CACHE_DATA_KEY ][key ] = result
39
+
23
40
except exceptions .DirectorException :
24
41
# if the registry is temporarily not available this might happen
42
+ _logger .exception (
43
+ "%s: exception while refreshing cache, clean cache..." , TASK_NAME
44
+ )
25
45
app [APP_REGISTRY_CACHE_DATA_KEY ].clear ()
26
46
27
- _logger .info ("%s: sleeping for %ss..." , TASK_NAME , config .DIRECTOR_REGISTRY_CACHING_TTL )
47
+ _logger .info (
48
+ "cache refreshed %s: sleeping for %ss..." ,
49
+ TASK_NAME ,
50
+ config .DIRECTOR_REGISTRY_CACHING_TTL ,
51
+ )
28
52
await asyncio .sleep (config .DIRECTOR_REGISTRY_CACHING_TTL )
29
53
except asyncio .CancelledError :
30
54
_logger .info ("%s: cancelling task..." , TASK_NAME )
31
- except Exception : # pylint: disable=broad-except
32
- _logger .exception ("%s: exception while retrieving list of services in cache" , TASK_NAME )
55
+ except Exception : # pylint: disable=broad-except
56
+ _logger .exception ("%s: Unhandled exception while refreshing cache" , TASK_NAME )
33
57
finally :
34
58
_logger .info ("%s: finished task...clearing cache..." , TASK_NAME )
35
59
app [APP_REGISTRY_CACHE_DATA_KEY ].clear ()
36
60
61
+
37
62
async def setup_registry_caching_task (app : web .Application ) -> None :
38
63
app [APP_REGISTRY_CACHE_DATA_KEY ] = {}
39
64
app [TASK_NAME ] = asyncio .get_event_loop ().create_task (registry_caching_task (app ))
@@ -44,13 +69,10 @@ async def setup_registry_caching_task(app: web.Application) -> None:
44
69
task .cancel ()
45
70
await task
46
71
72
+
47
73
def setup (app : web .Application ) -> None :
48
74
if config .DIRECTOR_REGISTRY_CACHING :
49
75
app .cleanup_ctx .append (setup_registry_caching_task )
50
-
51
76
52
77
53
- __all__ = [
54
- "setup" ,
55
- "APP_REGISTRY_CACHE_DATA_KEY"
56
- ]
78
+ __all__ = ["setup" , "APP_REGISTRY_CACHE_DATA_KEY" ]
0 commit comments