diff --git a/ci/github/helpers/install_aws_cli_v2.bash b/ci/github/helpers/install_aws_cli_v2.bash new file mode 100755 index 00000000000..3647f2b5568 --- /dev/null +++ b/ci/github/helpers/install_aws_cli_v2.bash @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Installs a pinned version of AWS CLI V2 (see AWS_CLI_VERSION) +# + +# http://redsymbol.net/articles/unofficial-bash-strict-mode/ +set -o errexit # abort on nonzero exitstatus +set -o nounset # abort on unbound variable +set -o pipefail # don't hide errors within pipes +IFS=$'\n\t' + +AWS_CLI_VERSION="2.11.11" +ARCH="x86_64" + +curl --fail "https://awscli.amazonaws.com/awscli-exe-linux-${ARCH}-${AWS_CLI_VERSION}.zip" --output "awscliv2.zip" && + apt-get update && + apt-get install -y unzip && + unzip awscliv2.zip && + ./aws/install --update && + apt-get remove --purge -y unzip && + rm awscliv2.zip && + rm -rf aws diff --git a/ci/github/integration-testing/simcore-sdk.bash b/ci/github/integration-testing/simcore-sdk.bash index f0add8cfe45..952192cf09b 100755 --- a/ci/github/integration-testing/simcore-sdk.bash +++ b/ci/github/integration-testing/simcore-sdk.bash @@ -8,6 +8,7 @@ install() { make devenv # shellcheck source=/dev/null source .venv/bin/activate + sudo ./ci/github/helpers/install_aws_cli_v2.bash pushd packages/simcore-sdk make install-ci popd diff --git a/packages/aws-library/src/aws_library/ec2/_client.py b/packages/aws-library/src/aws_library/ec2/_client.py index 276423415a5..970d6130e69 100644 --- a/packages/aws-library/src/aws_library/ec2/_client.py +++ b/packages/aws-library/src/aws_library/ec2/_client.py @@ -133,7 +133,7 @@ async def launch_instances( with log_context( _logger, logging.INFO, - msg=f"launching {number_of_instances} AWS instance(s) {instance_config.type.name} with {instance_config.tags=}", + msg=f"launch {number_of_instances} AWS instance(s) {instance_config.type.name} with {instance_config.tags=}", ): # first check the max amount is not already reached current_instances = await self.get_instances( @@ -277,7
+277,7 @@ async def start_instances( with log_context( _logger, logging.INFO, - msg=f"starting instances {instance_ids}", + msg=f"start instances {instance_ids}", ): await self.client.start_instances(InstanceIds=instance_ids) # wait for the instance to be in a pending state @@ -310,7 +310,7 @@ async def stop_instances(self, instance_datas: Iterable[EC2InstanceData]) -> Non with log_context( _logger, logging.INFO, - msg=f"stopping instances {[i.id for i in instance_datas]}", + msg=f"stop instances {[i.id for i in instance_datas]}", ): await self.client.stop_instances(InstanceIds=[i.id for i in instance_datas]) @@ -321,7 +321,7 @@ async def terminate_instances( with log_context( _logger, logging.INFO, - msg=f"terminating instances {[i.id for i in instance_datas]}", + msg=f"terminate instances {[i.id for i in instance_datas]}", ): await self.client.terminate_instances( InstanceIds=[i.id for i in instance_datas] @@ -335,7 +335,7 @@ async def set_instances_tags( with log_context( _logger, logging.DEBUG, - msg=f"setting {tags=} on instances '[{[i.id for i in instances]}]'", + msg=f"set {tags=} on instances '[{[i.id for i in instances]}]'", ): await self.client.create_tags( Resources=[i.id for i in instances], @@ -357,7 +357,7 @@ async def remove_instances_tags( with log_context( _logger, logging.DEBUG, - msg=f"removing {tag_keys=} of instances '[{[i.id for i in instances]}]'", + msg=f"removal of {tag_keys=} from instances '[{[i.id for i in instances]}]'", ): await self.client.delete_tags( Resources=[i.id for i in instances], diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py b/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py index 7bb826149fe..5f16fefc801 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py @@ -5,7 +5,12 @@ from models_library.docker import DockerGenericTag from types_aiobotocore_ec2 import EC2Client from 
types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType -from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef, TagTypeDef +from types_aiobotocore_ec2.type_defs import ( + FilterTypeDef, + InstanceTypeDef, + ReservationTypeDef, + TagTypeDef, +) async def assert_autoscaled_computational_ec2_instances( @@ -43,6 +48,7 @@ async def assert_autoscaled_dynamic_ec2_instances( expected_additional_tag_keys: list[str], instance_filters: Sequence[FilterTypeDef] | None, expected_user_data: list[str] | None = None, + check_reservation_index: int | None = None, ) -> list[InstanceTypeDef]: if expected_user_data is None: expected_user_data = ["docker swarm join"] @@ -59,6 +65,7 @@ async def assert_autoscaled_dynamic_ec2_instances( ], expected_user_data=expected_user_data, instance_filters=instance_filters, + check_reservation_index=check_reservation_index, ) @@ -72,6 +79,7 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances( expected_additional_tag_keys: list[str], expected_pre_pulled_images: list[DockerGenericTag] | None, instance_filters: Sequence[FilterTypeDef] | None, + check_reservation_index: int | None = None, ) -> list[InstanceTypeDef]: return await assert_ec2_instances( ec2_client, @@ -88,9 +96,80 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances( expected_pre_pulled_images=expected_pre_pulled_images, expected_user_data=[], instance_filters=instance_filters, + check_reservation_index=check_reservation_index, ) +async def _assert_reservation( + ec2_client: EC2Client, + reservation: ReservationTypeDef, + *, + expected_num_instances: int, + expected_instance_type: InstanceTypeType, + expected_instance_state: InstanceStateNameType, + expected_instance_tag_keys: list[str], + expected_user_data: list[str], + expected_pre_pulled_images: list[DockerGenericTag] | None, +) -> list[InstanceTypeDef]: + list_instances: list[InstanceTypeDef] = [] + assert "Instances" in reservation + assert ( + 
len(reservation["Instances"]) == expected_num_instances + ), f"expected {expected_num_instances}, found {len(reservation['Instances'])}" + for instance in reservation["Instances"]: + assert "InstanceType" in instance + assert instance["InstanceType"] == expected_instance_type + assert "Tags" in instance + assert instance["Tags"] + expected_tag_keys = { + *expected_instance_tag_keys, + "io.simcore.autoscaling.version", + "Name", + } + instance_tag_keys = {tag["Key"] for tag in instance["Tags"] if "Key" in tag} + assert instance_tag_keys == expected_tag_keys + + if expected_pre_pulled_images is None: + assert "io.simcore.autoscaling.pre_pulled_images" not in instance_tag_keys + else: + assert "io.simcore.autoscaling.pre_pulled_images" in instance_tag_keys + + def _by_pre_pull_image(ec2_tag: TagTypeDef) -> bool: + assert "Key" in ec2_tag + return ec2_tag["Key"] == "io.simcore.autoscaling.pre_pulled_images" + + instance_pre_pulled_images_aws_tag = next( + iter(filter(_by_pre_pull_image, instance["Tags"])) + ) + assert "Value" in instance_pre_pulled_images_aws_tag + assert ( + instance_pre_pulled_images_aws_tag["Value"] + == f"{json_dumps(expected_pre_pulled_images)}" + ) + + assert "PrivateDnsName" in instance + instance_private_dns_name = instance["PrivateDnsName"] + if expected_instance_state not in ["terminated"]: + # NOTE: moto behaves here differently than AWS by still returning an IP which does not really make sense + assert instance_private_dns_name.endswith(".ec2.internal") + assert "State" in instance + state = instance["State"] + assert "Name" in state + assert state["Name"] == expected_instance_state + + assert "InstanceId" in instance + user_data = await ec2_client.describe_instance_attribute( + Attribute="userData", InstanceId=instance["InstanceId"] + ) + assert "UserData" in user_data + assert "Value" in user_data["UserData"] + user_data = base64.b64decode(user_data["UserData"]["Value"]).decode() + for user_data_string in expected_user_data: + assert 
user_data.count(user_data_string) == 1 + list_instances.append(instance) + return list_instances + + async def assert_ec2_instances( ec2_client: EC2Client, *, @@ -102,66 +181,35 @@ async def assert_ec2_instances( expected_user_data: list[str], expected_pre_pulled_images: list[DockerGenericTag] | None = None, instance_filters: Sequence[FilterTypeDef] | None = None, + check_reservation_index: int | None = None, ) -> list[InstanceTypeDef]: - list_instances: list[InstanceTypeDef] = [] all_instances = await ec2_client.describe_instances(Filters=instance_filters or []) assert len(all_instances["Reservations"]) == expected_num_reservations + if check_reservation_index is not None: + assert check_reservation_index < len(all_instances["Reservations"]) + reservation = all_instances["Reservations"][check_reservation_index] + return await _assert_reservation( + ec2_client, + reservation, + expected_num_instances=expected_num_instances, + expected_instance_type=expected_instance_type, + expected_instance_state=expected_instance_state, + expected_instance_tag_keys=expected_instance_tag_keys, + expected_user_data=expected_user_data, + expected_pre_pulled_images=expected_pre_pulled_images, + ) + list_instances: list[InstanceTypeDef] = [] for reservation in all_instances["Reservations"]: - assert "Instances" in reservation - assert ( - len(reservation["Instances"]) == expected_num_instances - ), f"expected {expected_num_instances}, found {len(reservation['Instances'])}" - for instance in reservation["Instances"]: - assert "InstanceType" in instance - assert instance["InstanceType"] == expected_instance_type - assert "Tags" in instance - assert instance["Tags"] - expected_tag_keys = { - *expected_instance_tag_keys, - "io.simcore.autoscaling.version", - "Name", - } - instance_tag_keys = {tag["Key"] for tag in instance["Tags"] if "Key" in tag} - assert instance_tag_keys == expected_tag_keys - - if expected_pre_pulled_images is None: - assert ( - 
"io.simcore.autoscaling.pre_pulled_images" not in instance_tag_keys - ) - else: - assert "io.simcore.autoscaling.pre_pulled_images" in instance_tag_keys - - def _by_pre_pull_image(ec2_tag: TagTypeDef) -> bool: - assert "Key" in ec2_tag - return ec2_tag["Key"] == "io.simcore.autoscaling.pre_pulled_images" - - instance_pre_pulled_images_aws_tag = next( - iter(filter(_by_pre_pull_image, instance["Tags"])) - ) - assert "Value" in instance_pre_pulled_images_aws_tag - assert ( - instance_pre_pulled_images_aws_tag["Value"] - == f"{json_dumps(expected_pre_pulled_images)}" - ) - - assert "PrivateDnsName" in instance - instance_private_dns_name = instance["PrivateDnsName"] - if expected_instance_state not in ["terminated"]: - # NOTE: moto behaves here differently than AWS by still returning an IP which does not really make sense - assert instance_private_dns_name.endswith(".ec2.internal") - assert "State" in instance - state = instance["State"] - assert "Name" in state - assert state["Name"] == expected_instance_state - - assert "InstanceId" in instance - user_data = await ec2_client.describe_instance_attribute( - Attribute="userData", InstanceId=instance["InstanceId"] + list_instances.extend( + await _assert_reservation( + ec2_client, + reservation, + expected_num_instances=expected_num_instances, + expected_instance_type=expected_instance_type, + expected_instance_state=expected_instance_state, + expected_instance_tag_keys=expected_instance_tag_keys, + expected_user_data=expected_user_data, + expected_pre_pulled_images=expected_pre_pulled_images, ) - assert "UserData" in user_data - assert "Value" in user_data["UserData"] - user_data = base64.b64decode(user_data["UserData"]["Value"]).decode() - for user_data_string in expected_user_data: - assert user_data.count(user_data_string) == 1 - list_instances.append(instance) + ) return list_instances diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py 
b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 9c45de0524b..48567f3f0b6 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -391,7 +391,9 @@ async def _activate_drained_nodes( return cluster with log_context( - _logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes" + _logger, + logging.INFO, + f"activate {len(nodes_to_activate)} drained nodes {[n.ec2_instance.id for n in nodes_to_activate]}", ): activated_nodes = await asyncio.gather( *( @@ -447,9 +449,19 @@ async def _start_warm_buffer_instances( if (warm_buffer.ec2_instance.type == hot_buffer_instance_type) and not warm_buffer.assigned_tasks ] + # pending ec2s/nodes without any assigned task reduce the number of warm buffers to start (clamped to 0 below: a negative slice end would select all but the last items) + unnassigned_pending_ec2s = [ + i.ec2_instance for i in cluster.pending_ec2s if not i.assigned_tasks + ] + unnassigned_pending_nodes = [ + i.ec2_instance for i in cluster.pending_nodes if not i.assigned_tasks + ] + instances_to_start += free_startable_warm_buffers_to_replace_hot_buffers[ : max(0, app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER - len(cluster.buffer_drained_nodes) + - len(unnassigned_pending_ec2s) + - len(unnassigned_pending_nodes)) ] if not instances_to_start: @@ -983,11 +995,14 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster: new_terminating_instances = [] for instance in await _find_terminateable_instances(app, cluster): assert instance.node.description is not None # nosec - with log_context( - _logger, - logging.INFO, - msg=f"termination process for {instance.node.description.hostname}:{instance.ec2_instance.id}", - ), log_catch(_logger, reraise=False): + with ( + log_context( + _logger, + logging.INFO, + msg=f"termination process for {instance.node.description.hostname}:{instance.ec2_instance.id}", + ), + log_catch(_logger,
reraise=False), + ): await utils_docker.set_node_begin_termination_process( get_docker_client(app), instance.node ) @@ -1232,7 +1247,6 @@ async def _autoscale_cluster( async def _notify_autoscaling_status( app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling ) -> None: - monitored_instances = list( itertools.chain( cluster.active_nodes, cluster.drained_nodes, cluster.buffer_drained_nodes diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index a3f554493b1..4ae3f0ccd13 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -1023,6 +1023,15 @@ def with_instances_machines_hot_buffer( ) +@pytest.fixture +def hot_buffer_instance_type(app_settings: ApplicationSettings) -> InstanceTypeType: + assert app_settings.AUTOSCALING_EC2_INSTANCES + return cast( + InstanceTypeType, + next(iter(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES)), + ) + + @pytest.fixture def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]: return mocker.patch( @@ -1093,7 +1102,7 @@ def ec2_instances_allowed_types_with_only_1_buffered( "t2.micro": EC2InstanceBootSpecific( ami_id=faker.pystr(), pre_pull_images=fake_pre_pull_images, - buffer_count=faker.pyint(min_value=1, max_value=10), + buffer_count=faker.pyint(min_value=2, max_value=10), ) } diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index afd3c01e4a3..6bb3a865bbe 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -66,6 +66,9 @@ AutoscalingDocker, get_docker_client, ) +from simcore_service_autoscaling.utils.auto_scaling_core import ( + node_host_name_from_ec2_private_dns, +) from simcore_service_autoscaling.utils.utils_docker import ( 
_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, _OSPARC_NODE_TERMINATION_PROCESS_LABEL_KEY, @@ -245,6 +248,15 @@ def instance_type_filters( ] +@pytest.fixture +def stopped_instance_type_filters( + instance_type_filters: Sequence[FilterTypeDef], +) -> Sequence[FilterTypeDef]: + copied_filters = deepcopy(instance_type_filters) + copied_filters[-1]["Values"] = ["stopped"] + return copied_filters + + @dataclass(frozen=True) class _ScaleUpParams: imposed_instance_type: InstanceTypeType | None @@ -2061,3 +2073,268 @@ async def test_warm_buffers_are_started_to_replace_missing_hot_buffers( len(analyzed_cluster.pending_ec2s) == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER ) + + +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_docker_join_drained", + ["without_AUTOSCALING_DOCKER_JOIN_DRAINED"], + indirect=True, +) +@pytest.mark.parametrize( + # NOTE: only the main test test_cluster_scaling_up_and_down is run with all options + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) +async def test_warm_buffers_only_replace_hot_buffer_if_service_is_started_issue7071( + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, + minimal_configuration: None, + with_instances_machines_hot_buffer: EnvVarsDict, + with_drain_nodes_labelled: bool, + ec2_client: EC2Client, + initialized_app: FastAPI, + app_settings: ApplicationSettings, + ec2_instance_custom_tags: dict[str, str], + buffer_count: int, + create_buffer_machines: Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag] | None], + Awaitable[list[str]], + ], + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], + hot_buffer_instance_type: InstanceTypeType, + spied_cluster_analysis: MockType, + instance_type_filters: Sequence[FilterTypeDef], + stopped_instance_type_filters: Sequence[FilterTypeDef], + 
mock_find_node_with_name_returns_fake_node: mock.Mock, + mock_compute_node_used_resources: mock.Mock, + mock_docker_tag_node: mock.Mock, + mocker: MockerFixture, + fake_node: Node, +): + # NOTE: https://github.com/ITISFoundation/osparc-simcore/issues/7071 + + # + # PRE-requisites + # + assert app_settings.AUTOSCALING_EC2_INSTANCES + assert app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER > 0 + num_hot_buffer = ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER + ) + + # we have nothing running now + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # ensure we get our running hot buffer + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=num_hot_buffer, + expected_instance_type=hot_buffer_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + instance_filters=instance_type_filters, + ) + # this brings a new analysis + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + spied_cluster = assert_cluster_state( + spied_cluster_analysis, expected_calls=2, expected_num_machines=5 + ) + # calling again should attach the new nodes to the reserve, but nothing should start + fake_attached_node_base = deepcopy(fake_node) + assert fake_attached_node_base.spec + fake_attached_node_base.spec.availability = ( + Availability.active if with_drain_nodes_labelled else Availability.drain + ) + assert fake_attached_node_base.spec.labels + assert app_settings.AUTOSCALING_NODES_MONITORING + expected_docker_node_tags = { + tag_key: "true" + for tag_key in ( + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS + + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS + ) + } | { + 
DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: f"{hot_buffer_instance_type}" + } + fake_attached_node_base.spec.labels |= expected_docker_node_tags | { + _OSPARC_SERVICE_READY_LABEL_KEY: "false" + } + assert fake_attached_node_base.status + fake_attached_node_base.status.state = NodeState.ready + fake_hot_buffer_nodes = [] + for i in range(num_hot_buffer): + node = fake_attached_node_base.model_copy(deep=True) + assert node.description + node.description.hostname = node_host_name_from_ec2_private_dns( + spied_cluster.pending_ec2s[i].ec2_instance + ) + fake_hot_buffer_nodes.append(node) + auto_scaling_mode = DynamicAutoscaling() + mocker.patch.object( + auto_scaling_mode, + "get_monitored_nodes", + autospec=True, + return_value=fake_hot_buffer_nodes, + ) + + await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) + await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=num_hot_buffer, + expected_instance_type=hot_buffer_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + instance_filters=instance_type_filters, + ) + spied_cluster = assert_cluster_state( + spied_cluster_analysis, expected_calls=1, expected_num_machines=5 + ) + assert len(spied_cluster.buffer_drained_nodes) == num_hot_buffer + assert not spied_cluster.buffer_ec2s + + # have a few warm buffers ready with the same type as the hot buffer machines + await create_buffer_machines( + buffer_count, + hot_buffer_instance_type, + "stopped", + None, + ) + await assert_autoscaled_dynamic_warm_pools_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=buffer_count, + expected_instance_type=hot_buffer_instance_type, + expected_instance_state="stopped", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_pre_pulled_images=None, + instance_filters=stopped_instance_type_filters, + ) + + # calling again 
should do nothing + await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) + await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=num_hot_buffer, + expected_instance_type=hot_buffer_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + instance_filters=instance_type_filters, + ) + await assert_autoscaled_dynamic_warm_pools_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=buffer_count, + expected_instance_type=hot_buffer_instance_type, + expected_instance_state="stopped", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_pre_pulled_images=None, + instance_filters=stopped_instance_type_filters, + ) + spied_cluster = assert_cluster_state( + spied_cluster_analysis, expected_calls=1, expected_num_machines=5 + ) + assert len(spied_cluster.buffer_drained_nodes) == num_hot_buffer + assert len(spied_cluster.buffer_ec2s) == buffer_count + + # + # BUG REPRODUCTION + # + # start a service that imposes same type as the hot buffer + assert ( + hot_buffer_instance_type == "t2.xlarge" + ), "the test is hard-coded for this type and accordingly resource. 
If this changed then the resource shall be changed too" + scale_up_params = _ScaleUpParams( + imposed_instance_type=hot_buffer_instance_type, + service_resources=Resources( + cpus=2, ram=TypeAdapter(ByteSize).validate_python("1Gib") + ), + num_services=1, + expected_instance_type="t2.xlarge", + expected_num_instances=1, + ) + await create_services_batch(scale_up_params) + + # this should trigger usage of the hot buffer and the warm buffers should replace the hot buffer + await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) + await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=2, + check_reservation_index=0, + expected_num_instances=num_hot_buffer, + expected_instance_type=hot_buffer_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + instance_filters=instance_type_filters, + ) + await assert_autoscaled_dynamic_warm_pools_ec2_instances( + ec2_client, + expected_num_reservations=2, + check_reservation_index=1, + expected_num_instances=1, + expected_instance_type=hot_buffer_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_pre_pulled_images=None, + instance_filters=instance_type_filters, + ) + await assert_autoscaled_dynamic_warm_pools_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=buffer_count - 1, + expected_instance_type=hot_buffer_instance_type, + expected_instance_state="stopped", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_pre_pulled_images=None, + instance_filters=stopped_instance_type_filters, + ) + # simulate one of the hot buffer is not drained anymore and took the pending service + random_fake_node = random.choice(fake_hot_buffer_nodes) # noqa: S311 + random_fake_node.spec.labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" + random_fake_node.spec.labels[ + 
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY + ] = arrow.utcnow().isoformat() + random_fake_node.spec.availability = Availability.active + # simulate the fact that the warm buffer that just started is not yet visible + mock_find_node_with_name_returns_fake_node.return_value = None + + # get the new analysis + await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) + spied_cluster = assert_cluster_state( + spied_cluster_analysis, expected_calls=2, expected_num_machines=6 + ) + assert len(spied_cluster.buffer_drained_nodes) == num_hot_buffer - 1 + assert len(spied_cluster.buffer_ec2s) == buffer_count - 1 + assert len(spied_cluster.active_nodes) == 1 + assert len(spied_cluster.pending_ec2s) == 1 + + # running it again shall do nothing + @tenacity.retry( + retry=tenacity.retry_always, + reraise=True, + wait=tenacity.wait_fixed(0.1), + stop=tenacity.stop_after_attempt(10), + ) + async def _check_autoscaling_is_stable() -> None: + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=auto_scaling_mode + ) + spied_cluster = assert_cluster_state( + spied_cluster_analysis, expected_calls=1, expected_num_machines=6 + ) + assert len(spied_cluster.buffer_drained_nodes) == num_hot_buffer - 1 + assert len(spied_cluster.buffer_ec2s) == buffer_count - 1 + assert len(spied_cluster.active_nodes) == 1 + assert len(spied_cluster.pending_ec2s) == 1 + + with pytest.raises(tenacity.RetryError): + await _check_autoscaling_is_stable()