Skip to content

Commit 6ad5396

Browse files
authored
Merge branch 'master' into is6318/fix-catalog
2 parents e1110a2 + 5b35cfe commit 6ad5396

File tree

11 files changed

+124
-46
lines changed

11 files changed

+124
-46
lines changed

scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import distributed
66
import rich
7-
import typer
87
from mypy_boto3_ec2.service_resource import Instance
98
from pydantic import AnyUrl
109

@@ -64,25 +63,6 @@ async def dask_client(
6463
f"{url}", security=security, timeout="5", asynchronous=True
6564
)
6665
)
67-
versions = await _wrap_dask_async_call(client.get_versions())
68-
if versions["client"]["python"] != versions["scheduler"]["python"]:
69-
rich.print(
70-
f"[red]python versions do not match! TIP: install the correct version {versions['scheduler']['python']}[/red]"
71-
)
72-
raise typer.Exit(1)
73-
if (
74-
versions["client"]["distributed"]
75-
!= versions["scheduler"]["distributed"]
76-
):
77-
rich.print(
78-
f"[red]distributed versions do not match! TIP: install the correct version {versions['scheduler']['distributed']}[/red]"
79-
)
80-
raise typer.Exit(1)
81-
if versions["client"]["dask"] != versions["scheduler"]["dask"]:
82-
rich.print(
83-
f"[red]dask versions do not match! TIP: install the correct version {versions['scheduler']['dask']}[/red]"
84-
)
85-
raise typer.Exit(1)
8666
yield client
8767

8868
finally:

services/autoscaling/src/simcore_service_autoscaling/models.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ class Cluster: # pylint: disable=too-many-instance-attributes
9898
"description": "This is a EC2-backed docker node which is docker drained and waiting for termination"
9999
}
100100
)
101+
retired_nodes: list[AssociatedInstance] = field(
102+
metadata={
103+
"description": "This is an EC2-backed docker node which was retired and is waiting to be drained and eventually terminated or re-used"
104+
}
105+
)
101106
terminated_instances: list[NonAssociatedInstance]
102107

103108
def can_scale_down(self) -> bool:
@@ -107,6 +112,7 @@ def can_scale_down(self) -> bool:
107112
or self.drained_nodes
108113
or self.pending_ec2s
109114
or self.terminating_nodes
115+
or self.retired_nodes
110116
)
111117

112118
def total_number_of_machines(self) -> int:
@@ -119,6 +125,7 @@ def total_number_of_machines(self) -> int:
119125
+ len(self.pending_ec2s)
120126
+ len(self.broken_ec2s)
121127
+ len(self.terminating_nodes)
128+
+ len(self.retired_nodes)
122129
)
123130

124131
def __repr__(self) -> str:
@@ -137,6 +144,7 @@ def _get_instance_ids(
137144
f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, "
138145
f"disconnected-nodes: count={len(self.disconnected_nodes)}, "
139146
f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, "
147+
f"retired-nodes: count={len(self.retired_nodes)} {_get_instance_ids(self.retired_nodes)}, "
140148
f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)})"
141149
)
142150

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
from servicelib.logging_utils import log_catch, log_context
2222
from servicelib.utils import limited_gather
2323
from servicelib.utils_formatting import timedelta_as_minute_second
24-
from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME
2524
from types_aiobotocore_ec2.literals import InstanceTypeType
2625

