Skip to content

Commit 532647b

Browse files
authored
Merge branch 'master' into i4529/api-server-upgrade-to-asyncpg
2 parents 2a965ca + 168a1d6 commit 532647b

File tree

23 files changed

+634
-379
lines changed

23 files changed

+634
-379
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from typing import Final
2+
from uuid import uuid4
3+
4+
from models_library.projects import ProjectID
5+
from models_library.projects_nodes_io import NodeID
6+
from models_library.services_types import ServiceKey, ServiceVersion
7+
from models_library.users import UserID
8+
from pydantic import TypeAdapter
9+
10+
from ..models import DaskJobID
11+
12+
13+
def generate_dask_job_id(
    service_key: ServiceKey,
    service_version: ServiceVersion,
    user_id: UserID,
    project_id: ProjectID,
    node_id: NodeID,
) -> DaskJobID:
    """Create a unique dask job id embedding the user/project/node identifiers.

    The id is shown in the Dask scheduler dashboard website and can be
    decomposed again with ``parse_dask_job_id``. The trailing uuid makes
    every generated id unique.
    """
    id_parts = (
        f"{service_key}",
        f"{service_version}",
        f"userid_{user_id}",
        f"projectid_{project_id}",
        f"nodeid_{node_id}",
        f"uuid_{uuid4()}",
    )
    return DaskJobID(":".join(id_parts))
28+
29+
30+
# a serialized job id is made of exactly 6 ":"-separated parts:
# service_key, service_version, userid_*, projectid_*, nodeid_*, uuid_*
_JOB_ID_PARTS: Final[int] = 6
31+
32+
33+
def parse_dask_job_id(
    job_id: str,
) -> tuple[ServiceKey, ServiceVersion, UserID, ProjectID, NodeID]:
    """Decompose a job id created by ``generate_dask_job_id``.

    Returns (service_key, service_version, user_id, project_id, node_id);
    the trailing uniqueness uuid part is discarded.
    """
    parts = job_id.split(":")
    assert len(parts) == _JOB_ID_PARTS  # nosec
    key_part, version_part, user_part, project_part, node_part, _uuid_part = parts
    return (
        key_part,
        version_part,
        # each part carries a "<label>_" prefix that is stripped before validation
        TypeAdapter(UserID).validate_python(user_part[len("userid_") :]),
        ProjectID(project_part[len("projectid_") :]),
        NodeID(node_part[len("nodeid_") :]),
    )
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from typing import TypeAlias

# Identifier of a job submitted to the dask scheduler
DaskJobID: TypeAlias = str
# Maps a resource name to the numeric amount of that resource
DaskResources: TypeAlias = dict[str, int | float]
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# pylint: disable=too-many-positional-arguments
2+
# pylint:disable=redefined-outer-name
3+
# pylint:disable=too-many-arguments
4+
# pylint:disable=unused-argument
5+
# pylint:disable=unused-variable
6+
7+
import pytest
8+
from dask_task_models_library.container_tasks.utils import (
9+
generate_dask_job_id,
10+
parse_dask_job_id,
11+
)
12+
from faker import Faker
13+
from models_library.projects import ProjectID
14+
from models_library.projects_nodes_io import NodeID
15+
from models_library.services_types import ServiceKey, ServiceVersion
16+
from models_library.users import UserID
17+
from pydantic import TypeAdapter
18+
19+
20+
@pytest.fixture(
    params=["simcore/service/comp/some/fake/service/key", "dockerhub-style/service_key"]
)
def service_key(request) -> ServiceKey:
    """Parametrized fixture yielding both osparc-style and dockerhub-style keys."""
    return request.param
25+
26+
27+
@pytest.fixture()
def service_version() -> ServiceVersion:
    """Fixed service version used across the serialization tests."""
    # annotated with the domain type for consistency with the other
    # fixtures in this module (ServiceVersion is already imported here)
    return "1234.32432.2344"
30+
31+
32+
@pytest.fixture
def user_id(faker: Faker) -> UserID:
    """Random positive user id validated through the UserID type."""
    random_positive_int = faker.pyint(min_value=1)
    return TypeAdapter(UserID).validate_python(random_positive_int)
35+
36+
37+
@pytest.fixture
def project_id(faker: Faker) -> ProjectID:
    """Random project id built from a faker-generated uuid."""
    random_uuid = faker.uuid4()
    return ProjectID(random_uuid)
40+
41+
42+
@pytest.fixture
def node_id(faker: Faker) -> NodeID:
    """Random node id built from a faker-generated uuid."""
    random_uuid = faker.uuid4()
    return NodeID(random_uuid)
