Skip to content

Commit a612a27

Browse files
authored
🐛Autoscaling: Fixes return value of Docker node activation (#6953)
1 parent 08981e0 commit a612a27

File tree

3 files changed

+31
-12
lines changed

3 files changed

+31
-12
lines changed

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -356,10 +356,10 @@ async def _activate_and_notify(
356356
app: FastAPI,
357357
auto_scaling_mode: BaseAutoscaling,
358358
drained_node: AssociatedInstance,
359-
) -> None:
359+
) -> AssociatedInstance:
360360
app_settings = get_application_settings(app)
361361
docker_client = get_docker_client(app)
362-
await asyncio.gather(
362+
updated_node, *_ = await asyncio.gather(
363363
utils_docker.set_node_osparc_ready(
364364
app_settings, docker_client, drained_node.node, ready=True
365365
),
@@ -373,6 +373,7 @@ async def _activate_and_notify(
373373
app, drained_node.assigned_tasks, progress=1.0
374374
),
375375
)
376+
return dataclasses.replace(drained_node, node=updated_node)
376377

377378

378379
async def _activate_drained_nodes(
@@ -392,13 +393,13 @@ async def _activate_drained_nodes(
392393
with log_context(
393394
_logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes"
394395
):
395-
await asyncio.gather(
396+
activated_nodes = await asyncio.gather(
396397
*(
397398
_activate_and_notify(app, auto_scaling_mode, node)
398399
for node in nodes_to_activate
399400
)
400401
)
401-
new_active_node_ids = {node.ec2_instance.id for node in nodes_to_activate}
402+
new_active_node_ids = {node.ec2_instance.id for node in activated_nodes}
402403
remaining_drained_nodes = [
403404
node
404405
for node in cluster.drained_nodes
@@ -411,7 +412,7 @@ async def _activate_drained_nodes(
411412
]
412413
return dataclasses.replace(
413414
cluster,
414-
active_nodes=cluster.active_nodes + nodes_to_activate,
415+
active_nodes=cluster.active_nodes + activated_nodes,
415416
drained_nodes=remaining_drained_nodes,
416417
buffer_drained_nodes=remaining_reserved_drained_nodes,
417418
)
@@ -878,7 +879,7 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
878879
with log_context(
879880
_logger, logging.INFO, f"drain {len(active_empty_instances)} empty nodes"
880881
):
881-
updated_nodes: list[Node] = await asyncio.gather(
882+
updated_nodes = await asyncio.gather(
882883
*(
883884
utils_docker.set_node_osparc_ready(
884885
app_settings,
@@ -1076,7 +1077,7 @@ async def _drain_retired_nodes(
10761077
app_settings = get_application_settings(app)
10771078
docker_client = get_docker_client(app)
10781079
# drain these empty nodes
1079-
updated_nodes: list[Node] = await asyncio.gather(
1080+
updated_nodes = await asyncio.gather(
10801081
*(
10811082
utils_docker.set_node_osparc_ready(
10821083
app_settings,
@@ -1173,7 +1174,11 @@ async def _autoscale_cluster(
11731174
) -> Cluster:
11741175
# 1. check if we have pending tasks
11751176
unnasigned_pending_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
1176-
_logger.info("found %s pending tasks", len(unnasigned_pending_tasks))
1177+
_logger.info(
1178+
"found %s pending task%s",
1179+
len(unnasigned_pending_tasks),
1180+
"s" if len(unnasigned_pending_tasks) > 1 else "",
1181+
)
11771182
# NOTE: this function predicts how the backend will assign tasks
11781183
still_pending_tasks, cluster = await _assign_tasks_to_current_cluster(
11791184
app, unnasigned_pending_tasks, cluster, auto_scaling_mode

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,15 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]:
6969
_scheduler_url(app), _scheduler_auth(app)
7070
)
7171
# NOTE: any worker "processing" more than 1 task means that the other tasks are queued!
72+
# NOTE: that is not necessarily true, e.g. in cases where a single worker takes multiple tasks at once?? (osparc.io)
7273
processing_tasks_by_worker = await dask.list_processing_tasks_per_worker(
7374
_scheduler_url(app), _scheduler_auth(app)
7475
)
7576
queued_tasks = []
7677
for tasks in processing_tasks_by_worker.values():
7778
queued_tasks += tasks[1:]
7879
_logger.debug(
79-
"found %s unrunnable tasks and %s potentially queued tasks",
80+
"found %s pending tasks and %s potentially queued tasks",
8081
len(unrunnable_tasks),
8182
len(queued_tasks),
8283
)

services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -575,9 +575,10 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
575575
available=with_drain_nodes_labelled,
576576
)
577577
# update our fake node
578+
fake_attached_node.spec.labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true"
578579
fake_attached_node.spec.labels[
579580
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
580-
] = mock_docker_tag_node.call_args_list[0][1]["tags"][
581+
] = mock_docker_tag_node.call_args_list[2][1]["tags"][
581582
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
582583
]
583584
# check the activate time is later than attach time
@@ -590,13 +591,15 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
590591
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY
591592
]
592593
)
594+
fake_attached_node.spec.availability = Availability.active
593595
mock_compute_node_used_resources.assert_called_once_with(
594596
get_docker_client(initialized_app),
595597
fake_attached_node,
596598
)
597599
mock_compute_node_used_resources.reset_mock()
598600
# check activate call
599-
assert mock_docker_tag_node.call_args_list[1] == mock.call(
601+
602+
assert mock_docker_tag_node.call_args_list[2] == mock.call(
600603
get_docker_client(initialized_app),
601604
fake_attached_node,
602605
tags=fake_node.spec.labels
@@ -1766,7 +1769,17 @@ async def test__activate_drained_nodes_with_drained_node(
17661769
updated_cluster = await _activate_drained_nodes(
17671770
initialized_app, cluster_with_drained_nodes, DynamicAutoscaling()
17681771
)
1769-
assert updated_cluster.active_nodes == cluster_with_drained_nodes.drained_nodes
1772+
# they are the same nodes, but the availability might have changed here
1773+
assert updated_cluster.active_nodes != cluster_with_drained_nodes.drained_nodes
1774+
assert (
1775+
updated_cluster.active_nodes[0].assigned_tasks
1776+
== cluster_with_drained_nodes.drained_nodes[0].assigned_tasks
1777+
)
1778+
assert (
1779+
updated_cluster.active_nodes[0].ec2_instance
1780+
== cluster_with_drained_nodes.drained_nodes[0].ec2_instance
1781+
)
1782+
17701783
assert drained_host_node.spec
17711784
mock_docker_tag_node.assert_called_once_with(
17721785
mock.ANY,

0 commit comments

Comments
 (0)