From 33c4e264817be520564ac19c981b2f0571952320 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 10 Sep 2024 18:13:05 +0200 Subject: [PATCH 1/7] testing --- .../src/simcore_service_autoscaling/models.py | 5 ++++ .../modules/auto_scaling_core.py | 24 +++++++++++-------- .../modules/auto_scaling_mode_base.py | 5 ++++ .../modules/auto_scaling_mode_dynamic.py | 7 ++++++ .../modules/dask.py | 19 +++++++++++++++ 5 files changed, 50 insertions(+), 10 deletions(-) diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index c51c05680ec..a11ba88863e 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -98,6 +98,11 @@ class Cluster: # pylint: disable=too-many-instance-attributes "description": "This is a EC2-backed docker node which is docker drained and waiting for termination" } ) + retired_nodes: list[AssociatedInstance] = field( + metadata={ + "description": "This is a EC2-backed docker node which was retired and waiting to be drained and eventually terminated or re-used" + } + ) terminated_instances: list[NonAssociatedInstance] def can_scale_down(self) -> bool: diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 6eb9fdcb4c6..2136c724dfd 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -21,9 +21,9 @@ from servicelib.logging_utils import log_catch, log_context from servicelib.utils import limited_gather from servicelib.utils_formatting import timedelta_as_minute_second -from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME from types_aiobotocore_ec2.literals import InstanceTypeType +from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME from ..core.errors import ( Ec2InvalidDnsNameError, TaskBestFittingInstanceNotFoundError, @@ -123,7 +123,7 @@ async def _analyze_current_cluster( ] # analyse attached ec2s - active_nodes, pending_nodes, all_drained_nodes = [], [], [] + active_nodes, pending_nodes, all_drained_nodes, retired_nodes = [], [], [], [] for instance in attached_ec2s: if await auto_scaling_mode.is_instance_active(app, instance): node_used_resources = await auto_scaling_mode.compute_node_used_resources( @@ -138,6 +138,9 @@ async def _analyze_current_cluster( ) elif auto_scaling_mode.is_instance_drained(instance): all_drained_nodes.append(instance) + elif await auto_scaling_mode.is_instance_retired(app, instance): + # it should be drained, but it is not, so we force it to be drained such that it might be re-used if needed + retired_nodes.append(instance) else: pending_nodes.append(instance) @@ -159,6 +162,7 @@ async def _analyze_current_cluster( NonAssociatedInstance(ec2_instance=i) for i in terminated_ec2_instances ], disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)], + retired_nodes=retired_nodes, ) _logger.info("current state: %s", f"{cluster!r}") return cluster @@ -329,14 +333,14 @@ async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]: ec2_client = get_ec2_client(app) # some instances might be able to run several tasks - allowed_instance_types: list[EC2InstanceType] = ( - await ec2_client.get_ec2_instance_capabilities( - cast( - set[InstanceTypeType], - set( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES, - ), - ) + allowed_instance_types: list[ + EC2InstanceType + ] = await ec2_client.get_ec2_instance_capabilities( + cast( + set[InstanceTypeType], + set( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES, + ), ) ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py index 7c30b1c61db..921af2cdd01 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py @@ -86,6 +86,11 @@ async def compute_cluster_total_resources( async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool: ... + @staticmethod + @abstractmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + ... + @staticmethod def is_instance_drained(instance: AssociatedInstance) -> bool: return not utils_docker.is_node_osparc_ready(instance.node) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py index 4f6cd21006b..f267ebe013c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py @@ -102,6 +102,13 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool assert app # nosec return utils_docker.is_node_osparc_ready(instance.node) + @staticmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + assert app # nosec + assert instance # nosec + # nothing to do here + return False + @staticmethod async def try_retire_nodes(app: FastAPI) -> None: assert app # nosec diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index edb02d40e2f..40bb58a2229 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -11,6 +11,7 @@ import distributed.scheduler from aws_library.ec2 import EC2InstanceData, Resources from dask_task_models_library.resource_constraints import DaskTaskResources +from distributed.core import Status from models_library.clusters import InternalClusterAuthentication, TLSAuthentication from pydantic import AnyUrl, ByteSize, parse_obj_as @@ -125,6 +126,24 @@ async def is_worker_connected( return False +async def is_worker_retired( + scheduler_url: AnyUrl, + authentication: InternalClusterAuthentication, + worker_ec2_instance: EC2InstanceData, +) -> bool: + with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError): + async with _scheduler_client(scheduler_url, authentication) as client: + _, worker_details = _dask_worker_from_ec2_instance( + client, worker_ec2_instance + ) + return Status(worker_details["status"]) in { + Status.closed, + Status.closing, + Status.closing_gracefully, + } + return False + + def _dask_key_to_dask_task_id(key: dask.typing.Key) -> DaskTaskId: if isinstance(key, bytes): return key.decode("utf-8") From bc87e67040293182a4002eb3e28a10e992751d60 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 10 Sep 2024 18:16:17 +0200 Subject: [PATCH 2/7] added retired nodes --- .../autoscaling/src/simcore_service_autoscaling/models.py | 3 +++ .../modules/instrumentation/_constants.py | 4 ++++ .../modules/instrumentation/_models.py | 1 + services/autoscaling/tests/unit/conftest.py | 1 + 4 files changed, 9 insertions(+) diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index a11ba88863e..c77f9fe349c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -112,6 +112,7 @@ def can_scale_down(self) -> bool: or self.drained_nodes or self.pending_ec2s or self.terminating_nodes + or self.retired_nodes ) def total_number_of_machines(self) -> int: @@ -124,6 +125,7 @@ def total_number_of_machines(self) -> int: + len(self.pending_ec2s) + len(self.broken_ec2s) + len(self.terminating_nodes) + + len(self.retired_nodes) ) def __repr__(self) -> str: @@ -142,6 +144,7 @@ def _get_instance_ids( f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, " f"disconnected-nodes: count={len(self.disconnected_nodes)}, " f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, " + f"retired-nodes: count={len(self.retired_nodes)} {_get_instance_ids(self.retired_nodes)}, " f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)})" ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py index 4d8f53025b5..1cfa96a6f36 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py @@ -42,6 +42,10 @@ "Number of EC2-backed docker nodes that started the termination process", EC2_INSTANCE_LABELS, ), + "retired_nodes": ( + "Number of EC2-backed docker nodes that were actively retired and waiting for draining and termination or re-use", + EC2_INSTANCE_LABELS, + ), "terminated_instances": ( "Number of EC2 instances that were terminated (they are typically visible 1 hour)", EC2_INSTANCE_LABELS, diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py index 70b96cabfe2..056a77ea2a5 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py @@ -29,6 +29,7 @@ class ClusterMetrics(MetricsBase): # pylint: disable=too-many-instance-attribut buffer_ec2s: TrackedGauge = field(init=False) disconnected_nodes: TrackedGauge = field(init=False) terminating_nodes: TrackedGauge = field(init=False) + retired_nodes: TrackedGauge = field(init=False) terminated_instances: TrackedGauge = field(init=False) def __post_init__(self) -> None: diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 4ad72c33d99..97e709c2dba 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -799,6 +799,7 @@ def _creator(**cluter_overrides) -> Cluster: buffer_ec2s=[], disconnected_nodes=[], terminating_nodes=[], + retired_nodes=[], terminated_instances=[], ), **cluter_overrides, From 0705f2955dfd99e0aaeca302176eee2bc7c38bd3 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 10 Sep 2024 18:21:48 +0200 Subject: [PATCH 3/7] do it --- .../modules/auto_scaling_core.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 2136c724dfd..b1e629f4e7a 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -1085,6 +1085,43 @@ async def _notify_machine_creation_progress( ) +async def _drain_retired_nodes( + app: FastAPI, + cluster: Cluster, +) -> Cluster: + if not cluster.retired_nodes: + return cluster + + app_settings = get_application_settings(app) + docker_client = get_docker_client(app) + # drain this empty nodes + updated_nodes: list[Node] = await asyncio.gather( + *( + utils_docker.set_node_osparc_ready( + app_settings, + docker_client, + node.node, + ready=False, + ) + for node in cluster.retired_nodes + ) + ) + if updated_nodes: + _logger.info( + "following nodes were set to drain: '%s'", + f"{[node.Description.Hostname for node in updated_nodes if node.Description]}", + ) + newly_drained_instances = [ + AssociatedInstance(node=node, ec2_instance=instance.ec2_instance) + for instance, node in zip(cluster.retired_nodes, updated_nodes, strict=True) + ] + return dataclasses.replace( + cluster, + retired_nodes=[], + drained_nodes=cluster.drained_nodes + newly_drained_instances, + ) + + async def _autoscale_cluster( app: FastAPI, cluster: Cluster, @@ -1191,6 +1228,7 @@ async def auto_scale_cluster( cluster = await _try_attach_pending_ec2s( app, cluster, auto_scaling_mode, allowed_instance_types ) + cluster = await _drain_retired_nodes(app, cluster) cluster = await _autoscale_cluster( app, cluster, auto_scaling_mode, allowed_instance_types From 1ec58ae1d18e78351d3e8ef45c6d78f5bc434d20 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 10 Sep 2024 18:26:43 +0200 Subject: [PATCH 4/7] missing file --- .../modules/auto_scaling_mode_computational.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 0e3862e51b1..cd5c0f69d04 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -177,6 +177,16 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance ) + @staticmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + # NOTE: we assume the node is active + assert ( + ComputationalAutoscaling.is_instance_active(app, instance) is True + ) # nosec + return await dask.is_worker_retired( + _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance + ) + @staticmethod async def try_retire_nodes(app: FastAPI) -> None: await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app)) From 380481f7376ef60c7784e6a9cf9dc5a4a77d2e6c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:40:23 +0000 Subject: [PATCH 5/7] make sure an active node has a running dask-sidecar --- .../auto_scaling_mode_computational.py | 6 ++-- .../modules/dask.py | 32 ++++++++++--------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index cd5c0f69d04..ecddfc5e8ec 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -179,10 +179,8 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool @staticmethod async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: - # NOTE: we assume the node is active - assert ( - ComputationalAutoscaling.is_instance_active(app, instance) is True - ) # nosec + if not utils_docker.is_node_osparc_ready(instance.node): + return False return await dask.is_worker_retired( _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index 40bb58a2229..5e1c7e2f0c7 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -121,8 +121,10 @@ async def is_worker_connected( ) -> bool: with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError): async with _scheduler_client(scheduler_url, authentication) as client: - _dask_worker_from_ec2_instance(client, worker_ec2_instance) - return True + _, worker_details = _dask_worker_from_ec2_instance( + client, worker_ec2_instance + ) + return Status(worker_details["status"]) == Status.running return False @@ -171,9 +173,9 @@ def _list_tasks( } async with _scheduler_client(scheduler_url, authentication) as client: - list_of_tasks: dict[ - dask.typing.Key, DaskTaskResources - ] = await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) + list_of_tasks: dict[dask.typing.Key, DaskTaskResources] = ( + await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) + ) _logger.debug("found unrunnable tasks: %s", list_of_tasks) return [ DaskTask( @@ -205,10 +207,10 @@ def _list_processing_tasks( return worker_to_processing_tasks async with _scheduler_client(scheduler_url, authentication) as client: - worker_to_tasks: dict[ - str, list[tuple[dask.typing.Key, DaskTaskResources]] - ] = await _wrap_client_async_routine( - client.run_on_scheduler(_list_processing_tasks) + worker_to_tasks: dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]] = ( + await _wrap_client_async_routine( + client.run_on_scheduler(_list_processing_tasks) + ) ) _logger.debug("found processing tasks: %s", worker_to_tasks) tasks_per_worker = defaultdict(list) @@ -274,12 +276,12 @@ def _list_processing_tasks_on_worker( _logger.debug("looking for processing tasksfor %s", f"{worker_url=}") # now get the used resources - worker_processing_tasks: list[ - tuple[dask.typing.Key, DaskTaskResources] - ] = await _wrap_client_async_routine( - client.run_on_scheduler( - _list_processing_tasks_on_worker, worker_url=worker_url - ), + worker_processing_tasks: list[tuple[dask.typing.Key, DaskTaskResources]] = ( + await _wrap_client_async_routine( + client.run_on_scheduler( + _list_processing_tasks_on_worker, worker_url=worker_url + ), + ) ) total_resources_used: collections.Counter[str] = collections.Counter() From 4955e173320053777e78c001f953fb01c294976d Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:40:30 +0000 Subject: [PATCH 6/7] minor --- .../autoscaling/tests/manual/docker-compose-computational.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/autoscaling/tests/manual/docker-compose-computational.yml b/services/autoscaling/tests/manual/docker-compose-computational.yml index 29575c76f7e..d97387ca95b 100644 --- a/services/autoscaling/tests/manual/docker-compose-computational.yml +++ b/services/autoscaling/tests/manual/docker-compose-computational.yml @@ -1,8 +1,9 @@ services: autoscaling: environment: + - AUTOSCALING_DASK={} - DASK_MONITORING_URL=tcp://dask-scheduler:8786 - - DASK_SCHEDULER_AUTH='{}' + - DASK_SCHEDULER_AUTH={} dask-sidecar: image: itisfoundation/dask-sidecar:master-github-latest init: true From 18cce86729618249c384912d4dc0c148f6a19fcd Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:31:20 +0200 Subject: [PATCH 7/7] monitoring script --- .../autoscaled_monitor/dask.py | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py b/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py index d6e8859869a..e18c2beb831 100644 --- a/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py +++ b/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py @@ -4,7 +4,6 @@ import distributed import rich -import typer from mypy_boto3_ec2.service_resource import Instance from pydantic import AnyUrl @@ -64,25 +63,6 @@ async def dask_client( f"{url}", security=security, timeout="5", asynchronous=True ) ) - versions = await _wrap_dask_async_call(client.get_versions()) - if versions["client"]["python"] != versions["scheduler"]["python"]: - rich.print( - f"[red]python versions do not match! TIP: install the correct version {versions['scheduler']['python']}[/red]" - ) - raise typer.Exit(1) - if ( - versions["client"]["distributed"] - != versions["scheduler"]["distributed"] - ): - rich.print( - f"[red]distributed versions do not match! TIP: install the correct version {versions['scheduler']['distributed']}[/red]" - ) - raise typer.Exit(1) - if versions["client"]["dask"] != versions["scheduler"]["dask"]: - rich.print( - f"[red]dask versions do not match! TIP: install the correct version {versions['scheduler']['dask']}[/red]" - ) - raise typer.Exit(1) yield client finally: