Skip to content

Commit e1110a2

Browse files
authored
Merge branch 'master' into is6318/fix-catalog
2 parents 6005b58 + 1320b69 commit e1110a2

39 files changed

+729
-413
lines changed

.env-devel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ TRAEFIK_API_SERVER_INFLIGHTREQ_AMOUNT=25
2525

2626
AUTOSCALING_DASK=null
2727
AUTOSCALING_DRAIN_NODES_WITH_LABELS=False
28+
AUTOSCALING_DOCKER_JOIN_DRAINED=True
29+
AUTOSCALING_WAIT_FOR_CLOUD_INIT_BEFORE_WARM_BUFFER_ACTIVATION=False
2830
AUTOSCALING_EC2_ACCESS=null
2931
AUTOSCALING_EC2_INSTANCES=null
3032
AUTOSCALING_LOGLEVEL=WARNING

packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
_EC2_STARTUP_MAX_WAIT_TIME + _S4L_DOCKER_PULLING_MAX_TIME + _S4L_MAX_STARTUP_TIME
2929
)
3030
_S4L_STARTUP_SCREEN_MAX_TIME: Final[int] = 45 * SECOND
31-
31+
_S4L_COPY_WORKSPACE_TIME: Final[int] = 60 * SECOND
3232

3333
@dataclass(kw_only=True)
3434
class S4LWaitForWebsocket:
@@ -84,7 +84,7 @@ def __call__(self, message: str) -> bool:
8484
return False
8585

8686

87-
def launch_S4L(page: Page, node_id, log_in_and_out: WebSocket, autoscaled: bool) -> Dict[str, Union[WebSocket, FrameLocator]]:
87+
def launch_S4L(page: Page, node_id, log_in_and_out: WebSocket, autoscaled: bool, copy_workspace: bool = False) -> Dict[str, Union[WebSocket, FrameLocator]]:
8888
with log_context(logging.INFO, "launch S4L") as ctx:
8989
predicate = S4LWaitForWebsocket(logger=ctx.logger)
9090
with page.expect_websocket(
@@ -95,6 +95,11 @@ def launch_S4L(page: Page, node_id, log_in_and_out: WebSocket, autoscaled: bool)
9595
if autoscaled
9696
else _S4L_MAX_STARTUP_TIME
9797
)
98+
+ (
99+
_S4L_COPY_WORKSPACE_TIME
100+
if copy_workspace
101+
else 0
102+
)
98103
+ 10 * SECOND,
99104
) as ws_info:
100105
s4l_iframe = wait_for_service_running(
@@ -105,6 +110,11 @@ def launch_S4L(page: Page, node_id, log_in_and_out: WebSocket, autoscaled: bool)
105110
_S4L_AUTOSCALED_MAX_STARTUP_TIME
106111
if autoscaled
107112
else _S4L_MAX_STARTUP_TIME
113+
)
114+
+ (
115+
_S4L_COPY_WORKSPACE_TIME
116+
if copy_workspace
117+
else 0
108118
),
109119
press_start_button=False,
110120
)

services/autoscaling/src/simcore_service_autoscaling/constants.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,15 @@
1212
)
1313
PREPULL_COMMAND_NAME: Final[str] = "docker images pulling"
1414

15-
DOCKER_PULL_COMMAND: Final[
16-
str
17-
] = "docker compose -f /docker-pull.compose.yml -p buffering pull"
15+
DOCKER_JOIN_COMMAND_NAME: Final[str] = "docker swarm join"
16+
DOCKER_JOIN_COMMAND_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
17+
AWSTagKey, "io.simcore.autoscaling.joined_command_sent"
18+
)
19+
20+
21+
DOCKER_PULL_COMMAND: Final[str] = (
22+
"docker compose -f /docker-pull.compose.yml -p buffering pull"
23+
)
1824

1925
PRE_PULLED_IMAGES_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
2026
AWSTagKey, "io.simcore.autoscaling.pre_pulled_images"

services/autoscaling/src/simcore_service_autoscaling/core/settings.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class EC2InstancesSettings(BaseCustomSettings):
112112
)
113113
EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field(
114114
default=datetime.timedelta(minutes=1),
115-
description="Time after which an EC2 instance may being the termination process (0<=T<=59 minutes, is automatically capped)"
115+
description="Time after which an EC2 instance may begin the termination process (0<=T<=59 minutes, is automatically capped)"
116116
"(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
117117
)
118118
EC2_INSTANCES_TIME_BEFORE_FINAL_TERMINATION: datetime.timedelta = Field(
@@ -272,6 +272,17 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
272272
"but a docker node label named osparc-services-ready is attached",
273273
)
274274

275+
AUTOSCALING_DOCKER_JOIN_DRAINED: bool = Field(
276+
default=True,
277+
description="If true, new nodes join the swarm as drained. If false as active.",
278+
)
279+
280+
AUTOSCALING_WAIT_FOR_CLOUD_INIT_BEFORE_WARM_BUFFER_ACTIVATION: bool = Field(
281+
default=False,
282+
description="If True, then explicitely wait for cloud-init process to be completed before issuing commands. "
283+
"TIP: might be useful when cheap machines are used",
284+
)
285+
275286
@cached_property
276287
def LOG_LEVEL(self): # noqa: N802
277288
return self.AUTOSCALING_LOGLEVEL

services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
Resources,
1616
)
1717
from aws_library.ec2._errors import EC2TooManyInstancesError
18+
from aws_library.ec2._models import AWSTagValue
1819
from fastapi import FastAPI
1920
from models_library.generated_models.docker_rest_api import Node, NodeState
2021
from servicelib.logging_utils import log_catch, log_context
2122
from servicelib.utils import limited_gather
2223
from servicelib.utils_formatting import timedelta_as_minute_second
24+
from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME
2325
from types_aiobotocore_ec2.literals import InstanceTypeType
2426

2527
from ..core.errors import (
@@ -200,13 +202,17 @@ async def _make_pending_buffer_ec2s_join_cluster(
200202
app: FastAPI,
201203
cluster: Cluster,
202204
) -> Cluster:
205+
ec2_client = get_ec2_client(app)
203206
if buffer_ec2s_pending := [
204207
i.ec2_instance
205208
for i in cluster.pending_ec2s
206209
if is_buffer_machine(i.ec2_instance.tags)
210+
and (DOCKER_JOIN_COMMAND_EC2_TAG_KEY not in i.ec2_instance.tags)
207211
]:
208212
# started buffer instances shall be asked to join the cluster once they are running
213+
app_settings = get_application_settings(app)
209214
ssm_client = get_ssm_client(app)
215+
210216
buffer_ec2_connection_state = await limited_gather(
211217
*[
212218
ssm_client.is_instance_connected_to_ssm_server(i.id)
@@ -223,27 +229,42 @@ async def _make_pending_buffer_ec2s_join_cluster(
223229
)
224230
if c is True
225231
]
226-
buffer_ec2_initialized = await limited_gather(
227-
*[
228-
ssm_client.wait_for_has_instance_completed_cloud_init(i.id)
229-
for i in buffer_ec2_connected_to_ssm_server
230-
],
231-
reraise=False,
232-
log=_logger,
233-
limit=20,
234-
)
235-
buffer_ec2_ready_for_command = [
236-
i
237-
for i, r in zip(
238-
buffer_ec2_connected_to_ssm_server, buffer_ec2_initialized, strict=True
232+
buffer_ec2_ready_for_command = buffer_ec2_connected_to_ssm_server
233+
if app_settings.AUTOSCALING_WAIT_FOR_CLOUD_INIT_BEFORE_WARM_BUFFER_ACTIVATION:
234+
buffer_ec2_initialized = await limited_gather(
235+
*[
236+
ssm_client.wait_for_has_instance_completed_cloud_init(i.id)
237+
for i in buffer_ec2_connected_to_ssm_server
238+
],
239+
reraise=False,
240+
log=_logger,
241+
limit=20,
242+
)
243+
buffer_ec2_ready_for_command = [
244+
i
245+
for i, r in zip(
246+
buffer_ec2_connected_to_ssm_server,
247+
buffer_ec2_initialized,
248+
strict=True,
249+
)
250+
if r is True
251+
]
252+
if buffer_ec2_ready_for_command:
253+
ssm_command = await ssm_client.send_command(
254+
[i.id for i in buffer_ec2_ready_for_command],
255+
command=await utils_docker.get_docker_swarm_join_bash_command(
256+
join_as_drained=app_settings.AUTOSCALING_DOCKER_JOIN_DRAINED
257+
),
258+
command_name=DOCKER_JOIN_COMMAND_NAME,
259+
)
260+
await ec2_client.set_instances_tags(
261+
buffer_ec2_ready_for_command,
262+
tags={
263+
DOCKER_JOIN_COMMAND_EC2_TAG_KEY: AWSTagValue(
264+
ssm_command.command_id
265+
),
266+
},
239267
)
240-
if r is True
241-
]
242-
await ssm_client.send_command(
243-
[i.id for i in buffer_ec2_ready_for_command],
244-
command=await utils_docker.get_docker_swarm_join_bash_command(),
245-
command_name="docker swarm join",
246-
)
247268
return cluster
248269

249270

@@ -308,14 +329,14 @@ async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
308329
ec2_client = get_ec2_client(app)
309330

310331
# some instances might be able to run several tasks
311-
allowed_instance_types: list[
312-
EC2InstanceType
313-
] = await ec2_client.get_ec2_instance_capabilities(
314-
cast(
315-
set[InstanceTypeType],
316-
set(
317-
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES,
318-
),
332+
allowed_instance_types: list[EC2InstanceType] = (
333+
await ec2_client.get_ec2_instance_capabilities(
334+
cast(
335+
set[InstanceTypeType],
336+
set(
337+
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES,
338+
),
339+
)
319340
)
320341
)
321342

services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,6 @@
3333
from fastapi import FastAPI
3434
from pydantic import NonNegativeInt
3535
from servicelib.logging_utils import log_context
36-
from simcore_service_autoscaling.modules.instrumentation import (
37-
get_instrumentation,
38-
has_instrumentation,
39-
)
4036
from types_aiobotocore_ec2.literals import InstanceTypeType
4137

4238
from ..constants import (
@@ -55,6 +51,7 @@
5551
)
5652
from .auto_scaling_mode_base import BaseAutoscaling
5753
from .ec2 import get_ec2_client
54+
from .instrumentation import get_instrumentation, has_instrumentation
5855
from .ssm import get_ssm_client
5956

6057
_logger = logging.getLogger(__name__)
@@ -197,8 +194,9 @@ async def _terminate_instances_with_invalid_pre_pulled_images(
197194
].pre_pulled_instances()
198195

199196
for instance in all_pre_pulled_instances:
197+
pre_pulled_images = load_pre_pulled_images_from_tags(instance.tags)
200198
if (
201-
pre_pulled_images := load_pre_pulled_images_from_tags(instance.tags)
199+
pre_pulled_images is not None
202200
) and pre_pulled_images != ec2_boot_config.pre_pull_images:
203201
_logger.info(
204202
"%s",

services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@ async def ec2_startup_script(
8080
ec2_boot_specific: EC2InstanceBootSpecific, app_settings: ApplicationSettings
8181
) -> str:
8282
startup_commands = ec2_boot_specific.custom_boot_scripts.copy()
83-
startup_commands.append(await utils_docker.get_docker_swarm_join_bash_command())
83+
startup_commands.append(
84+
await utils_docker.get_docker_swarm_join_bash_command(
85+
join_as_drained=app_settings.AUTOSCALING_DOCKER_JOIN_DRAINED
86+
)
87+
)
8488
if app_settings.AUTOSCALING_REGISTRY: # noqa: SIM102
8589
if pull_image_cmd := utils_docker.get_docker_pull_images_on_start_bash_command(
8690
ec2_boot_specific.pre_pull_images

services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ async def compute_cluster_used_resources(
392392
_DOCKER_SWARM_JOIN_PATTERN = re.compile(_DOCKER_SWARM_JOIN_RE)
393393

394394

395-
async def get_docker_swarm_join_bash_command() -> str:
395+
async def get_docker_swarm_join_bash_command(*, join_as_drained: bool) -> str:
396396
"""this assumes we are on a manager node"""
397397
command = ["docker", "swarm", "join-token", "worker"]
398398
process = await asyncio.create_subprocess_exec(
@@ -409,7 +409,7 @@ async def get_docker_swarm_join_bash_command() -> str:
409409
decoded_stdout = stdout.decode()
410410
if match := re.search(_DOCKER_SWARM_JOIN_PATTERN, decoded_stdout):
411411
capture = match.groupdict()
412-
return f"{capture['command']} --availability=drain {capture['token']} {capture['address']}"
412+
return f"{capture['command']} --availability={'drain' if join_as_drained else 'active'} {capture['token']} {capture['address']}"
413413
msg = f"expected docker '{_DOCKER_SWARM_JOIN_RE}' command not found: received {decoded_stdout}!"
414414
raise RuntimeError(msg)
415415

services/autoscaling/tests/manual/.env-devel

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
AUTOSCALING_DEBUG=true
2+
AUTOSCALING_DRAIN_NODES_WITH_LABELS=False
3+
AUTOSCALING_DOCKER_JOIN_DRAINED=True
4+
AUTOSCALING_WAIT_FOR_CLOUD_INIT_BEFORE_WARM_BUFFER_ACTIVATION=False
25
AUTOSCALING_LOGLEVEL=INFO
3-
AUTOSCALING_TASK_INTERVAL=30
6+
AUTOSCALING_POLL_INTERVAL=10
47
AUTOSCALING_EC2_ACCESS_KEY_ID=XXXXXXXXXX
58
AUTOSCALING_EC2_SECRET_ACCESS_KEY=XXXXXXXXXX
69
AUTOSCALING_EC2_ENDPOINT=null
@@ -13,20 +16,21 @@ EC2_INSTANCES_MACHINES_BUFFER=0
1316
EC2_INSTANCES_MAX_INSTANCES=20
1417
EC2_INSTANCES_TIME_BEFORE_DRAINING="00:00:10"
1518
EC2_INSTANCES_TIME_BEFORE_TERMINATION="00:03:00"
16-
EC2_INSTANCES_ALLOWED_TYPES='{"t2.micro": {"ami_id": "XXXXXXXX", "custom_boot_scripts": ["whoami"], "pre_pull_images": ["ubuntu:latest"]}}'
19+
EC2_INSTANCES_ALLOWED_TYPES={"t2.micro": {"ami_id": "XXXXXXXX", "custom_boot_scripts": ["whoami"], "pre_pull_images": ["ubuntu:latest"]}}
1720
EC2_INSTANCES_ATTACHED_IAM_PROFILE=XXXXXXXXX
1821
EC2_INSTANCES_KEY_NAME=XXXXXXXXXX
1922
EC2_INSTANCES_NAME_PREFIX=testing-osparc-computational-cluster
20-
EC2_INSTANCES_SECURITY_GROUP_IDS="[\"XXXXXXXXXX\"]"
23+
EC2_INSTANCES_SECURITY_GROUP_IDS=["XXXXXXXXXX"]
2124
EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX
22-
EC2_INSTANCES_CUSTOM_TAGS='{"special": "testing"}'
25+
EC2_INSTANCES_CUSTOM_TAGS={"special": "testing"}
2326
EC2_INSTANCES_TIME_BEFORE_DRAINING=00:00:20
2427
EC2_INSTANCES_TIME_BEFORE_TERMINATION=00:01:00
2528
LOG_FORMAT_LOCAL_DEV_ENABLED=True
2629
# define the following to activate dynamic autoscaling
27-
# NODES_MONITORING_NEW_NODES_LABELS="[\"testing.autoscaled-node\"]"
28-
# NODES_MONITORING_NODE_LABELS="[\"testing.monitored-node\"]"
29-
# NODES_MONITORING_SERVICE_LABELS="[\"testing.monitored-service\"]"
30+
# AUTOSCALING_NODES_MONITORING={}
31+
# NODES_MONITORING_NEW_NODES_LABELS=["testing.autoscaled-node"]
32+
# NODES_MONITORING_NODE_LABELS=["testing.monitored-node"]
33+
# NODES_MONITORING_SERVICE_LABELS=["testing.monitored-service"]
3034

3135
# may be activated or not
3236
# RABBIT_HOST=rabbit

services/autoscaling/tests/manual/README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,9 @@ make up-devel # this will deploy the autoscaling stack
109109
docker service create \
110110
--name=test-service \
111111
--reserve-cpu=1 \
112-
--reserve-memory=1GiB \
112+
--reserve-memory=512MiB \
113113
--constraint=node.labels.testing.monitored-node==true \
114+
--constraint=node.labels.io.simcore.osparc-services-ready==true \
114115
--label=testing.monitored-service=true \
115116
--container-label=io.simcore.runtime.user-id=99 \
116117
--container-label=io.simcore.runtime.project-id='5054a589-3ba4-46c3-829d-2e3d1a6a043f' \
@@ -120,5 +121,5 @@ docker service create \
120121
--container-label=io.simcore.runtime.swarm-stack-name=thestack \
121122
--container-label=io.simcore.runtime.memory-limit=1GB \
122123
--container-label=io.simcore.runtime.cpu-limit=1 \
123-
redis # will create a redis service reserving 4 CPUs and 1GiB of RAM
124+
redis # will create a redis service reserving 1 CPU and 512MiB of RAM
124125
```

services/autoscaling/tests/unit/conftest.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,30 @@ def with_labelize_drain_nodes(
147147
)
148148

149149

150+
@pytest.fixture(
    params=[
        "with_AUTOSCALING_DOCKER_JOIN_DRAINED",
        "without_AUTOSCALING_DOCKER_JOIN_DRAINED",
    ]
)
def with_docker_join_drained(request: pytest.FixtureRequest) -> bool:
    """Parametrized flag telling whether the "join drained" variant is active.

    Returns True for the ``with_AUTOSCALING_DOCKER_JOIN_DRAINED`` parameter
    and False for the ``without_AUTOSCALING_DOCKER_JOIN_DRAINED`` one.
    """
    enabled_param_id = "with_AUTOSCALING_DOCKER_JOIN_DRAINED"
    is_join_drained = request.param == enabled_param_id
    return bool(is_join_drained)
158+
159+
160+
@pytest.fixture
def app_with_docker_join_drained(
    app_environment: EnvVarsDict,
    monkeypatch: pytest.MonkeyPatch,
    with_docker_join_drained: bool,
) -> EnvVarsDict:
    """Extend the app environment with AUTOSCALING_DOCKER_JOIN_DRAINED.

    The variable is set to the string form of ``with_docker_join_drained``
    ("True" or "False") through ``setenvs_from_dict``, and the result is
    merged on top of ``app_environment``.
    """
    # NOTE(review): setenvs_from_dict presumably applies the values via
    # monkeypatch and returns them as a mapping -- confirm against the helper.
    overrides = {"AUTOSCALING_DOCKER_JOIN_DRAINED": str(with_docker_join_drained)}
    patched_envs = setenvs_from_dict(monkeypatch, overrides)
    return app_environment | patched_envs
172+
173+
150174
@pytest.fixture(scope="session")
151175
def fake_ssm_settings() -> SSMSettings:
152176
return SSMSettings(**SSMSettings.Config.schema_extra["examples"][0])

services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def local_dask_scheduler_server_envs(
7070
@pytest.fixture
7171
def minimal_configuration(
7272
with_labelize_drain_nodes: EnvVarsDict,
73+
app_with_docker_join_drained: EnvVarsDict,
7374
docker_swarm: None,
7475
mocked_ec2_server_envs: EnvVarsDict,
7576
mocked_ssm_server_envs: EnvVarsDict,

services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ async def drained_host_node(
181181
@pytest.fixture
182182
def minimal_configuration(
183183
with_labelize_drain_nodes: EnvVarsDict,
184+
app_with_docker_join_drained: EnvVarsDict,
184185
docker_swarm: None,
185186
mocked_ec2_server_envs: EnvVarsDict,
186187
mocked_ssm_server_envs: EnvVarsDict,
@@ -1010,6 +1011,7 @@ async def test_cluster_scaling_up_and_down_against_aws(
10101011
skip_if_external_envfile_dict: None,
10111012
external_ec2_instances_allowed_types: None | dict[str, EC2InstanceBootSpecific],
10121013
with_labelize_drain_nodes: EnvVarsDict,
1014+
app_with_docker_join_drained: EnvVarsDict,
10131015
docker_swarm: None,
10141016
disabled_rabbitmq: None,
10151017
disable_dynamic_service_background_task: None,

0 commit comments

Comments
 (0)