Skip to content

Commit f318930

Browse files
sanderegg and matusdrobuliak66
authored and committed
🐛Autoscaling: Fix issue where all warm buffers would be used to replace hot buffers (🚨) (#7079)
1 parent bcc8e34 commit f318930

File tree

7 files changed

+443
-72
lines changed

7 files changed

+443
-72
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#!/bin/bash
#
# Installs a pinned version of AWS CLI V2 (see AWS_CLI_VERSION below).
#
# The script downloads the official zip bundle into the current directory,
# temporarily installs `unzip` through apt-get to extract it, runs the bundled
# installer and finally removes everything it downloaded or extracted.
# NOTE: it calls apt-get and ./aws/install, so it needs sufficient (root)
# privileges to run.
#

# http://redsymbol.net/articles/unofficial-bash-strict-mode/
set -o errexit  # abort on nonzero exitstatus
set -o nounset  # abort on unbound variable
set -o pipefail # don't hide errors within pipes
IFS=$'\n\t'

AWS_CLI_VERSION="2.11.11"
ARCH="x86_64"

# --fail makes curl exit non-zero on HTTP errors instead of saving the error
# page as if it were the zip archive
curl --fail "https://awscli.amazonaws.com/awscli-exe-linux-${ARCH}-${AWS_CLI_VERSION}.zip" --output "awscliv2.zip" &&
  apt-get update &&
  apt-get install -y unzip &&
  unzip awscliv2.zip &&
  ./aws/install --update &&
  apt-get remove --purge -y unzip &&
  rm awscliv2.zip &&
  # the archive extracts into ./aws (the installer above is run from there),
  # so that is the directory to clean up
  rm -rf aws

ci/github/integration-testing/simcore-sdk.bash

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ install() {
88
make devenv
99
# shellcheck source=/dev/null
1010
source .venv/bin/activate
11+
sudo ./ci/github/helpers/install_aws_cli_v2.bash
1112
pushd packages/simcore-sdk
1213
make install-ci
1314
popd

packages/aws-library/src/aws_library/ec2/_client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ async def launch_instances(
133133
with log_context(
134134
_logger,
135135
logging.INFO,
136-
msg=f"launching {number_of_instances} AWS instance(s) {instance_config.type.name} with {instance_config.tags=}",
136+
msg=f"launch {number_of_instances} AWS instance(s) {instance_config.type.name} with {instance_config.tags=}",
137137
):
138138
# first check the max amount is not already reached
139139
current_instances = await self.get_instances(
@@ -277,7 +277,7 @@ async def start_instances(
277277
with log_context(
278278
_logger,
279279
logging.INFO,
280-
msg=f"starting instances {instance_ids}",
280+
msg=f"start instances {instance_ids}",
281281
):
282282
await self.client.start_instances(InstanceIds=instance_ids)
283283
# wait for the instance to be in a pending state
@@ -310,7 +310,7 @@ async def stop_instances(self, instance_datas: Iterable[EC2InstanceData]) -> Non
310310
with log_context(
311311
_logger,
312312
logging.INFO,
313-
msg=f"stopping instances {[i.id for i in instance_datas]}",
313+
msg=f"stop instances {[i.id for i in instance_datas]}",
314314
):
315315
await self.client.stop_instances(InstanceIds=[i.id for i in instance_datas])
316316

@@ -321,7 +321,7 @@ async def terminate_instances(
321321
with log_context(
322322
_logger,
323323
logging.INFO,
324-
msg=f"terminating instances {[i.id for i in instance_datas]}",
324+
msg=f"terminate instances {[i.id for i in instance_datas]}",
325325
):
326326
await self.client.terminate_instances(
327327
InstanceIds=[i.id for i in instance_datas]
@@ -335,7 +335,7 @@ async def set_instances_tags(
335335
with log_context(
336336
_logger,
337337
logging.DEBUG,
338-
msg=f"setting {tags=} on instances '[{[i.id for i in instances]}]'",
338+
msg=f"set {tags=} on instances '[{[i.id for i in instances]}]'",
339339
):
340340
await self.client.create_tags(
341341
Resources=[i.id for i in instances],
@@ -357,7 +357,7 @@ async def remove_instances_tags(
357357
with log_context(
358358
_logger,
359359
logging.DEBUG,
360-
msg=f"removing {tag_keys=} of instances '[{[i.id for i in instances]}]'",
360+
msg=f"removal of {tag_keys=} from instances '[{[i.id for i in instances]}]'",
361361
):
362362
await self.client.delete_tags(
363363
Resources=[i.id for i in instances],

packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py

Lines changed: 106 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55
from models_library.docker import DockerGenericTag
66
from types_aiobotocore_ec2 import EC2Client
77
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
8-
from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef, TagTypeDef
8+
from types_aiobotocore_ec2.type_defs import (
9+
FilterTypeDef,
10+
InstanceTypeDef,
11+
ReservationTypeDef,
12+
TagTypeDef,
13+
)
914

1015

1116
async def assert_autoscaled_computational_ec2_instances(
@@ -43,6 +48,7 @@ async def assert_autoscaled_dynamic_ec2_instances(
4348
expected_additional_tag_keys: list[str],
4449
instance_filters: Sequence[FilterTypeDef] | None,
4550
expected_user_data: list[str] | None = None,
51+
check_reservation_index: int | None = None,
4652
) -> list[InstanceTypeDef]:
4753
if expected_user_data is None:
4854
expected_user_data = ["docker swarm join"]
@@ -59,6 +65,7 @@ async def assert_autoscaled_dynamic_ec2_instances(
5965
],
6066
expected_user_data=expected_user_data,
6167
instance_filters=instance_filters,
68+
check_reservation_index=check_reservation_index,
6269
)
6370

6471

@@ -72,6 +79,7 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances(
7279
expected_additional_tag_keys: list[str],
7380
expected_pre_pulled_images: list[DockerGenericTag] | None,
7481
instance_filters: Sequence[FilterTypeDef] | None,
82+
check_reservation_index: int | None = None,
7583
) -> list[InstanceTypeDef]:
7684
return await assert_ec2_instances(
7785
ec2_client,
@@ -88,9 +96,80 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances(
8896
expected_pre_pulled_images=expected_pre_pulled_images,
8997
expected_user_data=[],
9098
instance_filters=instance_filters,
99+
check_reservation_index=check_reservation_index,
91100
)
92101

93102

103+
async def _assert_reservation(
    ec2_client: EC2Client,
    reservation: ReservationTypeDef,
    *,
    expected_num_instances: int,
    expected_instance_type: InstanceTypeType,
    expected_instance_state: InstanceStateNameType,
    expected_instance_tag_keys: list[str],
    expected_user_data: list[str],
    expected_pre_pulled_images: list[DockerGenericTag] | None,
) -> list[InstanceTypeDef]:
    """Assert that every instance of one EC2 reservation matches the expectations.

    For each instance in ``reservation`` the following is checked, in order:
    instance type, the exact set of tag keys (``expected_instance_tag_keys``
    plus ``io.simcore.autoscaling.version`` and ``Name``), the pre-pulled
    images tag (absent when ``expected_pre_pulled_images`` is ``None``,
    otherwise equal to its JSON dump), the private DNS name suffix (skipped
    for terminated instances), the instance state, and finally that each
    string of ``expected_user_data`` appears exactly once in the decoded
    instance user data fetched through ``ec2_client``.

    Returns:
        the instances of the reservation, in the order they were checked.

    Raises:
        AssertionError: on the first expectation that is not met.
    """
    pre_pulled_images_tag_key = "io.simcore.autoscaling.pre_pulled_images"

    assert "Instances" in reservation
    instances = reservation["Instances"]
    found = len(instances)
    assert (
        found == expected_num_instances
    ), f"expected {expected_num_instances}, found {found}"

    checked_instances: list[InstanceTypeDef] = []
    for instance in instances:
        # --- instance type
        assert "InstanceType" in instance
        assert instance["InstanceType"] == expected_instance_type

        # --- tags: the set of keys must match exactly
        assert "Tags" in instance
        tags = instance["Tags"]
        assert tags
        expected_tag_keys = set(expected_instance_tag_keys) | {
            "io.simcore.autoscaling.version",
            "Name",
        }
        instance_tag_keys = {tag["Key"] for tag in tags if "Key" in tag}
        assert instance_tag_keys == expected_tag_keys

        # --- pre-pulled images tag
        if expected_pre_pulled_images is not None:
            assert pre_pulled_images_tag_key in instance_tag_keys

            def _is_pre_pulled_images_tag(ec2_tag: TagTypeDef) -> bool:
                assert "Key" in ec2_tag
                return ec2_tag["Key"] == pre_pulled_images_tag_key

            pre_pulled_images_tag = next(filter(_is_pre_pulled_images_tag, tags))
            assert "Value" in pre_pulled_images_tag
            assert (
                pre_pulled_images_tag["Value"]
                == f"{json_dumps(expected_pre_pulled_images)}"
            )
        else:
            assert pre_pulled_images_tag_key not in instance_tag_keys

        # --- private DNS name
        assert "PrivateDnsName" in instance
        private_dns_name = instance["PrivateDnsName"]
        if expected_instance_state != "terminated":
            # NOTE: moto behaves here differently than AWS by still returning an IP which does not really make sense
            assert private_dns_name.endswith(".ec2.internal")

        # --- instance state
        assert "State" in instance
        instance_state = instance["State"]
        assert "Name" in instance_state
        assert instance_state["Name"] == expected_instance_state

        # --- user data: each expected snippet must appear exactly once
        assert "InstanceId" in instance
        user_data_attribute = await ec2_client.describe_instance_attribute(
            Attribute="userData", InstanceId=instance["InstanceId"]
        )
        assert "UserData" in user_data_attribute
        assert "Value" in user_data_attribute["UserData"]
        decoded_user_data = base64.b64decode(
            user_data_attribute["UserData"]["Value"]
        ).decode()
        for expected_snippet in expected_user_data:
            assert decoded_user_data.count(expected_snippet) == 1

        checked_instances.append(instance)
    return checked_instances
171+
172+
94173
async def assert_ec2_instances(
95174
ec2_client: EC2Client,
96175
*,
@@ -102,66 +181,35 @@ async def assert_ec2_instances(
102181
expected_user_data: list[str],
103182
expected_pre_pulled_images: list[DockerGenericTag] | None = None,
104183
instance_filters: Sequence[FilterTypeDef] | None = None,
184+
check_reservation_index: int | None = None,
105185
) -> list[InstanceTypeDef]:
106-
list_instances: list[InstanceTypeDef] = []
107186
all_instances = await ec2_client.describe_instances(Filters=instance_filters or [])
108187
assert len(all_instances["Reservations"]) == expected_num_reservations
188+
if check_reservation_index is not None:
189+
assert check_reservation_index < len(all_instances["Reservations"])
190+
reservation = all_instances["Reservations"][check_reservation_index]
191+
return await _assert_reservation(
192+
ec2_client,
193+
reservation,
194+
expected_num_instances=expected_num_instances,
195+
expected_instance_type=expected_instance_type,
196+
expected_instance_state=expected_instance_state,
197+
expected_instance_tag_keys=expected_instance_tag_keys,
198+
expected_user_data=expected_user_data,
199+
expected_pre_pulled_images=expected_pre_pulled_images,
200+
)
201+
list_instances: list[InstanceTypeDef] = []
109202
for reservation in all_instances["Reservations"]:
110-
assert "Instances" in reservation
111-
assert (
112-
len(reservation["Instances"]) == expected_num_instances
113-
), f"expected {expected_num_instances}, found {len(reservation['Instances'])}"
114-
for instance in reservation["Instances"]:
115-
assert "InstanceType" in instance
116-
assert instance["InstanceType"] == expected_instance_type
117-
assert "Tags" in instance
118-
assert instance["Tags"]
119-
expected_tag_keys = {
120-
*expected_instance_tag_keys,
121-
"io.simcore.autoscaling.version",
122-
"Name",
123-
}
124-
instance_tag_keys = {tag["Key"] for tag in instance["Tags"] if "Key" in tag}
125-
assert instance_tag_keys == expected_tag_keys
126-
127-
if expected_pre_pulled_images is None:
128-
assert (
129-
"io.simcore.autoscaling.pre_pulled_images" not in instance_tag_keys
130-
)
131-
else:
132-
assert "io.simcore.autoscaling.pre_pulled_images" in instance_tag_keys
133-
134-
def _by_pre_pull_image(ec2_tag: TagTypeDef) -> bool:
135-
assert "Key" in ec2_tag
136-
return ec2_tag["Key"] == "io.simcore.autoscaling.pre_pulled_images"
137-
138-
instance_pre_pulled_images_aws_tag = next(
139-
iter(filter(_by_pre_pull_image, instance["Tags"]))
140-
)
141-
assert "Value" in instance_pre_pulled_images_aws_tag
142-
assert (
143-
instance_pre_pulled_images_aws_tag["Value"]
144-
== f"{json_dumps(expected_pre_pulled_images)}"
145-
)
146-
147-
assert "PrivateDnsName" in instance
148-
instance_private_dns_name = instance["PrivateDnsName"]
149-
if expected_instance_state not in ["terminated"]:
150-
# NOTE: moto behaves here differently than AWS by still returning an IP which does not really make sense
151-
assert instance_private_dns_name.endswith(".ec2.internal")
152-
assert "State" in instance
153-
state = instance["State"]
154-
assert "Name" in state
155-
assert state["Name"] == expected_instance_state
156-
157-
assert "InstanceId" in instance
158-
user_data = await ec2_client.describe_instance_attribute(
159-
Attribute="userData", InstanceId=instance["InstanceId"]
203+
list_instances.extend(
204+
await _assert_reservation(
205+
ec2_client,
206+
reservation,
207+
expected_num_instances=expected_num_instances,
208+
expected_instance_type=expected_instance_type,
209+
expected_instance_state=expected_instance_state,
210+
expected_instance_tag_keys=expected_instance_tag_keys,
211+
expected_user_data=expected_user_data,
212+
expected_pre_pulled_images=expected_pre_pulled_images,
160213
)
161-
assert "UserData" in user_data
162-
assert "Value" in user_data["UserData"]
163-
user_data = base64.b64decode(user_data["UserData"]["Value"]).decode()
164-
for user_data_string in expected_user_data:
165-
assert user_data.count(user_data_string) == 1
166-
list_instances.append(instance)
214+
)
167215
return list_instances

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,9 @@ async def _activate_drained_nodes(
391391
return cluster
392392

393393
with log_context(
394-
_logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes"
394+
_logger,
395+
logging.INFO,
396+
f"activate {len(nodes_to_activate)} drained nodes {[n.ec2_instance.id for n in nodes_to_activate]}",
395397
):
396398
activated_nodes = await asyncio.gather(
397399
*(
@@ -447,9 +449,19 @@ async def _start_warm_buffer_instances(
447449
if (warm_buffer.ec2_instance.type == hot_buffer_instance_type)
448450
and not warm_buffer.assigned_tasks
449451
]
452+
# check there are no empty pending ec2s/nodes that are not assigned to any task
453+
unnassigned_pending_ec2s = [
454+
i.ec2_instance for i in cluster.pending_ec2s if not i.assigned_tasks
455+
]
456+
unnassigned_pending_nodes = [
457+
i.ec2_instance for i in cluster.pending_nodes if not i.assigned_tasks
458+
]
459+
450460
instances_to_start += free_startable_warm_buffers_to_replace_hot_buffers[
451461
: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
452462
- len(cluster.buffer_drained_nodes)
463+
- len(unnassigned_pending_ec2s)
464+
- len(unnassigned_pending_nodes)
453465
]
454466

455467
if not instances_to_start:
@@ -983,11 +995,14 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster:
983995
new_terminating_instances = []
984996
for instance in await _find_terminateable_instances(app, cluster):
985997
assert instance.node.description is not None # nosec
986-
with log_context(
987-
_logger,
988-
logging.INFO,
989-
msg=f"termination process for {instance.node.description.hostname}:{instance.ec2_instance.id}",
990-
), log_catch(_logger, reraise=False):
998+
with (
999+
log_context(
1000+
_logger,
1001+
logging.INFO,
1002+
msg=f"termination process for {instance.node.description.hostname}:{instance.ec2_instance.id}",
1003+
),
1004+
log_catch(_logger, reraise=False),
1005+
):
9911006
await utils_docker.set_node_begin_termination_process(
9921007
get_docker_client(app), instance.node
9931008
)
@@ -1232,7 +1247,6 @@ async def _autoscale_cluster(
12321247
async def _notify_autoscaling_status(
12331248
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
12341249
) -> None:
1235-
12361250
monitored_instances = list(
12371251
itertools.chain(
12381252
cluster.active_nodes, cluster.drained_nodes, cluster.buffer_drained_nodes

services/autoscaling/tests/unit/conftest.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,15 @@ def with_instances_machines_hot_buffer(
10231023
)
10241024

10251025

1026+
@pytest.fixture
def hot_buffer_instance_type(app_settings: ApplicationSettings) -> InstanceTypeType:
    """Return the first entry obtained by iterating the allowed EC2 instance types.

    Requires ``app_settings.AUTOSCALING_EC2_INSTANCES`` to be set (asserted).
    """
    # NOTE(review): presumably EC2_INSTANCES_ALLOWED_TYPES is a mapping keyed by
    # instance type name, so iterating it yields the type names -- confirm
    ec2_instances_settings = app_settings.AUTOSCALING_EC2_INSTANCES
    assert ec2_instances_settings
    allowed_types = iter(ec2_instances_settings.EC2_INSTANCES_ALLOWED_TYPES)
    first_allowed_type = next(allowed_types)
    return cast(InstanceTypeType, first_allowed_type)
1033+
1034+
10261035
@pytest.fixture
10271036
def mock_find_node_with_name_returns_none(mocker: MockerFixture) -> Iterator[mock.Mock]:
10281037
return mocker.patch(
@@ -1093,7 +1102,7 @@ def ec2_instances_allowed_types_with_only_1_buffered(
10931102
"t2.micro": EC2InstanceBootSpecific(
10941103
ami_id=faker.pystr(),
10951104
pre_pull_images=fake_pre_pull_images,
1096-
buffer_count=faker.pyint(min_value=1, max_value=10),
1105+
buffer_count=faker.pyint(min_value=2, max_value=10),
10971106
)
10981107
}
10991108

0 commit comments

Comments
 (0)