2
2
import logging
3
3
from contextlib import suppress
4
4
from itertools import chain
5
- from typing import Dict , List , Tuple
5
+ from typing import Dict , List , Optional , Tuple
6
6
7
+ import psycopg2
7
8
from aiohttp import web
8
- from aiopg .sa .result import RowProxy
9
9
from aioredlock import Aioredlock
10
10
from servicelib .observer import emit
11
11
from servicelib .utils import logged_gather
42
42
43
43
from .config import (
44
44
APP_CLIENT_REDIS_LOCK_KEY ,
45
- APP_GARBAGE_COLLECTOR_KEY ,
46
45
GUEST_USER_RC_LOCK_FORMAT ,
47
46
get_garbage_collector_interval ,
48
47
)
51
50
logger = logging .getLogger (__name__ )
52
51
53
52
54
- async def setup_garbage_collector_task (app : web .Application ):
55
- loop = asyncio .get_event_loop ()
56
- app [APP_GARBAGE_COLLECTOR_KEY ] = loop .create_task (garbage_collector_task (app ))
57
- yield
58
- task = app [APP_GARBAGE_COLLECTOR_KEY ]
59
- task .cancel ()
60
- await task
53
+ def setup_garbage_collector (app : web .Application ):
54
+ async def _setup_background_task (app : web .Application ):
55
+ # on_startup
56
+ loop = asyncio .get_event_loop ()
57
+ cgp_task = loop .create_task (collect_garbage_periodically (app ))
61
58
59
+ yield
62
60
63
- def setup_garbage_collector (app : web .Application ):
64
- app .cleanup_ctx .append (setup_garbage_collector_task )
61
+ # on_cleanup
62
+ with suppress (asyncio .CancelledError ):
63
+ cgp_task .cancel ()
64
+ await cgp_task
65
+
66
+ app .cleanup_ctx .append (_setup_background_task )
65
67
66
68
67
- async def garbage_collector_task (app : web .Application ):
68
- keep_alive = True
69
+ async def collect_garbage_periodically (app : web .Application ):
69
70
70
- while keep_alive :
71
+ while True :
71
72
logger .info ("Starting garbage collector..." )
72
73
try :
73
74
interval = get_garbage_collector_interval (app )
@@ -76,8 +77,9 @@ async def garbage_collector_task(app: web.Application):
76
77
await asyncio .sleep (interval )
77
78
78
79
except asyncio .CancelledError :
79
- keep_alive = False
80
80
logger .info ("Garbage collection task was cancelled, it will not restart!" )
81
+ # do not catch Cancellation errors
82
+ raise
81
83
82
84
except Exception : # pylint: disable=broad-except
83
85
logger .warning (
@@ -253,12 +255,10 @@ async def remove_disconnected_user_resources(
253
255
resource_name ,
254
256
keys_to_update ,
255
257
)
256
- with suppress (asyncio .CancelledError ):
257
- on_released_tasks = [
258
- registry .remove_resource (key , resource_name )
259
- for key in keys_to_update
260
- ]
261
- await logged_gather (* on_released_tasks , reraise = False )
258
+ on_released_tasks = [
259
+ registry .remove_resource (key , resource_name ) for key in keys_to_update
260
+ ]
261
+ await logged_gather (* on_released_tasks , reraise = False )
262
262
263
263
# NOTE:
264
264
# - if releasing a resource (1) fails, annotations in registry allows GC to try in next round
@@ -373,8 +373,8 @@ async def remove_orphaned_services(
373
373
logger .info ("Will remove service %s" , service_host )
374
374
try :
375
375
await stop_service (app , node_id )
376
- except (ServiceNotFoundError , DirectorException ) as e :
377
- logger .warning ("Error while stopping service: %s" , e )
376
+ except (ServiceNotFoundError , DirectorException ) as err :
377
+ logger .warning ("Error while stopping service: %s" , err )
378
378
379
379
logger .info ("Finished orphaned services removal" )
380
380
@@ -391,11 +391,12 @@ async def remove_guest_user_with_all_its_resources(
391
391
try :
392
392
await remove_all_projects_for_user (app = app , user_id = user_id )
393
393
await remove_user (app = app , user_id = user_id )
394
- except Exception as e : # pylint: disable=broad-except
395
- logger . warning ( "%s" , e )
394
+
395
+ except psycopg2 . DatabaseError :
396
396
logger .warning (
397
397
"Could not remove GUEST with id=%s. Check the logs above for details" ,
398
398
user_id ,
399
+ exc_info = True ,
399
400
)
400
401
401
402
@@ -421,7 +422,7 @@ async def remove_all_projects_for_user(app: web.Application, user_id: int) -> No
421
422
user_id ,
422
423
)
423
424
return
424
- user_primary_gid : str = str (project_owner ["primary_gid" ])
425
+ user_primary_gid = int (project_owner ["primary_gid" ])
425
426
426
427
# fetch all projects for the user
427
428
user_project_uuids = await app [
@@ -488,8 +489,8 @@ async def get_new_project_owner_gid(
488
489
project_uuid : str ,
489
490
user_id : int ,
490
491
user_primary_gid : int ,
491
- project : RowProxy ,
492
- ) -> str :
492
+ project : Dict ,
493
+ ) -> Optional [ int ] :
493
494
"""Goes through the access rights and tries to find a new suitable owner.
494
495
The first viable user is selected as a new owner.
495
496
In order to become a new owner the user must have write access right.
@@ -549,7 +550,7 @@ async def get_new_project_owner_gid(
549
550
550
551
async def fetch_new_project_owner_from_groups (
551
552
app : web .Application , standard_groups : Dict , user_id : int
552
- ) -> int :
553
+ ) -> Optional [ int ] :
553
554
"""Iterate over all the users in a group and if the users exists in the db
554
555
return its gid"""
555
556
@@ -570,6 +571,7 @@ async def fetch_new_project_owner_from_groups(
570
571
"Could not find new owner '%s' will try a new one" ,
571
572
possible_user_id ,
572
573
)
574
+
573
575
return None
574
576
575
577
@@ -578,17 +580,19 @@ async def replace_current_owner(
578
580
project_uuid : str ,
579
581
user_primary_gid : int ,
580
582
new_project_owner_gid : str ,
581
- project : RowProxy ,
583
+ project : Dict ,
582
584
) -> None :
583
585
try :
584
586
new_project_owner_id = await get_user_id_from_gid (
585
587
app = app , primary_gid = int (new_project_owner_gid )
586
588
)
587
- except Exception : # pylint: disable=broad-except
589
+
590
+ except psycopg2 .DatabaseError :
588
591
logger .exception (
589
592
"Could not recover new user id from gid %s" , new_project_owner_gid
590
593
)
591
594
return
595
+
592
596
# the result might me none
593
597
if new_project_owner_id is None :
594
598
logger .warning (
@@ -604,13 +608,14 @@ async def replace_current_owner(
604
608
str (new_project_owner_gid )
605
609
] = ProjectAccessRights .OWNER .value
606
610
logger .error ("Syncing back project %s" , project )
611
+
607
612
# syncing back project data
608
613
try :
609
614
await app [APP_PROJECT_DBAPI ].update_project_without_enforcing_checks (
610
615
project_data = project ,
611
616
project_uuid = project_uuid ,
612
617
)
613
- except Exception : # pylint: disable=broad-except
618
+ except psycopg2 . DatabaseError :
614
619
logger .exception (
615
620
"Could not remove old owner and replaced it with user %s" ,
616
621
new_project_owner_id ,
@@ -621,7 +626,7 @@ async def remove_user(app: web.Application, user_id: int) -> None:
621
626
"""Tries to remove a user, if the users still exists a warning message will be displayed"""
622
627
try :
623
628
await delete_user (app , user_id )
624
- except Exception : # pylint: disable=broad-except
629
+ except psycopg2 . DatabaseError :
625
630
logger .warning (
626
631
"User '%s' still has some projects, could not be deleted" , user_id
627
632
)
0 commit comments