|
12 | 12 | from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
|
13 | 13 | from servicelib.logging_errors import create_troubleshotting_log_kwargs
|
14 | 14 | from servicelib.logging_utils import log_catch, log_context
|
15 |
| -from servicelib.utils import limited_as_completed, logged_gather |
| 15 | +from servicelib.utils import limited_as_completed, limited_gather |
16 | 16 |
|
17 | 17 | from ..dynamic_scheduler import api as dynamic_scheduler_service
|
18 | 18 | from ..projects._projects_service import (
|
@@ -50,10 +50,10 @@ async def _remove_service(
|
50 | 50 | except (UserNotFoundError, ValueError):
|
51 | 51 | save_service_state = False
|
52 | 52 |
|
53 |
| - with log_context( |
| 53 | + with log_catch(_logger, reraise=False), log_context( |
54 | 54 | _logger,
|
55 | 55 | logging.INFO,
|
56 |
| - msg=f"removing {(service.node_uuid, service.host)} with {save_service_state=}", |
| 56 | + f"removing {(service.node_uuid, service.host)} with {save_service_state=}", |
57 | 57 | ):
|
58 | 58 | await dynamic_scheduler_service.stop_dynamic_service(
|
59 | 59 | app,
|
@@ -89,73 +89,72 @@ async def remove_orphaned_services(
|
89 | 89 | # otherwise if some time goes it could very well be that there are new projects opened
|
90 | 90 | # in between and the GC would remove services that actually should be running.
|
91 | 91 |
|
92 |
| - with log_catch(_logger, reraise=False): |
93 |
| - running_services = await dynamic_scheduler_service.list_dynamic_services(app) |
94 |
| - if not running_services: |
95 |
| - # nothing to do |
96 |
| - return |
97 |
| - _logger.debug( |
98 |
| - "Actual running dynamic services: %s", |
99 |
| - [(x.node_uuid, x.host) for x in running_services], |
100 |
| - ) |
101 |
| - running_services_by_id: dict[NodeID, DynamicServiceGet] = { |
102 |
| - service.node_uuid: service for service in running_services |
103 |
| - } |
104 |
| - |
105 |
| - known_opened_project_ids = await _list_opened_project_ids(registry) |
106 |
| - |
107 |
| - # NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error. |
108 |
| - # Why? If a service is running but the nodes form the correspondign project cannot be listed, |
109 |
| - # the service will be considered as orphaned and closed. |
110 |
| - potentially_running_service_ids: list[set[NodeID]] = [] |
111 |
| - async for project_nodes_future in limited_as_completed( |
112 |
| - ( |
113 |
| - list_node_ids_in_project(app, project_id) |
114 |
| - for project_id in known_opened_project_ids |
115 |
| - ), |
116 |
| - limit=_MAX_CONCURRENT_CALLS, |
117 |
| - ): |
118 |
| - try: |
119 |
| - project_nodes = await project_nodes_future |
120 |
| - potentially_running_service_ids.append(project_nodes) |
121 |
| - except BaseException as e: # pylint:disable=broad-exception-caught |
122 |
| - _logger.warning( |
123 |
| - create_troubleshotting_log_kwargs( |
124 |
| - ( |
125 |
| - "Skipping orpahn services removal, call to " |
126 |
| - "`list_node_ids_in_project` raised" |
127 |
| - ), |
128 |
| - error=e, |
129 |
| - error_context={ |
130 |
| - "running_services": running_services, |
131 |
| - "running_services_by_id": running_services_by_id, |
132 |
| - "known_opened_project_ids": known_opened_project_ids, |
133 |
| - }, |
| 92 | + running_services = await dynamic_scheduler_service.list_dynamic_services(app) |
| 93 | + if not running_services: |
| 94 | + # nothing to do |
| 95 | + return |
| 96 | + _logger.debug( |
| 97 | + "Actual running dynamic services: %s", |
| 98 | + [(x.node_uuid, x.host) for x in running_services], |
| 99 | + ) |
| 100 | + running_services_by_id: dict[NodeID, DynamicServiceGet] = { |
| 101 | + service.node_uuid: service for service in running_services |
| 102 | + } |
| 103 | + |
| 104 | + known_opened_project_ids = await _list_opened_project_ids(registry) |
| 105 | + |
| 106 | + # NOTE: Always skip orphan repmoval when `list_node_ids_in_project` raises an error. |
| 107 | + # Why? If a service is running but the nodes form the correspondign project cannot be listed, |
| 108 | + # the service will be considered as orphaned and closed. |
| 109 | + potentially_running_service_ids: list[set[NodeID]] = [] |
| 110 | + async for project_nodes_future in limited_as_completed( |
| 111 | + ( |
| 112 | + list_node_ids_in_project(app, project_id) |
| 113 | + for project_id in known_opened_project_ids |
| 114 | + ), |
| 115 | + limit=_MAX_CONCURRENT_CALLS, |
| 116 | + ): |
| 117 | + try: |
| 118 | + project_nodes = await project_nodes_future |
| 119 | + potentially_running_service_ids.append(project_nodes) |
| 120 | + except BaseException as e: # pylint:disable=broad-exception-caught |
| 121 | + _logger.warning( |
| 122 | + create_troubleshotting_log_kwargs( |
| 123 | + ( |
| 124 | + "Skipping orpahn services removal, call to " |
| 125 | + "`list_node_ids_in_project` raised" |
134 | 126 | ),
|
135 |
| - exc_info=True, |
136 |
| - ) |
137 |
| - return |
138 |
| - |
139 |
| - potentially_running_service_ids_set: set[NodeID] = set().union( |
140 |
| - *(node_id for node_id in potentially_running_service_ids) |
141 |
| - ) |
142 |
| - _logger.debug( |
143 |
| - "Allowed service UUIDs from known opened projects: %s", |
144 |
| - potentially_running_service_ids_set, |
145 |
| - ) |
146 |
| - |
147 |
| - # compute the difference to find the orphaned services |
148 |
| - orphaned_running_service_ids = ( |
149 |
| - set(running_services_by_id) - potentially_running_service_ids_set |
150 |
| - ) |
151 |
| - _logger.debug("Found orphaned services: %s", orphaned_running_service_ids) |
152 |
| - # NOTE: no need to not reraise here, since we catch everything above |
153 |
| - # and logged_gather first runs everything |
154 |
| - await logged_gather( |
155 |
| - *( |
156 |
| - _remove_service(app, node_id, running_services_by_id[node_id]) |
157 |
| - for node_id in orphaned_running_service_ids |
158 |
| - ), |
159 |
| - log=_logger, |
160 |
| - max_concurrency=_MAX_CONCURRENT_CALLS, |
161 |
| - ) |
| 127 | + error=e, |
| 128 | + error_context={ |
| 129 | + "running_services": running_services, |
| 130 | + "running_services_by_id": running_services_by_id, |
| 131 | + "known_opened_project_ids": known_opened_project_ids, |
| 132 | + }, |
| 133 | + ), |
| 134 | + exc_info=True, |
| 135 | + ) |
| 136 | + continue |
| 137 | + |
| 138 | + potentially_running_service_ids_set: set[NodeID] = set().union( |
| 139 | + *(node_id for node_id in potentially_running_service_ids) |
| 140 | + ) |
| 141 | + _logger.debug( |
| 142 | + "Allowed service UUIDs from known opened projects: %s", |
| 143 | + potentially_running_service_ids_set, |
| 144 | + ) |
| 145 | + |
| 146 | + # compute the difference to find the orphaned services |
| 147 | + orphaned_running_service_ids = ( |
| 148 | + set(running_services_by_id) - potentially_running_service_ids_set |
| 149 | + ) |
| 150 | + _logger.debug("Found orphaned services: %s", orphaned_running_service_ids) |
| 151 | + # NOTE: no need to not reraise here, since we catch everything above |
| 152 | + # and logged_gather first runs everything |
| 153 | + await limited_gather( |
| 154 | + *( |
| 155 | + _remove_service(app, node_id, running_services_by_id[node_id]) |
| 156 | + for node_id in orphaned_running_service_ids |
| 157 | + ), |
| 158 | + log=_logger, |
| 159 | + limit=_MAX_CONCURRENT_CALLS, |
| 160 | + ) |
0 commit comments