diff --git a/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py b/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py index d6e8859869a..e18c2beb831 100644 --- a/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py +++ b/scripts/maintenance/computational-clusters/autoscaled_monitor/dask.py @@ -4,7 +4,6 @@ import distributed import rich -import typer from mypy_boto3_ec2.service_resource import Instance from pydantic import AnyUrl @@ -64,25 +63,6 @@ async def dask_client( f"{url}", security=security, timeout="5", asynchronous=True ) ) - versions = await _wrap_dask_async_call(client.get_versions()) - if versions["client"]["python"] != versions["scheduler"]["python"]: - rich.print( - f"[red]python versions do not match! TIP: install the correct version {versions['scheduler']['python']}[/red]" - ) - raise typer.Exit(1) - if ( - versions["client"]["distributed"] - != versions["scheduler"]["distributed"] - ): - rich.print( - f"[red]distributed versions do not match! TIP: install the correct version {versions['scheduler']['distributed']}[/red]" - ) - raise typer.Exit(1) - if versions["client"]["dask"] != versions["scheduler"]["dask"]: - rich.print( - f"[red]dask versions do not match! TIP: install the correct version {versions['scheduler']['dask']}[/red]" - ) - raise typer.Exit(1) yield client finally: diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index c51c05680ec..c77f9fe349c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -98,6 +98,11 @@ class Cluster: # pylint: disable=too-many-instance-attributes "description": "This is a EC2-backed docker node which is docker drained and waiting for termination" } ) + retired_nodes: list[AssociatedInstance] = field( + metadata={ + "description": "This is a EC2-backed docker node which was retired and waiting to be drained and eventually terminated or re-used" + } + ) terminated_instances: list[NonAssociatedInstance] def can_scale_down(self) -> bool: @@ -107,6 +112,7 @@ def can_scale_down(self) -> bool: or self.drained_nodes or self.pending_ec2s or self.terminating_nodes + or self.retired_nodes ) def total_number_of_machines(self) -> int: @@ -119,6 +125,7 @@ def total_number_of_machines(self) -> int: + len(self.pending_ec2s) + len(self.broken_ec2s) + len(self.terminating_nodes) + + len(self.retired_nodes) ) def __repr__(self) -> str: @@ -137,6 +144,7 @@ def _get_instance_ids( f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, " f"disconnected-nodes: count={len(self.disconnected_nodes)}, " f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, " + f"retired-nodes: count={len(self.retired_nodes)} {_get_instance_ids(self.retired_nodes)}, " f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)})" ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 6eb9fdcb4c6..b1e629f4e7a 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -21,9 +21,9 @@ from servicelib.logging_utils import log_catch, log_context from servicelib.utils import limited_gather from servicelib.utils_formatting import timedelta_as_minute_second -from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME from types_aiobotocore_ec2.literals import InstanceTypeType +from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME from ..core.errors import ( Ec2InvalidDnsNameError, TaskBestFittingInstanceNotFoundError, @@ -123,7 +123,7 @@ async def _analyze_current_cluster( ] # analyse attached ec2s - active_nodes, pending_nodes, all_drained_nodes = [], [], [] + active_nodes, pending_nodes, all_drained_nodes, retired_nodes = [], [], [], [] for instance in attached_ec2s: if await auto_scaling_mode.is_instance_active(app, instance): node_used_resources = await auto_scaling_mode.compute_node_used_resources( @@ -138,6 +138,9 @@ async def _analyze_current_cluster( ) elif auto_scaling_mode.is_instance_drained(instance): all_drained_nodes.append(instance) + elif await auto_scaling_mode.is_instance_retired(app, instance): + # it should be drained, but it is not, so we force it to be drained such that it might be re-used if needed + retired_nodes.append(instance) else: pending_nodes.append(instance) @@ -159,6 +162,7 @@ async def _analyze_current_cluster( NonAssociatedInstance(ec2_instance=i) for i in terminated_ec2_instances ], disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)], + retired_nodes=retired_nodes, ) _logger.info("current state: %s", f"{cluster!r}") return cluster @@ -329,14 +333,14 @@ async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]: ec2_client = get_ec2_client(app) # some instances might be able to run several tasks - allowed_instance_types: list[EC2InstanceType] = ( - await ec2_client.get_ec2_instance_capabilities( - cast( - set[InstanceTypeType], - set( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES, - ), - ) + allowed_instance_types: list[ + EC2InstanceType + ] = await ec2_client.get_ec2_instance_capabilities( + cast( + set[InstanceTypeType], + set( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES, + ), ) ) @@ -1081,6 +1085,43 @@ async def _notify_machine_creation_progress( ) +async def _drain_retired_nodes( + app: FastAPI, + cluster: Cluster, +) -> Cluster: + if not cluster.retired_nodes: + return cluster + + app_settings = get_application_settings(app) + docker_client = get_docker_client(app) + # drain this empty nodes + updated_nodes: list[Node] = await asyncio.gather( + *( + utils_docker.set_node_osparc_ready( + app_settings, + docker_client, + node.node, + ready=False, + ) + for node in cluster.retired_nodes + ) + ) + if updated_nodes: + _logger.info( + "following nodes were set to drain: '%s'", + f"{[node.Description.Hostname for node in updated_nodes if node.Description]}", + ) + newly_drained_instances = [ + AssociatedInstance(node=node, ec2_instance=instance.ec2_instance) + for instance, node in zip(cluster.retired_nodes, updated_nodes, strict=True) + ] + return dataclasses.replace( + cluster, + retired_nodes=[], + drained_nodes=cluster.drained_nodes + newly_drained_instances, + ) + + async def _autoscale_cluster( app: FastAPI, cluster: Cluster, @@ -1187,6 +1228,7 @@ async def auto_scale_cluster( cluster = await _try_attach_pending_ec2s( app, cluster, auto_scaling_mode, allowed_instance_types ) + cluster = await _drain_retired_nodes(app, cluster) cluster = await _autoscale_cluster( app, cluster, auto_scaling_mode, allowed_instance_types diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py index 7c30b1c61db..921af2cdd01 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py @@ -86,6 +86,11 @@ async def compute_cluster_total_resources( async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool: ... + @staticmethod + @abstractmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + ... + @staticmethod def is_instance_drained(instance: AssociatedInstance) -> bool: return not utils_docker.is_node_osparc_ready(instance.node) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 0e3862e51b1..ecddfc5e8ec 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -177,6 +177,14 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance ) + @staticmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + if not utils_docker.is_node_osparc_ready(instance.node): + return False + return await dask.is_worker_retired( + _scheduler_url(app), _scheduler_auth(app), instance.ec2_instance + ) + @staticmethod async def try_retire_nodes(app: FastAPI) -> None: await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app)) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py index 4f6cd21006b..f267ebe013c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py @@ -102,6 +102,13 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool assert app # nosec return utils_docker.is_node_osparc_ready(instance.node) + @staticmethod + async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool: + assert app # nosec + assert instance # nosec + # nothing to do here + return False + @staticmethod async def try_retire_nodes(app: FastAPI) -> None: assert app # nosec diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index edb02d40e2f..5e1c7e2f0c7 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -11,6 +11,7 @@ import distributed.scheduler from aws_library.ec2 import EC2InstanceData, Resources from dask_task_models_library.resource_constraints import DaskTaskResources +from distributed.core import Status from models_library.clusters import InternalClusterAuthentication, TLSAuthentication from pydantic import AnyUrl, ByteSize, parse_obj_as @@ -120,8 +121,28 @@ async def is_worker_connected( ) -> bool: with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError): async with _scheduler_client(scheduler_url, authentication) as client: - _dask_worker_from_ec2_instance(client, worker_ec2_instance) - return True + _, worker_details = _dask_worker_from_ec2_instance( + client, worker_ec2_instance + ) + return Status(worker_details["status"]) == Status.running + return False + + +async def is_worker_retired( + scheduler_url: AnyUrl, + authentication: InternalClusterAuthentication, + worker_ec2_instance: EC2InstanceData, +) -> bool: + with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError): + async with _scheduler_client(scheduler_url, authentication) as client: + _, worker_details = _dask_worker_from_ec2_instance( + client, worker_ec2_instance + ) + return Status(worker_details["status"]) in { + Status.closed, + Status.closing, + Status.closing_gracefully, + } return False @@ -152,9 +173,9 @@ def _list_tasks( } async with _scheduler_client(scheduler_url, authentication) as client: - list_of_tasks: dict[ - dask.typing.Key, DaskTaskResources - ] = await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) + list_of_tasks: dict[dask.typing.Key, DaskTaskResources] = ( + await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) + ) _logger.debug("found unrunnable tasks: %s", list_of_tasks) return [ DaskTask( @@ -186,10 +207,10 @@ def _list_processing_tasks( return worker_to_processing_tasks async with _scheduler_client(scheduler_url, authentication) as client: - worker_to_tasks: dict[ - str, list[tuple[dask.typing.Key, DaskTaskResources]] - ] = await _wrap_client_async_routine( - client.run_on_scheduler(_list_processing_tasks) + worker_to_tasks: dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]] = ( + await _wrap_client_async_routine( + client.run_on_scheduler(_list_processing_tasks) + ) ) _logger.debug("found processing tasks: %s", worker_to_tasks) tasks_per_worker = defaultdict(list) @@ -255,12 +276,12 @@ def _list_processing_tasks_on_worker( _logger.debug("looking for processing tasksfor %s", f"{worker_url=}") # now get the used resources - worker_processing_tasks: list[ - tuple[dask.typing.Key, DaskTaskResources] - ] = await _wrap_client_async_routine( - client.run_on_scheduler( - _list_processing_tasks_on_worker, worker_url=worker_url - ), + worker_processing_tasks: list[tuple[dask.typing.Key, DaskTaskResources]] = ( + await _wrap_client_async_routine( + client.run_on_scheduler( + _list_processing_tasks_on_worker, worker_url=worker_url + ), + ) ) total_resources_used: collections.Counter[str] = collections.Counter() diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py index 4d8f53025b5..1cfa96a6f36 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py @@ -42,6 +42,10 @@ "Number of EC2-backed docker nodes that started the termination process", EC2_INSTANCE_LABELS, ), + "retired_nodes": ( + "Number of EC2-backed docker nodes that were actively retired and waiting for draining and termination or re-use", + EC2_INSTANCE_LABELS, + ), "terminated_instances": ( "Number of EC2 instances that were terminated (they are typically visible 1 hour)", EC2_INSTANCE_LABELS, diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py index 70b96cabfe2..056a77ea2a5 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py @@ -29,6 +29,7 @@ class ClusterMetrics(MetricsBase): # pylint: disable=too-many-instance-attribut buffer_ec2s: TrackedGauge = field(init=False) disconnected_nodes: TrackedGauge = field(init=False) terminating_nodes: TrackedGauge = field(init=False) + retired_nodes: TrackedGauge = field(init=False) terminated_instances: TrackedGauge = field(init=False) def __post_init__(self) -> None: diff --git a/services/autoscaling/tests/manual/docker-compose-computational.yml b/services/autoscaling/tests/manual/docker-compose-computational.yml index 29575c76f7e..d97387ca95b 100644 --- a/services/autoscaling/tests/manual/docker-compose-computational.yml +++ b/services/autoscaling/tests/manual/docker-compose-computational.yml @@ -1,8 +1,9 @@ services: autoscaling: environment: + - AUTOSCALING_DASK={} - DASK_MONITORING_URL=tcp://dask-scheduler:8786 - - DASK_SCHEDULER_AUTH='{}' + - DASK_SCHEDULER_AUTH={} dask-sidecar: image: itisfoundation/dask-sidecar:master-github-latest init: true diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 4ad72c33d99..97e709c2dba 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -799,6 +799,7 @@ def _creator(**cluter_overrides) -> Cluster: buffer_ec2s=[], disconnected_nodes=[], terminating_nodes=[], + retired_nodes=[], terminated_instances=[], ), **cluter_overrides,