Skip to content

Commit 07c200e

Browse files
committed
šŸ›Autoscaling/Comp backend: drain retired nodes so that they can be re-used (#6345)
1 parent f6d362b commit 07c200e

File tree

8 files changed

+98
-5
lines changed

8 files changed

+98
-5
lines changed

services/autoscaling/src/simcore_service_autoscaling/models.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ class Cluster: # pylint: disable=too-many-instance-attributes
9898
"description": "This is a EC2-backed docker node which is docker drained and waiting for termination"
9999
}
100100
)
101+
retired_nodes: list[AssociatedInstance] = field(
102+
metadata={
103+
"description": "This is a EC2-backed docker node which was retired and waiting to be drained and eventually terminated or re-used"
104+
}
105+
)
101106
terminated_instances: list[NonAssociatedInstance]
102107

103108
def can_scale_down(self) -> bool:
@@ -107,6 +112,7 @@ def can_scale_down(self) -> bool:
107112
or self.drained_nodes
108113
or self.pending_ec2s
109114
or self.terminating_nodes
115+
or self.retired_nodes
110116
)
111117

112118
def total_number_of_machines(self) -> int:
@@ -119,6 +125,7 @@ def total_number_of_machines(self) -> int:
119125
+ len(self.pending_ec2s)
120126
+ len(self.broken_ec2s)
121127
+ len(self.terminating_nodes)
128+
+ len(self.retired_nodes)
122129
)
123130

124131
def __repr__(self) -> str:
@@ -137,7 +144,8 @@ def _get_instance_ids(
137144
f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, "
138145
f"disconnected-nodes: count={len(self.disconnected_nodes)}, "
139146
f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, "
140-
f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)}, "
147+
f"retired-nodes: count={len(self.retired_nodes)} {_get_instance_ids(self.retired_nodes)}, "
148+
f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)})"
141149
)
142150

143151

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ async def _analyze_current_cluster(
113113
]
114114

115115
# analyse attached ec2s
116-
active_nodes, pending_nodes, all_drained_nodes = [], [], []
116+
active_nodes, pending_nodes, all_drained_nodes, retired_nodes = [], [], [], []
117117
for instance in attached_ec2s:
118118
if await auto_scaling_mode.is_instance_active(app, instance):
119119
node_used_resources = await auto_scaling_mode.compute_node_used_resources(
@@ -128,6 +128,9 @@ async def _analyze_current_cluster(
128128
)
129129
elif auto_scaling_mode.is_instance_drained(instance):
130130
all_drained_nodes.append(instance)
131+
elif await auto_scaling_mode.is_instance_retired(app, instance):
132+
# it should be drained, but it is not, so we force it to be drained such that it might be re-used if needed
133+
retired_nodes.append(instance)
131134
else:
132135
pending_nodes.append(instance)
133136

@@ -149,6 +152,7 @@ async def _analyze_current_cluster(
149152
NonAssociatedInstance(ec2_instance=i) for i in terminated_ec2_instances
150153
],
151154
disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
155+
retired_nodes=retired_nodes,
152156
)
153157
_logger.info("current state: %s", f"{cluster!r}")
154158
return cluster
@@ -969,6 +973,43 @@ async def _notify_machine_creation_progress(
969973
)
970974

971975

976+
async def _drain_retired_nodes(
977+
app: FastAPI,
978+
cluster: Cluster,
979+
) -> Cluster:
980+
if not cluster.retired_nodes:
981+
return cluster
982+
983+
app_settings = get_application_settings(app)
984+
docker_client = get_docker_client(app)
985+
# drain this empty nodes
986+
updated_nodes: list[Node] = await asyncio.gather(
987+
*(
988+
utils_docker.set_node_osparc_ready(
989+
app_settings,
990+
docker_client,
991+
node.node,
992+
ready=False,
993+
)
994+
for node in cluster.retired_nodes
995+
)
996+
)
997+
if updated_nodes:
998+
_logger.info(
999+
"following nodes were set to drain: '%s'",
1000+
f"{[node.Description.Hostname for node in updated_nodes if node.Description]}",
1001+
)
1002+
newly_drained_instances = [
1003+
AssociatedInstance(node=node, ec2_instance=instance.ec2_instance)
1004+
for instance, node in zip(cluster.retired_nodes, updated_nodes, strict=True)
1005+
]
1006+
return dataclasses.replace(
1007+
cluster,
1008+
retired_nodes=[],
1009+
drained_nodes=cluster.drained_nodes + newly_drained_instances,
1010+
)
1011+
1012+
9721013
async def _autoscale_cluster(
9731014
app: FastAPI,
9741015
cluster: Cluster,
@@ -1071,6 +1112,7 @@ async def auto_scale_cluster(
10711112
cluster = await _try_attach_pending_ec2s(
10721113
app, cluster, auto_scaling_mode, allowed_instance_types
10731114
)
1115+
cluster = await _drain_retired_nodes(app, cluster)
10741116

10751117
cluster = await _autoscale_cluster(
10761118
app, cluster, auto_scaling_mode, allowed_instance_types

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ async def compute_cluster_total_resources(
8686
async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool:
8787
...
8888

89+
@staticmethod
90+
@abstractmethod
91+
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
92+
...
93+
8994
@staticmethod
9095
def is_instance_drained(instance: AssociatedInstance) -> bool:
9196
return not utils_docker.is_node_osparc_ready(instance.node)

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,14 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
177177
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
178178
)
179179

180+
@staticmethod
181+
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
182+
if not utils_docker.is_node_osparc_ready(instance.node):
183+
return False
184+
return await dask.is_worker_retired(
185+
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
186+
)
187+
180188
@staticmethod
181189
async def try_retire_nodes(app: FastAPI) -> None:
182190
await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app))

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
102102
assert app # nosec
103103
return utils_docker.is_node_osparc_ready(instance.node)
104104

105+
@staticmethod
106+
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
107+
assert app # nosec
108+
assert instance # nosec
109+
# nothing to do here
110+
return False
111+
105112
@staticmethod
106113
async def try_retire_nodes(app: FastAPI) -> None:
107114
assert app # nosec

services/autoscaling/src/simcore_service_autoscaling/modules/dask.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import distributed.scheduler
1212
from aws_library.ec2.models import EC2InstanceData, Resources
1313
from dask_task_models_library.resource_constraints import DaskTaskResources
14+
from distributed.core import Status
1415
from models_library.clusters import InternalClusterAuthentication, TLSAuthentication
1516
from pydantic import AnyUrl, ByteSize, parse_obj_as
1617

@@ -120,8 +121,28 @@ async def is_worker_connected(
120121
) -> bool:
121122
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
122123
async with _scheduler_client(scheduler_url, authentication) as client:
123-
_dask_worker_from_ec2_instance(client, worker_ec2_instance)
124-
return True
124+
_, worker_details = _dask_worker_from_ec2_instance(
125+
client, worker_ec2_instance
126+
)
127+
return Status(worker_details["status"]) == Status.running
128+
return False
129+
130+
131+
async def is_worker_retired(
132+
scheduler_url: AnyUrl,
133+
authentication: InternalClusterAuthentication,
134+
worker_ec2_instance: EC2InstanceData,
135+
) -> bool:
136+
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
137+
async with _scheduler_client(scheduler_url, authentication) as client:
138+
_, worker_details = _dask_worker_from_ec2_instance(
139+
client, worker_ec2_instance
140+
)
141+
return Status(worker_details["status"]) in {
142+
Status.closed,
143+
Status.closing,
144+
Status.closing_gracefully,
145+
}
125146
return False
126147

127148

services/autoscaling/tests/manual/docker-compose-computational.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
services:
22
autoscaling:
33
environment:
4+
- AUTOSCALING_DASK={}
45
- DASK_MONITORING_URL=tcp://dask-scheduler:8786
5-
- DASK_SCHEDULER_AUTH='{}'
6+
- DASK_SCHEDULER_AUTH={}
67
dask-sidecar:
78
image: itisfoundation/dask-sidecar:master-github-latest
89
init: true

services/autoscaling/tests/unit/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,7 @@ def _creator(**cluter_overrides) -> Cluster:
738738
buffer_ec2s=[],
739739
disconnected_nodes=[],
740740
terminating_nodes=[],
741+
retired_nodes=[],
741742
terminated_instances=[],
742743
),
743744
**cluter_overrides,

0 commit comments

Comments
(0)