Skip to content

Commit 7336ce5

Browse files
authored
✨Autoscaling: add buffer metrics (#6260)
1 parent aead556 commit 7336ce5

23 files changed

+666
-370
lines changed

packages/aws-library/requirements/_base.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
aioboto3
1010
aiocache
11+
arrow
1112
pydantic[email]
1213
types-aiobotocore[ec2,s3,ssm]
1314
sh

packages/aws-library/requirements/_base.txt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,7 @@ arrow==1.3.0
4141
# -r requirements/../../../packages/models-library/requirements/_base.in
4242
# -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in
4343
# -r requirements/../../../packages/service-library/requirements/_base.in
44-
async-timeout==4.0.3
45-
# via
46-
# aiohttp
47-
# redis
44+
# -r requirements/_base.in
4845
attrs==24.2.0
4946
# via
5047
# aiohttp
@@ -65,8 +62,6 @@ dnspython==2.6.1
6562
# via email-validator
6663
email-validator==2.2.0
6764
# via pydantic
68-
exceptiongroup==1.2.2
69-
# via anyio
7065
fast-depends==2.4.8
7166
# via faststream
7267
faststream==0.5.18
@@ -201,7 +196,6 @@ types-python-dateutil==2.9.0.20240821
201196
typing-extensions==4.12.2
202197
# via
203198
# aiodebug
204-
# anyio
205199
# faststream
206200
# pydantic
207201
# typer

packages/aws-library/requirements/_test.txt

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,6 @@ cryptography==43.0.0
5858
# moto
5959
docker==7.1.0
6060
# via moto
61-
exceptiongroup==1.2.2
62-
# via
63-
# -c requirements/_base.txt
64-
# pytest
6561
faker==27.0.0
6662
# via -r requirements/_test.in
6763
flask==3.0.3
@@ -246,10 +242,6 @@ sympy==1.13.2
246242
# via cfn-lint
247243
termcolor==2.4.0
248244
# via pytest-sugar
249-
tomli==2.0.1
250-
# via
251-
# coverage
252-
# pytest
253245
types-aioboto3==13.1.1
254246
# via -r requirements/_test.in
255247
types-aiobotocore==2.13.2

packages/aws-library/requirements/_tools.txt

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,22 +72,12 @@ setuptools==73.0.1
7272
# via
7373
# -c requirements/_test.txt
7474
# pip-tools
75-
tomli==2.0.1
76-
# via
77-
# -c requirements/_test.txt
78-
# black
79-
# build
80-
# mypy
81-
# pip-tools
82-
# pylint
8375
tomlkit==0.13.2
8476
# via pylint
8577
typing-extensions==4.12.2
8678
# via
8779
# -c requirements/_base.txt
8880
# -c requirements/_test.txt
89-
# astroid
90-
# black
9181
# mypy
9282
virtualenv==20.26.3
9383
# via pre-commit

packages/aws-library/src/aws_library/ssm/_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import contextlib
2+
import datetime
23
import logging
34
from collections.abc import Sequence
45
from dataclasses import dataclass
56
from typing import Final, cast
67

78
import aioboto3
9+
import arrow
810
import botocore
911
import botocore.exceptions
1012
from aiobotocore.session import ClientCreatorContext
@@ -31,6 +33,8 @@ class SSMCommand:
3133
command_id: str
3234
instance_ids: Sequence[str]
3335
status: CommandStatusType
36+
start_time: datetime.datetime | None
37+
finish_time: datetime.datetime | None
3438
message: str | None = None
3539

3640

@@ -89,12 +93,15 @@ async def send_command(
8993
assert "Comment" in response["Command"] # nosec
9094
assert "CommandId" in response["Command"] # nosec
9195
assert "Status" in response["Command"] # nosec
96+
assert "RequestedDateTime" in response["Command"] # nosec
9297

9398
return SSMCommand(
9499
name=response["Command"]["Comment"],
95100
command_id=response["Command"]["CommandId"],
96101
status=response["Command"]["Status"],
97102
instance_ids=instance_ids,
103+
start_time=None,
104+
finish_time=None,
98105
)
99106

100107
@log_decorator(_logger, logging.DEBUG)
@@ -111,6 +118,8 @@ async def get_command(self, instance_id: str, *, command_id: str) -> SSMCommand:
111118
instance_ids=[response["InstanceId"]],
112119
status=response["Status"] if response["Status"] != "Delayed" else "Pending",
113120
message=response["StatusDetails"],
121+
start_time=arrow.get(response["ExecutionStartDateTime"]).datetime,
122+
finish_time=arrow.get(response["ExecutionEndDateTime"]).datetime,
114123
)
115124

116125
@log_decorator(_logger, logging.DEBUG)

packages/aws-library/tests/test_ssm_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ async def test_send_command(
112112
assert dataclasses.asdict(got) == {
113113
**dataclasses.asdict(sent_command),
114114
"message": "Success",
115+
"start_time": got.start_time,
116+
"finish_time": got.finish_time,
115117
}
116118
with pytest.raises(SSMInvalidCommandError):
117119
await simcore_ssm_api.get_command(

services/autoscaling/src/simcore_service_autoscaling/models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class Cluster: # pylint: disable=too-many-instance-attributes
6868
"description": "This is a EC2-backed docker node which is drained (cannot accept tasks)"
6969
}
7070
)
71-
reserve_drained_nodes: list[AssociatedInstance] = field(
71+
buffer_drained_nodes: list[AssociatedInstance] = field(
7272
metadata={
7373
"description": "This is a EC2-backed docker node which is drained in the reserve if this is enabled (with no tasks)"
7474
}
@@ -115,7 +115,7 @@ def total_number_of_machines(self) -> int:
115115
len(self.active_nodes)
116116
+ len(self.pending_nodes)
117117
+ len(self.drained_nodes)
118-
+ len(self.reserve_drained_nodes)
118+
+ len(self.buffer_drained_nodes)
119119
+ len(self.pending_ec2s)
120120
+ len(self.broken_ec2s)
121121
+ len(self.terminating_nodes)
@@ -131,7 +131,7 @@ def _get_instance_ids(
131131
f"Cluster(active-nodes: count={len(self.active_nodes)} {_get_instance_ids(self.active_nodes)}, "
132132
f"pending-nodes: count={len(self.pending_nodes)} {_get_instance_ids(self.pending_nodes)}, "
133133
f"drained-nodes: count={len(self.drained_nodes)} {_get_instance_ids(self.drained_nodes)}, "
134-
f"reserve-drained-nodes: count={len(self.reserve_drained_nodes)} {_get_instance_ids(self.reserve_drained_nodes)}, "
134+
f"reserve-drained-nodes: count={len(self.buffer_drained_nodes)} {_get_instance_ids(self.buffer_drained_nodes)}, "
135135
f"pending-ec2s: count={len(self.pending_ec2s)} {_get_instance_ids(self.pending_ec2s)}, "
136136
f"broken-ec2s: count={len(self.broken_ec2s)} {_get_instance_ids(self.broken_ec2s)}, "
137137
f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, "

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,14 @@ async def _analyze_current_cluster(
139139
else:
140140
pending_nodes.append(instance)
141141

142-
drained_nodes, reserve_drained_nodes, terminating_nodes = sort_drained_nodes(
142+
drained_nodes, buffer_drained_nodes, terminating_nodes = sort_drained_nodes(
143143
app_settings, all_drained_nodes, allowed_instance_types
144144
)
145145
cluster = Cluster(
146146
active_nodes=active_nodes,
147147
pending_nodes=pending_nodes,
148148
drained_nodes=drained_nodes,
149-
reserve_drained_nodes=reserve_drained_nodes,
149+
buffer_drained_nodes=buffer_drained_nodes,
150150
pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s],
151151
broken_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in broken_ec2s],
152152
buffer_ec2s=[
@@ -285,19 +285,19 @@ async def _try_attach_pending_ec2s(
285285
)
286286
else:
287287
still_pending_ec2s.append(instance_data)
288-
except Ec2InvalidDnsNameError: # noqa: PERF203
288+
except Ec2InvalidDnsNameError:
289289
_logger.exception("Unexpected EC2 private dns")
290290
# NOTE: first provision the reserve drained nodes if possible
291291
all_drained_nodes = (
292-
cluster.drained_nodes + cluster.reserve_drained_nodes + new_found_instances
292+
cluster.drained_nodes + cluster.buffer_drained_nodes + new_found_instances
293293
)
294-
drained_nodes, reserve_drained_nodes, _ = sort_drained_nodes(
294+
drained_nodes, buffer_drained_nodes, _ = sort_drained_nodes(
295295
app_settings, all_drained_nodes, allowed_instance_types
296296
)
297297
return dataclasses.replace(
298298
cluster,
299299
drained_nodes=drained_nodes,
300-
reserve_drained_nodes=reserve_drained_nodes,
300+
buffer_drained_nodes=buffer_drained_nodes,
301301
pending_ec2s=still_pending_ec2s,
302302
)
303303

@@ -359,9 +359,7 @@ async def _activate_drained_nodes(
359359
) -> Cluster:
360360
nodes_to_activate = [
361361
node
362-
for node in itertools.chain(
363-
cluster.drained_nodes, cluster.reserve_drained_nodes
364-
)
362+
for node in itertools.chain(cluster.drained_nodes, cluster.buffer_drained_nodes)
365363
if node.assigned_tasks
366364
]
367365

@@ -380,14 +378,14 @@ async def _activate_drained_nodes(
380378
]
381379
remaining_reserved_drained_nodes = [
382380
node
383-
for node in cluster.reserve_drained_nodes
381+
for node in cluster.buffer_drained_nodes
384382
if node.ec2_instance.id not in new_active_node_ids
385383
]
386384
return dataclasses.replace(
387385
cluster,
388386
active_nodes=cluster.active_nodes + nodes_to_activate,
389387
drained_nodes=remaining_drained_nodes,
390-
reserve_drained_nodes=remaining_reserved_drained_nodes,
388+
buffer_drained_nodes=remaining_reserved_drained_nodes,
391389
)
392390

393391

@@ -490,7 +488,7 @@ async def _assign_tasks_to_current_cluster(
490488
),
491489
lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance(
492490
task,
493-
instances=cluster.drained_nodes + cluster.reserve_drained_nodes,
491+
instances=cluster.drained_nodes + cluster.buffer_drained_nodes,
494492
task_required_ec2_instance=required_ec2,
495493
task_required_resources=required_resources,
496494
),
@@ -620,7 +618,7 @@ async def _find_needed_instances(
620618
if (
621619
num_missing_nodes := (
622620
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
623-
- len(cluster.reserve_drained_nodes)
621+
- len(cluster.buffer_drained_nodes)
624622
)
625623
) > 0:
626624
# check if some are already pending
@@ -629,7 +627,7 @@ async def _find_needed_instances(
629627
] + [i.ec2_instance for i in cluster.pending_nodes if not i.assigned_tasks]
630628
if len(remaining_pending_instances) < (
631629
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
632-
- len(cluster.reserve_drained_nodes)
630+
- len(cluster.buffer_drained_nodes)
633631
):
634632
default_instance_type = get_machine_buffer_type(available_ec2_types)
635633
num_instances_per_type[default_instance_type] += num_missing_nodes
@@ -1085,7 +1083,7 @@ async def _autoscale_cluster(
10851083
app_settings = get_application_settings(app)
10861084
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
10871085
if queued_or_missing_instance_tasks or (
1088-
len(cluster.reserve_drained_nodes)
1086+
len(cluster.buffer_drained_nodes)
10891087
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
10901088
):
10911089
if (
@@ -1126,7 +1124,7 @@ async def _notify_autoscaling_status(
11261124

11271125
monitored_instances = list(
11281126
itertools.chain(
1129-
cluster.active_nodes, cluster.drained_nodes, cluster.reserve_drained_nodes
1127+
cluster.active_nodes, cluster.drained_nodes, cluster.buffer_drained_nodes
11301128
)
11311129
)
11321130

@@ -1147,7 +1145,7 @@ async def _notify_autoscaling_status(
11471145
)
11481146
# prometheus instrumentation
11491147
if has_instrumentation(app):
1150-
get_instrumentation(app).update_from_cluster(cluster)
1148+
get_instrumentation(app).cluster_metrics.update_from_cluster(cluster)
11511149

11521150

11531151
async def auto_scale_cluster(

services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
from fastapi import FastAPI
3434
from pydantic import NonNegativeInt
3535
from servicelib.logging_utils import log_context
36+
from simcore_service_autoscaling.modules.instrumentation import (
37+
get_instrumentation,
38+
has_instrumentation,
39+
)
3640
from types_aiobotocore_ec2.literals import InstanceTypeType
3741

3842
from ..constants import (
@@ -68,6 +72,14 @@ async def _analyze_running_instance_state(
6872
elif await ssm_client.is_instance_connected_to_ssm_server(instance.id):
6973
try:
7074
if await ssm_client.wait_for_has_instance_completed_cloud_init(instance.id):
75+
if has_instrumentation(app):
76+
get_instrumentation(
77+
app
78+
).buffer_machines_pools_metrics.instances_ready_to_pull_seconds.labels(
79+
instance_type=instance.type
80+
).observe(
81+
(arrow.utcnow().datetime - instance.launch_time).total_seconds()
82+
)
7183
if app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
7284
instance.type
7385
].pre_pull_images:
@@ -314,6 +326,18 @@ async def _handle_pool_image_pulling(
314326
)
315327
match ssm_command.status:
316328
case "Success":
329+
if has_instrumentation(app):
330+
assert ssm_command.start_time is not None # nosec
331+
assert ssm_command.finish_time is not None # nosec
332+
get_instrumentation(
333+
app
334+
).buffer_machines_pools_metrics.instances_completed_pulling_seconds.labels(
335+
instance_type=instance.type
336+
).observe(
337+
(
338+
ssm_command.finish_time - ssm_command.start_time
339+
).total_seconds()
340+
)
317341
instances_to_stop.add(instance)
318342
case "InProgress" | "Pending":
319343
# do nothing we pass
@@ -409,3 +433,9 @@ async def monitor_buffer_machines(
409433

410434
# 4. pull docker images if needed
411435
await _handle_image_pre_pulling(app, buffers_manager)
436+
437+
# 5. instrumentation
438+
if has_instrumentation(app):
439+
get_instrumentation(
440+
app
441+
).buffer_machines_pools_metrics.update_from_buffer_pool_manager(buffers_manager)

services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ async def _startup() -> None:
4242

4343
def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]:
4444
async def _stop() -> None:
45-
await stop_periodic_task(app.state.autoscaler_task)
4645
if hasattr(app.state, "buffers_pool_task"):
4746
await stop_periodic_task(app.state.buffers_pool_task)
4847

0 commit comments

Comments
 (0)