26+
from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME
2727
from ..core.errors import (
2828
Ec2InvalidDnsNameError,
2929
TaskBestFittingInstanceNotFoundError,
@@ -123,7 +123,7 @@ async def _analyze_current_cluster(
123123
]
124124

125125
# analyse attached ec2s
126-
active_nodes, pending_nodes, all_drained_nodes = [], [], []
126+
active_nodes, pending_nodes, all_drained_nodes, retired_nodes = [], [], [], []
127127
for instance in attached_ec2s:
128128
if await auto_scaling_mode.is_instance_active(app, instance):
129129
node_used_resources = await auto_scaling_mode.compute_node_used_resources(
@@ -138,6 +138,9 @@ async def _analyze_current_cluster(
138138
)
139139
elif auto_scaling_mode.is_instance_drained(instance):
140140
all_drained_nodes.append(instance)
141+
elif await auto_scaling_mode.is_instance_retired(app, instance):
142+
# it should be drained, but it is not, so we force it to be drained such that it might be re-used if needed
143+
retired_nodes.append(instance)
141144
else:
142145
pending_nodes.append(instance)
143146

@@ -159,6 +162,7 @@ async def _analyze_current_cluster(
159162
NonAssociatedInstance(ec2_instance=i) for i in terminated_ec2_instances
160163
],
161164
disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
165+
retired_nodes=retired_nodes,
162166
)
163167
_logger.info("current state: %s", f"{cluster!r}")
164168
return cluster
@@ -329,14 +333,14 @@ async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
329333
ec2_client = get_ec2_client(app)
330334

331335
# some instances might be able to run several tasks
332-
allowed_instance_types: list[EC2InstanceType] = (
333-
await ec2_client.get_ec2_instance_capabilities(
334-
cast(
335-
set[InstanceTypeType],
336-
set(
337-
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES,
338-
),
339-
)
336+
allowed_instance_types: list[
337+
EC2InstanceType
338+
] = await ec2_client.get_ec2_instance_capabilities(
339+
cast(
340+
set[InstanceTypeType],
341+
set(
342+
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES,
343+
),
340344
)
341345
)
342346

@@ -1081,6 +1085,43 @@ async def _notify_machine_creation_progress(
10811085
)
10821086

10831087

1088+
async def _drain_retired_nodes(
1089+
app: FastAPI,
1090+
cluster: Cluster,
1091+
) -> Cluster:
1092+
if not cluster.retired_nodes:
1093+
return cluster
1094+
1095+
app_settings = get_application_settings(app)
1096+
docker_client = get_docker_client(app)
1097+
# drain these empty nodes
1098+
updated_nodes: list[Node] = await asyncio.gather(
1099+
*(
1100+
utils_docker.set_node_osparc_ready(
1101+
app_settings,
1102+
docker_client,
1103+
node.node,
1104+
ready=False,
1105+
)
1106+
for node in cluster.retired_nodes
1107+
)
1108+
)
1109+
if updated_nodes:
1110+
_logger.info(
1111+
"following nodes were set to drain: '%s'",
1112+
f"{[node.Description.Hostname for node in updated_nodes if node.Description]}",
1113+
)
1114+
newly_drained_instances = [
1115+
AssociatedInstance(node=node, ec2_instance=instance.ec2_instance)
1116+
for instance, node in zip(cluster.retired_nodes, updated_nodes, strict=True)
1117+
]
1118+
return dataclasses.replace(
1119+
cluster,
1120+
retired_nodes=[],
1121+
drained_nodes=cluster.drained_nodes + newly_drained_instances,
1122+
)
1123+
1124+
10841125
async def _autoscale_cluster(
10851126
app: FastAPI,
10861127
cluster: Cluster,
@@ -1187,6 +1228,7 @@ async def auto_scale_cluster(
11871228
cluster = await _try_attach_pending_ec2s(
11881229
app, cluster, auto_scaling_mode, allowed_instance_types
11891230
)
1231+
cluster = await _drain_retired_nodes(app, cluster)
11901232

11911233
cluster = await _autoscale_cluster(
11921234
app, cluster, auto_scaling_mode, allowed_instance_types

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ async def compute_cluster_total_resources(
8686
async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool:
8787
...
8888

89+
@staticmethod
90+
@abstractmethod
91+
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
92+
...
93+
8994
@staticmethod
9095
def is_instance_drained(instance: AssociatedInstance) -> bool:
9196
return not utils_docker.is_node_osparc_ready(instance.node)

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,14 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
177177
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
178178
)
179179

180+
@staticmethod
181+
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
182+
if not utils_docker.is_node_osparc_ready(instance.node):
183+
return False
184+
return await dask.is_worker_retired(
185+
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
186+
)
187+
180188
@staticmethod
181189
async def try_retire_nodes(app: FastAPI) -> None:
182190
await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app))

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
102102
assert app # nosec
103103
return utils_docker.is_node_osparc_ready(instance.node)
104104

105+
@staticmethod
106+
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
107+
assert app # nosec
108+
assert instance # nosec
109+
# nothing to do here
110+
return False
111+
105112
@staticmethod
106113
async def try_retire_nodes(app: FastAPI) -> None:
107114
assert app # nosec

services/autoscaling/src/simcore_service_autoscaling/modules/dask.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import distributed.scheduler
1212
from aws_library.ec2 import EC2InstanceData, Resources
1313
from dask_task_models_library.resource_constraints import DaskTaskResources
14+
from distributed.core import Status
1415
from models_library.clusters import InternalClusterAuthentication, TLSAuthentication
1516
from pydantic import AnyUrl, ByteSize, parse_obj_as
1617

@@ -120,8 +121,28 @@ async def is_worker_connected(
120121
) -> bool:
121122
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
122123
async with _scheduler_client(scheduler_url, authentication) as client:
123-
_dask_worker_from_ec2_instance(client, worker_ec2_instance)
124-
return True
124+
_, worker_details = _dask_worker_from_ec2_instance(
125+
client, worker_ec2_instance
126+
)
127+
return Status(worker_details["status"]) == Status.running
128+
return False
129+
130+
131+
async def is_worker_retired(
132+
scheduler_url: AnyUrl,
133+
authentication: InternalClusterAuthentication,
134+
worker_ec2_instance: EC2InstanceData,
135+
) -> bool:
136+
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
137+
async with _scheduler_client(scheduler_url, authentication) as client:
138+
_, worker_details = _dask_worker_from_ec2_instance(
139+
client, worker_ec2_instance
140+
)
141+
return Status(worker_details["status"]) in {
142+
Status.closed,
143+
Status.closing,
144+
Status.closing_gracefully,
145+
}
125146
return False
126147

127148

@@ -152,9 +173,9 @@ def _list_tasks(
152173
}
153174

154175
async with _scheduler_client(scheduler_url, authentication) as client:
155-
list_of_tasks: dict[
156-
dask.typing.Key, DaskTaskResources
157-
] = await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks))
176+
list_of_tasks: dict[dask.typing.Key, DaskTaskResources] = (
177+
await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks))
178+
)
158179
_logger.debug("found unrunnable tasks: %s", list_of_tasks)
159180
return [
160181
DaskTask(
@@ -186,10 +207,10 @@ def _list_processing_tasks(
186207
return worker_to_processing_tasks
187208

188209
async with _scheduler_client(scheduler_url, authentication) as client:
189-
worker_to_tasks: dict[
190-
str, list[tuple[dask.typing.Key, DaskTaskResources]]
191-
] = await _wrap_client_async_routine(
192-
client.run_on_scheduler(_list_processing_tasks)
210+
worker_to_tasks: dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]] = (
211+
await _wrap_client_async_routine(
212+
client.run_on_scheduler(_list_processing_tasks)
213+
)
193214
)
194215
_logger.debug("found processing tasks: %s", worker_to_tasks)
195216
tasks_per_worker = defaultdict(list)
@@ -255,12 +276,12 @@ def _list_processing_tasks_on_worker(
255276
_logger.debug("looking for processing tasks for %s", f"{worker_url=}")
256277

257278
# now get the used resources
258-
worker_processing_tasks: list[
259-
tuple[dask.typing.Key, DaskTaskResources]
260-
] = await _wrap_client_async_routine(
261-
client.run_on_scheduler(
262-
_list_processing_tasks_on_worker, worker_url=worker_url
263-
),
279+
worker_processing_tasks: list[tuple[dask.typing.Key, DaskTaskResources]] = (
280+
await _wrap_client_async_routine(
281+
client.run_on_scheduler(
282+
_list_processing_tasks_on_worker, worker_url=worker_url
283+
),
284+
)
264285
)
265286

266287
total_resources_used: collections.Counter[str] = collections.Counter()

services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
"Number of EC2-backed docker nodes that started the termination process",
4343
EC2_INSTANCE_LABELS,
4444
),
45+
"retired_nodes": (
46+
"Number of EC2-backed docker nodes that were actively retired and waiting for draining and termination or re-use",
47+
EC2_INSTANCE_LABELS,
48+
),
4549
"terminated_instances": (
4650
"Number of EC2 instances that were terminated (they are typically visible 1 hour)",
4751
EC2_INSTANCE_LABELS,

services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class ClusterMetrics(MetricsBase): # pylint: disable=too-many-instance-attribut
2929
buffer_ec2s: TrackedGauge = field(init=False)
3030
disconnected_nodes: TrackedGauge = field(init=False)
3131
terminating_nodes: TrackedGauge = field(init=False)
32+
retired_nodes: TrackedGauge = field(init=False)
3233
terminated_instances: TrackedGauge = field(init=False)
3334

3435
def __post_init__(self) -> None:

services/autoscaling/tests/manual/docker-compose-computational.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
services:
22
autoscaling:
33
environment:
4+
- AUTOSCALING_DASK={}
45
- DASK_MONITORING_URL=tcp://dask-scheduler:8786
5-
- DASK_SCHEDULER_AUTH='{}'
6+
- DASK_SCHEDULER_AUTH={}
67
dask-sidecar:
78
image: itisfoundation/dask-sidecar:master-github-latest
89
init: true

services/autoscaling/tests/unit/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,7 @@ def _creator(**cluter_overrides) -> Cluster:
799799
buffer_ec2s=[],
800800
disconnected_nodes=[],
801801
terminating_nodes=[],
802+
retired_nodes=[],
802803
terminated_instances=[],
803804
),
804805
**cluter_overrides,

0 commit comments

Comments
 (0)