2
2
import logging
3
3
from contextlib import suppress
4
4
from itertools import chain
5
- from typing import Dict , List , Tuple
5
+ from typing import Dict , List , Optional , Tuple
6
6
7
+ import psycopg2
7
8
from aiohttp import web
8
- from aiopg .sa .result import RowProxy
9
9
from aioredlock import Aioredlock
10
-
11
10
from servicelib .observer import emit
12
11
from servicelib .utils import logged_gather
13
12
from simcore_service_webserver import users_exceptions
43
42
44
43
from .config import (
45
44
APP_CLIENT_REDIS_LOCK_KEY ,
46
- APP_GARBAGE_COLLECTOR_KEY ,
47
45
GUEST_USER_RC_LOCK_FORMAT ,
48
46
get_garbage_collector_interval ,
49
47
)
52
50
logger = logging .getLogger (__name__ )
53
51
54
52
55
- async def setup_garbage_collector_task (app : web .Application ):
56
- loop = asyncio .get_event_loop ()
57
- app [APP_GARBAGE_COLLECTOR_KEY ] = loop .create_task (garbage_collector_task (app ))
58
- yield
59
- task = app [APP_GARBAGE_COLLECTOR_KEY ]
60
- task .cancel ()
61
- await task
53
+ def setup_garbage_collector (app : web .Application ):
54
+ async def _setup_background_task (app : web .Application ):
55
+ # on_startup
56
+ loop = asyncio .get_event_loop ()
57
+ cgp_task = loop .create_task (collect_garbage_periodically (app ))
62
58
59
+ yield
63
60
64
- def setup_garbage_collector (app : web .Application ):
65
- app .cleanup_ctx .append (setup_garbage_collector_task )
61
+ # on_cleanup
62
+ with suppress (asyncio .CancelledError ):
63
+ cgp_task .cancel ()
64
+ await cgp_task
66
65
66
+ app .cleanup_ctx .append (_setup_background_task )
67
67
68
- async def garbage_collector_task (app : web .Application ):
69
- keep_alive = True
70
68
71
- while keep_alive :
69
+ async def collect_garbage_periodically (app : web .Application ):
70
+
71
+ while True :
72
72
logger .info ("Starting garbage collector..." )
73
73
try :
74
74
interval = get_garbage_collector_interval (app )
@@ -77,8 +77,9 @@ async def garbage_collector_task(app: web.Application):
77
77
await asyncio .sleep (interval )
78
78
79
79
except asyncio .CancelledError :
80
- keep_alive = False
81
80
logger .info ("Garbage collection task was cancelled, it will not restart!" )
81
+ # do not catch Cancellation errors
82
+ raise
82
83
83
84
except Exception : # pylint: disable=broad-except
84
85
logger .warning (
@@ -254,12 +255,10 @@ async def remove_disconnected_user_resources(
254
255
resource_name ,
255
256
keys_to_update ,
256
257
)
257
- with suppress (asyncio .CancelledError ):
258
- on_released_tasks = [
259
- registry .remove_resource (key , resource_name )
260
- for key in keys_to_update
261
- ]
262
- await logged_gather (* on_released_tasks , reraise = False )
258
+ on_released_tasks = [
259
+ registry .remove_resource (key , resource_name ) for key in keys_to_update
260
+ ]
261
+ await logged_gather (* on_released_tasks , reraise = False )
263
262
264
263
# NOTE:
265
264
# - if releasing a resource (1) fails, annotations in registry allows GC to try in next round
@@ -357,8 +356,8 @@ async def remove_orphaned_services(
357
356
logger .info ("Will remove service %s" , interactive_service ["service_host" ])
358
357
try :
359
358
await stop_service (app , node_id )
360
- except (ServiceNotFoundError , DirectorException ) as e :
361
- logger .warning ("Error while stopping service: %s" , e )
359
+ except (ServiceNotFoundError , DirectorException ) as err :
360
+ logger .warning ("Error while stopping service: %s" , err )
362
361
363
362
logger .info ("Finished orphaned services removal" )
364
363
@@ -375,11 +374,12 @@ async def remove_guest_user_with_all_its_resources(
375
374
try :
376
375
await remove_all_projects_for_user (app = app , user_id = user_id )
377
376
await remove_user (app = app , user_id = user_id )
378
- except Exception as e : # pylint: disable=broad-except
379
- logger . warning ( "%s" , e )
377
+
378
+ except psycopg2 . DatabaseError :
380
379
logger .warning (
381
380
"Could not remove GUEST with id=%s. Check the logs above for details" ,
382
381
user_id ,
382
+ exc_info = True ,
383
383
)
384
384
385
385
@@ -405,7 +405,7 @@ async def remove_all_projects_for_user(app: web.Application, user_id: int) -> No
405
405
user_id ,
406
406
)
407
407
return
408
- user_primary_gid : str = str (project_owner ["primary_gid" ])
408
+ user_primary_gid = int (project_owner ["primary_gid" ])
409
409
410
410
# fetch all projects for the user
411
411
user_project_uuids = await app [
@@ -472,8 +472,8 @@ async def get_new_project_owner_gid(
472
472
project_uuid : str ,
473
473
user_id : int ,
474
474
user_primary_gid : int ,
475
- project : RowProxy ,
476
- ) -> str :
475
+ project : Dict ,
476
+ ) -> Optional [ int ] :
477
477
"""Goes through the access rights and tries to find a new suitable owner.
478
478
The first viable user is selected as a new owner.
479
479
In order to become a new owner the user must have write access right.
@@ -533,7 +533,7 @@ async def get_new_project_owner_gid(
533
533
534
534
async def fetch_new_project_owner_from_groups (
535
535
app : web .Application , standard_groups : Dict , user_id : int
536
- ) -> int :
536
+ ) -> Optional [ int ] :
537
537
"""Iterate over all the users in a group and if the users exists in the db
538
538
return its gid"""
539
539
@@ -554,6 +554,7 @@ async def fetch_new_project_owner_from_groups(
554
554
"Could not find new owner '%s' will try a new one" ,
555
555
possible_user_id ,
556
556
)
557
+
557
558
return None
558
559
559
560
@@ -562,17 +563,19 @@ async def replace_current_owner(
562
563
project_uuid : str ,
563
564
user_primary_gid : int ,
564
565
new_project_owner_gid : str ,
565
- project : RowProxy ,
566
+ project : Dict ,
566
567
) -> None :
567
568
try :
568
569
new_project_owner_id = await get_user_id_from_gid (
569
570
app = app , primary_gid = int (new_project_owner_gid )
570
571
)
571
- except Exception : # pylint: disable=broad-except
572
+
573
+ except psycopg2 .DatabaseError :
572
574
logger .exception (
573
575
"Could not recover new user id from gid %s" , new_project_owner_gid
574
576
)
575
577
return
578
+
576
579
# the result might me none
577
580
if new_project_owner_id is None :
578
581
logger .warning (
@@ -588,13 +591,14 @@ async def replace_current_owner(
588
591
str (new_project_owner_gid )
589
592
] = ProjectAccessRights .OWNER .value
590
593
logger .error ("Syncing back project %s" , project )
594
+
591
595
# syncing back project data
592
596
try :
593
597
await app [APP_PROJECT_DBAPI ].update_project_without_enforcing_checks (
594
598
project_data = project ,
595
599
project_uuid = project_uuid ,
596
600
)
597
- except Exception : # pylint: disable=broad-except
601
+ except psycopg2 . DatabaseError :
598
602
logger .exception (
599
603
"Could not remove old owner and replaced it with user %s" ,
600
604
new_project_owner_id ,
@@ -605,7 +609,7 @@ async def remove_user(app: web.Application, user_id: int) -> None:
605
609
"""Tries to remove a user, if the users still exists a warning message will be displayed"""
606
610
try :
607
611
await delete_user (app , user_id )
608
- except Exception : # pylint: disable=broad-except
612
+ except psycopg2 . DatabaseError :
609
613
logger .warning (
610
614
"User '%s' still has some projects, could not be deleted" , user_id
611
615
)
0 commit comments