Skip to content

Commit acd677a

Browse files
GitHKAndrei Neagu
and
Andrei Neagu
authored
♻️ Redirecting inputs retrieval via dynamic-scheduler ⚠️ (#6908)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 75aed81 commit acd677a

File tree

15 files changed

+207
-78
lines changed

15 files changed

+207
-78
lines changed

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import logging
22
from typing import Final
33

4-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
4+
from models_library.api_schemas_directorv2.dynamic_services import (
5+
DynamicServiceGet,
6+
RetrieveDataOutEnveloped,
7+
)
58
from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE
69
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
710
DynamicServiceStart,
@@ -11,6 +14,7 @@
1114
from models_library.projects import ProjectID
1215
from models_library.projects_nodes_io import NodeID
1316
from models_library.rabbitmq_basic_types import RPCMethodName
17+
from models_library.services_types import ServicePortKey
1418
from models_library.users import UserID
1519
from pydantic import NonNegativeInt, TypeAdapter
1620
from servicelib.logging_utils import log_decorator
@@ -95,6 +99,25 @@ async def stop_dynamic_service(
9599
assert result is None # nosec
96100

97101

102+
@log_decorator(_logger, level=logging.DEBUG)
103+
async def retrieve_inputs(
104+
rabbitmq_rpc_client: RabbitMQRPCClient,
105+
*,
106+
node_id: NodeID,
107+
port_keys: list[ServicePortKey],
108+
timeout_s: NonNegativeInt,
109+
) -> RetrieveDataOutEnveloped:
110+
result = await rabbitmq_rpc_client.request(
111+
DYNAMIC_SCHEDULER_RPC_NAMESPACE,
112+
_RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_inputs"),
113+
node_id=node_id,
114+
port_keys=port_keys,
115+
timeout_s=timeout_s,
116+
)
117+
assert isinstance(result, RetrieveDataOutEnveloped) # nosec
118+
return result
119+
120+
98121
@log_decorator(_logger, level=logging.DEBUG)
99122
async def update_projects_networks(
100123
rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID

services/director/src/simcore_service_director/producer.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,7 @@ async def _get_node_details(
10781078

10791079

10801080
async def get_services_details(
1081-
app: FastAPI, user_id: str | None, study_id: str | None
1081+
app: FastAPI, user_id: str | None, project_id: str | None
10821082
) -> list[dict]:
10831083
app_settings = get_application_settings(app)
10841084
async with docker_utils.docker_client() as client: # pylint: disable=not-async-context-manager
@@ -1091,9 +1091,10 @@ async def get_services_details(
10911091
filters.append(
10921092
f"{_to_simcore_runtime_docker_label_key('user_id')}=" + user_id
10931093
)
1094-
if study_id:
1094+
if project_id:
10951095
filters.append(
1096-
f"{_to_simcore_runtime_docker_label_key('project_id')}=" + study_id
1096+
f"{_to_simcore_runtime_docker_label_key('project_id')}="
1097+
+ project_id
10971098
)
10981099
list_running_services = await client.services.list(
10991100
filters={"label": filters}
@@ -1104,7 +1105,7 @@ async def get_services_details(
11041105
for service in list_running_services
11051106
]
11061107
except aiodocker.DockerError as err:
1107-
msg = f"Error while accessing container for {user_id=}, {study_id=}"
1108+
msg = f"Error while accessing container for {user_id=}, {project_id=}"
11081109
raise GenericDockerError(err=msg) from err
11091110

11101111

services/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,7 @@ services:
568568
DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}
569569
DYNAMIC_SCHEDULER_TRACING: ${DYNAMIC_SCHEDULER_TRACING}
570570
DYNAMIC_SCHEDULER_UI_STORAGE_SECRET: ${DYNAMIC_SCHEDULER_UI_STORAGE_SECRET}
571+
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT}
571572
TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
572573
TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
573574
static-webserver:

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
from fastapi import FastAPI
2-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
2+
from models_library.api_schemas_directorv2.dynamic_services import (
3+
DynamicServiceGet,
4+
RetrieveDataOutEnveloped,
5+
)
36
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
47
DynamicServiceStart,
58
DynamicServiceStop,
69
)
710
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
811
from models_library.projects import ProjectID
912
from models_library.projects_nodes_io import NodeID
13+
from models_library.services_types import ServicePortKey
1014
from models_library.users import UserID
1115
from servicelib.rabbitmq import RPCRouter
1216
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
@@ -58,6 +62,15 @@ async def stop_dynamic_service(
5862
)
5963

6064

65+
@router.expose()
66+
async def retrieve_inputs(
67+
app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
68+
) -> RetrieveDataOutEnveloped:
69+
return await scheduler_interface.retrieve_inputs(
70+
app, node_id=node_id, port_keys=port_keys
71+
)
72+
73+
6174
@router.expose()
6275
async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
6376
await scheduler_interface.update_projects_networks(app, project_id=project_id)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,26 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
6262

6363
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: datetime.timedelta = Field(
6464
default=datetime.timedelta(minutes=60),
65+
validation_alias=AliasChoices(
66+
"DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT",
67+
"DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT",
68+
),
6569
description=(
6670
"Time to wait before timing out when stopping a dynamic service. "
6771
"Since services require data to be stopped, this operation is timed out after 1 hour"
6872
),
6973
)
7074

75+
DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field(
76+
default=datetime.timedelta(minutes=60),
77+
description=(
78+
"When dynamic services upload and download data from storage, "
79+
"sometimes very big payloads are involved. In order to handle "
80+
"such payloads it is required to have long timeouts which "
81+
"allow the service to finish the operation."
82+
),
83+
)
84+
7185
DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field(
7286
default=False,
7387
description=(

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@
22
from typing import Any
33

44
from fastapi import FastAPI, status
5-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
5+
from models_library.api_schemas_directorv2.dynamic_services import (
6+
DynamicServiceGet,
7+
RetrieveDataOutEnveloped,
8+
)
69
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
710
DynamicServiceStart,
811
)
912
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
1013
from models_library.projects import ProjectID
1114
from models_library.projects_nodes_io import NodeID
15+
from models_library.services_types import ServicePortKey
1216
from models_library.users import UserID
1317
from pydantic import TypeAdapter
1418
from servicelib.fastapi.app_state import SingletonInAppStateMixin
@@ -75,7 +79,7 @@ async def stop_dynamic_service(
7579
node_id: NodeID,
7680
simcore_user_agent: str,
7781
save_state: bool,
78-
timeout: datetime.timedelta, # noqa: ASYNC109
82+
timeout: datetime.timedelta # noqa: ASYNC109
7983
) -> None:
8084
try:
8185
await self.thin_client.delete_dynamic_service(
@@ -100,6 +104,19 @@ async def stop_dynamic_service(
100104

101105
raise
102106

107+
async def retrieve_inputs(
108+
self,
109+
*,
110+
node_id: NodeID,
111+
port_keys: list[ServicePortKey],
112+
timeout: datetime.timedelta # noqa: ASYNC109
113+
) -> RetrieveDataOutEnveloped:
114+
response = await self.thin_client.dynamic_service_retrieve(
115+
node_id=node_id, port_keys=port_keys, timeout=timeout
116+
)
117+
dict_response: dict[str, Any] = response.json()
118+
return TypeAdapter(RetrieveDataOutEnveloped).validate_python(dict_response)
119+
103120
async def list_tracked_dynamic_services(
104121
self, *, user_id: UserID | None = None, project_id: ProjectID | None = None
105122
) -> list[DynamicServiceGet]:

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from models_library.projects import ProjectID
1212
from models_library.projects_nodes_io import NodeID
1313
from models_library.services_resources import ServiceResourcesDictHelpers
14+
from models_library.services_types import ServicePortKey
1415
from models_library.users import UserID
1516
from servicelib.common_headers import (
1617
X_DYNAMIC_SIDECAR_REQUEST_DNS,
@@ -91,7 +92,7 @@ async def delete_dynamic_service(
9192
node_id: NodeID,
9293
simcore_user_agent: str,
9394
save_state: bool,
94-
timeout: datetime.timedelta,
95+
timeout: datetime.timedelta, # noqa: ASYNC109
9596
) -> Response:
9697
@retry_on_errors(total_retry_timeout_overwrite=timeout.total_seconds())
9798
@expect_status(status.HTTP_204_NO_CONTENT)
@@ -112,6 +113,22 @@ async def _(
112113

113114
return await _(self)
114115

116+
@retry_on_errors()
117+
@expect_status(status.HTTP_200_OK)
118+
async def dynamic_service_retrieve(
119+
self,
120+
*,
121+
node_id: NodeID,
122+
port_keys: list[ServicePortKey],
123+
timeout: datetime.timedelta, # noqa: ASYNC109
124+
) -> Response:
125+
post_data = {"port_keys": port_keys}
126+
return await self.client.post(
127+
f"/dynamic_services/{node_id}:retrieve",
128+
content=json_dumps(post_data),
129+
timeout=timeout.total_seconds(),
130+
)
131+
115132
@retry_on_errors()
116133
@expect_status(status.HTTP_200_OK)
117134
async def get_dynamic_services(

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
from fastapi import FastAPI
2-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
2+
from models_library.api_schemas_directorv2.dynamic_services import (
3+
DynamicServiceGet,
4+
RetrieveDataOutEnveloped,
5+
)
36
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
47
DynamicServiceStart,
58
DynamicServiceStop,
69
)
710
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
811
from models_library.projects import ProjectID
912
from models_library.projects_nodes_io import NodeID
13+
from models_library.services_types import ServicePortKey
1014
from models_library.users import UserID
1115

1216
from ..core.settings import ApplicationSettings
@@ -75,6 +79,21 @@ async def stop_dynamic_service(
7579
await set_request_as_stopped(app, dynamic_service_stop)
7680

7781

82+
async def retrieve_inputs(
83+
app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
84+
) -> RetrieveDataOutEnveloped:
85+
settings: ApplicationSettings = app.state.settings
86+
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
87+
raise NotImplementedError
88+
89+
director_v2_client = DirectorV2Client.get_from_app_state(app)
90+
return await director_v2_client.retrieve_inputs(
91+
node_id=node_id,
92+
port_keys=port_keys,
93+
timeout=settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT,
94+
)
95+
96+
7897
async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
7998
settings: ApplicationSettings = app.state.settings
8099
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:

services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
from faker import Faker
1010
from fastapi import FastAPI, status
1111
from fastapi.encoders import jsonable_encoder
12-
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
12+
from models_library.api_schemas_directorv2.dynamic_services import (
13+
DynamicServiceGet,
14+
RetrieveDataOutEnveloped,
15+
)
1316
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
1417
DynamicServiceStart,
1518
DynamicServiceStop,
@@ -55,14 +58,14 @@ def node_not_found(faker: Faker) -> NodeID:
5558
@pytest.fixture
5659
def service_status_new_style() -> DynamicServiceGet:
5760
return TypeAdapter(DynamicServiceGet).validate_python(
58-
DynamicServiceGet.model_config["json_schema_extra"]["examples"][1]
61+
DynamicServiceGet.model_json_schema()["examples"][1]
5962
)
6063

6164

6265
@pytest.fixture
6366
def service_status_legacy() -> NodeGet:
6467
return TypeAdapter(NodeGet).validate_python(
65-
NodeGet.model_config["json_schema_extra"]["examples"][1]
68+
NodeGet.model_json_schema()["examples"][1]
6669
)
6770

6871

@@ -112,9 +115,7 @@ def mock_director_v2_service_state(
112115
mock.get("/dynamic_services").respond(
113116
status.HTTP_200_OK,
114117
text=json.dumps(
115-
jsonable_encoder(
116-
DynamicServiceGet.model_config["json_schema_extra"]["examples"]
117-
)
118+
jsonable_encoder(DynamicServiceGet.model_json_schema()["examples"])
118119
),
119120
)
120121

@@ -193,7 +194,7 @@ async def test_list_tracked_dynamic_services(rpc_client: RabbitMQRPCClient):
193194
assert len(results) == 2
194195
assert results == [
195196
TypeAdapter(DynamicServiceGet).validate_python(x)
196-
for x in DynamicServiceGet.model_config["json_schema_extra"]["examples"]
197+
for x in DynamicServiceGet.model_json_schema()["examples"]
197198
]
198199

199200

@@ -223,7 +224,7 @@ async def test_get_state(
223224
def dynamic_service_start() -> DynamicServiceStart:
224225
# one for legacy and one for new style?
225226
return TypeAdapter(DynamicServiceStart).validate_python(
226-
DynamicServiceStart.model_config["json_schema_extra"]["example"]
227+
DynamicServiceStart.model_json_schema()["example"]
227228
)
228229

229230

@@ -492,6 +493,41 @@ async def test_stop_dynamic_service_serializes_generic_errors(
492493
)
493494

494495

496+
@pytest.fixture
497+
def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]:
498+
with respx.mock(
499+
base_url="http://director-v2:8000/v2",
500+
assert_all_called=False,
501+
assert_all_mocked=True, # IMPORTANT: KEEP always True!
502+
) as mock:
503+
request_ok = mock.post(f"/dynamic_services/{node_id}:retrieve")
504+
505+
request_ok.respond(
506+
status.HTTP_200_OK,
507+
text=TypeAdapter(RetrieveDataOutEnveloped)
508+
.validate_python(
509+
RetrieveDataOutEnveloped.model_json_schema()["examples"][0]
510+
)
511+
.model_dump_json(),
512+
)
513+
514+
yield None
515+
516+
517+
async def test_retrieve_inputs(
518+
mock_director_v2_service_retrieve_inputs: None,
519+
rpc_client: RabbitMQRPCClient,
520+
node_id: NodeID,
521+
):
522+
results = await services.retrieve_inputs(
523+
rpc_client, node_id=node_id, port_keys=[], timeout_s=10
524+
)
525+
assert (
526+
results.model_dump(mode="python")
527+
== RetrieveDataOutEnveloped.model_json_schema()["examples"][0]
528+
)
529+
530+
495531
@pytest.fixture
496532
def mock_director_v2_update_projects_networks(project_id: ProjectID) -> Iterator[None]:
497533
with respx.mock(

0 commit comments

Comments
 (0)