Skip to content

🐛Autoscaling: Fix issue where all warm buffers would be used to replace hot buffers (🚨) #7079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions ci/github/helpers/install_aws_cli_v2.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
#
# Installs a pinned version of AWS CLI V2 (see AWS_CLI_VERSION below).
#
# The script uses apt-get and the AWS installer, so it needs sufficient
# privileges (e.g. run it through sudo).
#

# http://redsymbol.net/articles/unofficial-bash-strict-mode/
set -o errexit  # abort on nonzero exitstatus
set -o nounset  # abort on unbound variable
set -o pipefail # don't hide errors within pipes
IFS=$'\n\t'

AWS_CLI_VERSION="2.11.11"
ARCH="x86_64"

# --fail: abort on HTTP errors instead of saving the error page as the archive
curl --fail "https://awscli.amazonaws.com/awscli-exe-linux-${ARCH}-${AWS_CLI_VERSION}.zip" --output "awscliv2.zip"

# unzip is only needed to unpack the installer; it is removed again afterwards
apt-get update
apt-get install -y unzip
unzip awscliv2.zip
./aws/install --update
apt-get remove --purge -y unzip

# clean up: the archive unpacks into ./aws (not ./awscliv2)
rm awscliv2.zip
rm -rf aws
1 change: 1 addition & 0 deletions ci/github/integration-testing/simcore-sdk.bash
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ install() {
make devenv
# shellcheck source=/dev/null
source .venv/bin/activate
sudo ./ci/github/helpers/install_aws_cli_v2.bash
pushd packages/simcore-sdk
make install-ci
popd
Expand Down
12 changes: 6 additions & 6 deletions packages/aws-library/src/aws_library/ec2/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ async def launch_instances(
with log_context(
_logger,
logging.INFO,
msg=f"launching {number_of_instances} AWS instance(s) {instance_config.type.name} with {instance_config.tags=}",
msg=f"launch {number_of_instances} AWS instance(s) {instance_config.type.name} with {instance_config.tags=}",
):
# first check the max amount is not already reached
current_instances = await self.get_instances(
Expand Down Expand Up @@ -277,7 +277,7 @@ async def start_instances(
with log_context(
_logger,
logging.INFO,
msg=f"starting instances {instance_ids}",
msg=f"start instances {instance_ids}",
):
await self.client.start_instances(InstanceIds=instance_ids)
# wait for the instance to be in a pending state
Expand Down Expand Up @@ -310,7 +310,7 @@ async def stop_instances(self, instance_datas: Iterable[EC2InstanceData]) -> Non
with log_context(
_logger,
logging.INFO,
msg=f"stopping instances {[i.id for i in instance_datas]}",
msg=f"stop instances {[i.id for i in instance_datas]}",
):
await self.client.stop_instances(InstanceIds=[i.id for i in instance_datas])

Expand All @@ -321,7 +321,7 @@ async def terminate_instances(
with log_context(
_logger,
logging.INFO,
msg=f"terminating instances {[i.id for i in instance_datas]}",
msg=f"terminate instances {[i.id for i in instance_datas]}",
):
await self.client.terminate_instances(
InstanceIds=[i.id for i in instance_datas]
Expand All @@ -335,7 +335,7 @@ async def set_instances_tags(
with log_context(
_logger,
logging.DEBUG,
msg=f"setting {tags=} on instances '[{[i.id for i in instances]}]'",
msg=f"set {tags=} on instances '[{[i.id for i in instances]}]'",
):
await self.client.create_tags(
Resources=[i.id for i in instances],
Expand All @@ -357,7 +357,7 @@ async def remove_instances_tags(
with log_context(
_logger,
logging.DEBUG,
msg=f"removing {tag_keys=} of instances '[{[i.id for i in instances]}]'",
msg=f"removal of {tag_keys=} from instances '[{[i.id for i in instances]}]'",
):
await self.client.delete_tags(
Resources=[i.id for i in instances],
Expand Down
164 changes: 106 additions & 58 deletions packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
from models_library.docker import DockerGenericTag
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef, TagTypeDef
from types_aiobotocore_ec2.type_defs import (
FilterTypeDef,
InstanceTypeDef,
ReservationTypeDef,
TagTypeDef,
)


async def assert_autoscaled_computational_ec2_instances(
Expand Down Expand Up @@ -43,6 +48,7 @@ async def assert_autoscaled_dynamic_ec2_instances(
expected_additional_tag_keys: list[str],
instance_filters: Sequence[FilterTypeDef] | None,
expected_user_data: list[str] | None = None,
check_reservation_index: int | None = None,
) -> list[InstanceTypeDef]:
if expected_user_data is None:
expected_user_data = ["docker swarm join"]
Expand All @@ -59,6 +65,7 @@ async def assert_autoscaled_dynamic_ec2_instances(
],
expected_user_data=expected_user_data,
instance_filters=instance_filters,
check_reservation_index=check_reservation_index,
)


Expand All @@ -72,6 +79,7 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances(
expected_additional_tag_keys: list[str],
expected_pre_pulled_images: list[DockerGenericTag] | None,
instance_filters: Sequence[FilterTypeDef] | None,
check_reservation_index: int | None = None,
) -> list[InstanceTypeDef]:
return await assert_ec2_instances(
ec2_client,
Expand All @@ -88,9 +96,80 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances(
expected_pre_pulled_images=expected_pre_pulled_images,
expected_user_data=[],
instance_filters=instance_filters,
check_reservation_index=check_reservation_index,
)


async def _assert_reservation(
    ec2_client: EC2Client,
    reservation: ReservationTypeDef,
    *,
    expected_num_instances: int,
    expected_instance_type: InstanceTypeType,
    expected_instance_state: InstanceStateNameType,
    expected_instance_tag_keys: list[str],
    expected_user_data: list[str],
    expected_pre_pulled_images: list[DockerGenericTag] | None,
) -> list[InstanceTypeDef]:
    """Assert that every instance of one reservation matches the expectations.

    For each instance the following is checked, in this order: instance type,
    exact set of tag keys (the expected keys plus the version and ``Name``
    tags), presence/absence and value of the pre-pulled images tag, private
    DNS name suffix (skipped when the expected state is ``terminated``),
    instance state and, finally, that each expected user-data string occurs
    exactly once in the decoded user data.

    Returns the instances of the reservation that passed all checks.
    Raises ``AssertionError`` on the first mismatch.
    """
    pre_pulled_images_tag_key = "io.simcore.autoscaling.pre_pulled_images"

    def _is_pre_pulled_images_tag(ec2_tag: TagTypeDef) -> bool:
        assert "Key" in ec2_tag
        return ec2_tag["Key"] == pre_pulled_images_tag_key

    assert "Instances" in reservation
    reservation_instances = reservation["Instances"]
    assert (
        len(reservation_instances) == expected_num_instances
    ), f"expected {expected_num_instances}, found {len(reservation['Instances'])}"

    checked_instances: list[InstanceTypeDef] = []
    for ec2_instance in reservation_instances:
        # instance type
        assert "InstanceType" in ec2_instance
        assert ec2_instance["InstanceType"] == expected_instance_type

        # tag keys must match exactly
        assert "Tags" in ec2_instance
        instance_tags = ec2_instance["Tags"]
        assert instance_tags
        wanted_tag_keys = {
            *expected_instance_tag_keys,
            "io.simcore.autoscaling.version",
            "Name",
        }
        found_tag_keys = {tag["Key"] for tag in instance_tags if "Key" in tag}
        assert found_tag_keys == wanted_tag_keys

        # pre-pulled images tag
        if expected_pre_pulled_images is not None:
            assert pre_pulled_images_tag_key in found_tag_keys
            pre_pulled_images_tag = next(
                filter(_is_pre_pulled_images_tag, instance_tags)
            )
            assert "Value" in pre_pulled_images_tag
            assert (
                pre_pulled_images_tag["Value"]
                == f"{json_dumps(expected_pre_pulled_images)}"
            )
        else:
            assert pre_pulled_images_tag_key not in found_tag_keys

        # private DNS name and state
        assert "PrivateDnsName" in ec2_instance
        private_dns_name = ec2_instance["PrivateDnsName"]
        if expected_instance_state != "terminated":
            # NOTE: moto behaves here differently than AWS by still returning an IP which does not really make sense
            assert private_dns_name.endswith(".ec2.internal")
        assert "State" in ec2_instance
        instance_state = ec2_instance["State"]
        assert "Name" in instance_state
        assert instance_state["Name"] == expected_instance_state

        # user data: each expected string must appear exactly once
        assert "InstanceId" in ec2_instance
        user_data_attribute = await ec2_client.describe_instance_attribute(
            Attribute="userData", InstanceId=ec2_instance["InstanceId"]
        )
        assert "UserData" in user_data_attribute
        assert "Value" in user_data_attribute["UserData"]
        decoded_user_data = base64.b64decode(
            user_data_attribute["UserData"]["Value"]
        ).decode()
        for expected_string in expected_user_data:
            assert decoded_user_data.count(expected_string) == 1

        checked_instances.append(ec2_instance)
    return checked_instances


async def assert_ec2_instances(
ec2_client: EC2Client,
*,
Expand All @@ -102,66 +181,35 @@ async def assert_ec2_instances(
expected_user_data: list[str],
expected_pre_pulled_images: list[DockerGenericTag] | None = None,
instance_filters: Sequence[FilterTypeDef] | None = None,
check_reservation_index: int | None = None,
) -> list[InstanceTypeDef]:
list_instances: list[InstanceTypeDef] = []
all_instances = await ec2_client.describe_instances(Filters=instance_filters or [])
assert len(all_instances["Reservations"]) == expected_num_reservations
if check_reservation_index is not None:
assert check_reservation_index < len(all_instances["Reservations"])
reservation = all_instances["Reservations"][check_reservation_index]
return await _assert_reservation(
ec2_client,
reservation,
expected_num_instances=expected_num_instances,
expected_instance_type=expected_instance_type,
expected_instance_state=expected_instance_state,
expected_instance_tag_keys=expected_instance_tag_keys,
expected_user_data=expected_user_data,
expected_pre_pulled_images=expected_pre_pulled_images,
)
list_instances: list[InstanceTypeDef] = []
for reservation in all_instances["Reservations"]:
assert "Instances" in reservation
assert (
len(reservation["Instances"]) == expected_num_instances
), f"expected {expected_num_instances}, found {len(reservation['Instances'])}"
for instance in reservation["Instances"]:
assert "InstanceType" in instance
assert instance["InstanceType"] == expected_instance_type
assert "Tags" in instance
assert instance["Tags"]
expected_tag_keys = {
*expected_instance_tag_keys,
"io.simcore.autoscaling.version",
"Name",
}
instance_tag_keys = {tag["Key"] for tag in instance["Tags"] if "Key" in tag}
assert instance_tag_keys == expected_tag_keys

if expected_pre_pulled_images is None:
assert (
"io.simcore.autoscaling.pre_pulled_images" not in instance_tag_keys
)
else:
assert "io.simcore.autoscaling.pre_pulled_images" in instance_tag_keys

def _by_pre_pull_image(ec2_tag: TagTypeDef) -> bool:
assert "Key" in ec2_tag
return ec2_tag["Key"] == "io.simcore.autoscaling.pre_pulled_images"

instance_pre_pulled_images_aws_tag = next(
iter(filter(_by_pre_pull_image, instance["Tags"]))
)
assert "Value" in instance_pre_pulled_images_aws_tag
assert (
instance_pre_pulled_images_aws_tag["Value"]
== f"{json_dumps(expected_pre_pulled_images)}"
)

assert "PrivateDnsName" in instance
instance_private_dns_name = instance["PrivateDnsName"]
if expected_instance_state not in ["terminated"]:
# NOTE: moto behaves here differently than AWS by still returning an IP which does not really make sense
assert instance_private_dns_name.endswith(".ec2.internal")
assert "State" in instance
state = instance["State"]
assert "Name" in state
assert state["Name"] == expected_instance_state

assert "InstanceId" in instance
user_data = await ec2_client.describe_instance_attribute(
Attribute="userData", InstanceId=instance["InstanceId"]
list_instances.extend(
await _assert_reservation(
ec2_client,
reservation,
expected_num_instances=expected_num_instances,
expected_instance_type=expected_instance_type,
expected_instance_state=expected_instance_state,
expected_instance_tag_keys=expected_instance_tag_keys,
expected_user_data=expected_user_data,
expected_pre_pulled_images=expected_pre_pulled_images,
)
assert "UserData" in user_data
assert "Value" in user_data["UserData"]
user_data = base64.b64decode(user_data["UserData"]["Value"]).decode()
for user_data_string in expected_user_data:
assert user_data.count(user_data_string) == 1
list_instances.append(instance)
)
return list_instances
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@ async def _activate_drained_nodes(
return cluster

with log_context(
_logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes"
_logger,
logging.INFO,
f"activate {len(nodes_to_activate)} drained nodes {[n.ec2_instance.id for n in nodes_to_activate]}",
):
activated_nodes = await asyncio.gather(
*(
Expand Down Expand Up @@ -447,9 +449,19 @@ async def _start_warm_buffer_instances(
if (warm_buffer.ec2_instance.type == hot_buffer_instance_type)
and not warm_buffer.assigned_tasks
]
# collect the pending ec2s/nodes that are not assigned to any task: their count is subtracted below from the number of warm buffers to start
unnassigned_pending_ec2s = [
i.ec2_instance for i in cluster.pending_ec2s if not i.assigned_tasks
]
unnassigned_pending_nodes = [
i.ec2_instance for i in cluster.pending_nodes if not i.assigned_tasks
]

instances_to_start += free_startable_warm_buffers_to_replace_hot_buffers[
: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
- len(cluster.buffer_drained_nodes)
- len(unnassigned_pending_ec2s)
- len(unnassigned_pending_nodes)
]

if not instances_to_start:
Expand Down Expand Up @@ -983,11 +995,14 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster:
new_terminating_instances = []
for instance in await _find_terminateable_instances(app, cluster):
assert instance.node.description is not None # nosec
with log_context(
_logger,
logging.INFO,
msg=f"termination process for {instance.node.description.hostname}:{instance.ec2_instance.id}",
), log_catch(_logger, reraise=False):
with (
log_context(
_logger,
logging.INFO,
msg=f"termination process for {instance.node.description.hostname}:{instance.ec2_instance.id}",
),
log_catch(_logger, reraise=False),
):
await utils_docker.set_node_begin_termination_process(
get_docker_client(app), instance.node
)
Expand Down Expand Up @@ -1232,7 +1247,6 @@ async def _autoscale_cluster(
async def _notify_autoscaling_status(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
) -> None:

monitored_instances = list(
itertools.chain(
cluster.active_nodes, cluster.drained_nodes, cluster.buffer_drained_nodes
Expand Down
11 changes: 10 additions & 1 deletion services/autoscaling/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,15 @@ def with_instances_machines_hot_buffer(
)


@pytest.fixture
def hot_buffer_instance_type(app_settings: ApplicationSettings) -> InstanceTypeType:
    """Return the first item obtained by iterating the allowed EC2 instance types of the app settings."""
    assert app_settings.AUTOSCALING_EC2_INSTANCES
    allowed_types = app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES
    first_allowed_type = next(iter(allowed_types))
    return cast(InstanceTypeType, first_allowed_type)


@pytest.fixture
def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]:
return mocker.patch(
Expand Down Expand Up @@ -1093,7 +1102,7 @@ def ec2_instances_allowed_types_with_only_1_buffered(
"t2.micro": EC2InstanceBootSpecific(
ami_id=faker.pystr(),
pre_pull_images=fake_pre_pull_images,
buffer_count=faker.pyint(min_value=1, max_value=10),
buffer_count=faker.pyint(min_value=2, max_value=10),
)
}

Expand Down
Loading
Loading