45+
46+
47+
def test_dask_job_id_serialization(
    service_key: ServiceKey,
    service_version: ServiceVersion,
    user_id: UserID,
    project_id: ProjectID,
    node_id: NodeID,
):
    """Round-trip: parsing a generated job id recovers every encoded field."""
    job_id = generate_dask_job_id(
        service_key, service_version, user_id, project_id, node_id
    )
    assert parse_dask_job_id(job_id) == (
        service_key,
        service_version,
        user_id,
        project_id,
        node_id,
    )

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from aws_library.ec2._errors import EC2TooManyInstancesError
1919
from fastapi import FastAPI
2020
from models_library.generated_models.docker_rest_api import Node, NodeState
21+
from models_library.rabbitmq_messages import ProgressType
2122
from servicelib.logging_utils import log_catch, log_context
2223
from servicelib.utils import limited_gather
2324
from servicelib.utils_formatting import timedelta_as_minute_second
@@ -51,7 +52,11 @@
5152
get_deactivated_buffer_ec2_tags,
5253
is_buffer_machine,
5354
)
54-
from ..utils.rabbitmq import post_autoscaling_status_message
55+
from ..utils.rabbitmq import (
56+
post_autoscaling_status_message,
57+
post_tasks_log_message,
58+
post_tasks_progress_message,
59+
)
5560
from .auto_scaling_mode_base import BaseAutoscaling
5661
from .docker import get_docker_client
5762
from .ec2 import get_ec2_client
@@ -354,7 +359,6 @@ def _as_selection(instance_type: EC2InstanceType) -> int:
354359

355360
async def _activate_and_notify(
356361
app: FastAPI,
357-
auto_scaling_mode: BaseAutoscaling,
358362
drained_node: AssociatedInstance,
359363
) -> AssociatedInstance:
360364
app_settings = get_application_settings(app)
@@ -363,14 +367,17 @@ async def _activate_and_notify(
363367
utils_docker.set_node_osparc_ready(
364368
app_settings, docker_client, drained_node.node, ready=True
365369
),
366-
auto_scaling_mode.log_message_from_tasks(
370+
post_tasks_log_message(
367371
app,
368-
drained_node.assigned_tasks,
369-
"cluster adjusted, service should start shortly...",
372+
tasks=drained_node.assigned_tasks,
373+
message="cluster adjusted, service should start shortly...",
370374
level=logging.INFO,
371375
),
372-
auto_scaling_mode.progress_message_from_tasks(
373-
app, drained_node.assigned_tasks, progress=1.0
376+
post_tasks_progress_message(
377+
app,
378+
tasks=drained_node.assigned_tasks,
379+
progress=1.0,
380+
progress_type=ProgressType.CLUSTER_UP_SCALING,
374381
),
375382
)
376383
return dataclasses.replace(drained_node, node=updated_node)
@@ -379,7 +386,6 @@ async def _activate_and_notify(
379386
async def _activate_drained_nodes(
380387
app: FastAPI,
381388
cluster: Cluster,
382-
auto_scaling_mode: BaseAutoscaling,
383389
) -> Cluster:
384390
nodes_to_activate = [
385391
node
@@ -396,10 +402,7 @@ async def _activate_drained_nodes(
396402
f"activate {len(nodes_to_activate)} drained nodes {[n.ec2_instance.id for n in nodes_to_activate]}",
397403
):
398404
activated_nodes = await asyncio.gather(
399-
*(
400-
_activate_and_notify(app, auto_scaling_mode, node)
401-
for node in nodes_to_activate
402-
)
405+
*(_activate_and_notify(app, node) for node in nodes_to_activate)
403406
)
404407
new_active_node_ids = {node.ec2_instance.id for node in activated_nodes}
405408
remaining_drained_nodes = [
@@ -787,10 +790,10 @@ async def _launch_instances(
787790
app, needed_instances, new_instance_tags
788791
)
789792
except EC2TooManyInstancesError:
790-
await auto_scaling_mode.log_message_from_tasks(
793+
await post_tasks_log_message(
791794
app,
792-
tasks,
793-
"The maximum number of machines in the cluster was reached. Please wait for your running jobs "
795+
tasks=tasks,
796+
message="The maximum number of machines in the cluster was reached. Please wait for your running jobs "
794797
"to complete and try again later or contact osparc support if this issue does not resolve.",
795798
level=logging.ERROR,
796799
)
@@ -829,10 +832,10 @@ async def _launch_instances(
829832
new_pending_instances: list[EC2InstanceData] = []
830833
for r in results:
831834
if isinstance(r, EC2TooManyInstancesError):
832-
await auto_scaling_mode.log_message_from_tasks(
835+
await post_tasks_log_message(
833836
app,
834-
tasks,
835-
"Exceptionally high load on computational cluster, please try again later.",
837+
tasks=tasks,
838+
message="Exceptionally high load on computational cluster, please try again later.",
836839
level=logging.ERROR,
837840
)
838841
elif isinstance(r, BaseException):
@@ -847,14 +850,14 @@ async def _launch_instances(
847850
f"{sum(n for n in capped_needed_machines.values())} new machines launched"
848851
", it might take up to 3 minutes to start, Please wait..."
849852
)
850-
await auto_scaling_mode.log_message_from_tasks(
851-
app, tasks, log_message, level=logging.INFO
853+
await post_tasks_log_message(
854+
app, tasks=tasks, message=log_message, level=logging.INFO
852855
)
853856
if last_issue:
854-
await auto_scaling_mode.log_message_from_tasks(
857+
await post_tasks_log_message(
855858
app,
856-
tasks,
857-
"Unexpected issues detected, probably due to high load, please contact support",
859+
tasks=tasks,
860+
message="Unexpected issues detected, probably due to high load, please contact support",
858861
level=logging.ERROR,
859862
)
860863

@@ -1064,7 +1067,6 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster:
10641067
async def _notify_based_on_machine_type(
10651068
app: FastAPI,
10661069
instances: list[AssociatedInstance] | list[NonAssociatedInstance],
1067-
auto_scaling_mode: BaseAutoscaling,
10681070
*,
10691071
message: str,
10701072
) -> None:
@@ -1088,24 +1090,22 @@ async def _notify_based_on_machine_type(
10881090
f" est. remaining time: {timedelta_as_minute_second(estimated_time_to_completion)})...please wait..."
10891091
)
10901092
if tasks:
1091-
await auto_scaling_mode.log_message_from_tasks(
1092-
app, tasks, message=msg, level=logging.INFO
1093+
await post_tasks_log_message(
1094+
app, tasks=tasks, message=msg, level=logging.INFO
10931095
)
1094-
await auto_scaling_mode.progress_message_from_tasks(
1096+
await post_tasks_progress_message(
10951097
app,
1096-
tasks,
1098+
tasks=tasks,
10971099
progress=time_since_launch.total_seconds()
10981100
/ instance_max_time_to_start.total_seconds(),
1101+
progress_type=ProgressType.CLUSTER_UP_SCALING,
10991102
)
11001103

11011104

1102-
async def _notify_machine_creation_progress(
1103-
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
1104-
) -> None:
1105+
async def _notify_machine_creation_progress(app: FastAPI, cluster: Cluster) -> None:
11051106
await _notify_based_on_machine_type(
11061107
app,
11071108
cluster.pending_ec2s,
1108-
auto_scaling_mode,
11091109
message="waiting for machine to join cluster",
11101110
)
11111111

@@ -1191,10 +1191,10 @@ async def _scale_up_cluster(
11911191
if needed_ec2_instances := await _find_needed_instances(
11921192
app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode
11931193
):
1194-
await auto_scaling_mode.log_message_from_tasks(
1194+
await post_tasks_log_message(
11951195
app,
1196-
unassigned_tasks,
1197-
"service is pending due to missing resources, scaling up cluster now...",
1196+
tasks=unassigned_tasks,
1197+
message="service is pending due to missing resources, scaling up cluster now...",
11981198
level=logging.INFO,
11991199
)
12001200
new_pending_instances = await _launch_instances(
@@ -1228,7 +1228,7 @@ async def _autoscale_cluster(
12281228
)
12291229

12301230
# 2. activate available drained nodes to cover some of the tasks
1231-
cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode)
1231+
cluster = await _activate_drained_nodes(app, cluster)
12321232

12331233
# 3. start warm buffer instances to cover the remaining tasks
12341234
cluster = await _start_warm_buffer_instances(app, cluster, auto_scaling_mode)
@@ -1301,5 +1301,5 @@ async def auto_scale_cluster(
13011301
)
13021302

13031303
# notify
1304-
await _notify_machine_creation_progress(app, cluster, auto_scaling_mode)
1304+
await _notify_machine_creation_progress(app, cluster)
13051305
await _notify_autoscaling_status(app, cluster, auto_scaling_mode)

0 commit comments

Comments
 (0)