
Commit c6246c3

Authored by Andrei Neagu and Odei Maiz
♻️✨adding nodeports support to dynamic-sidecar (ITISFoundation#2509)
* there seems to be an issue elsewhere
* fixed issue when booting
* enabling debug boot
* injecting paths to all containers
* all declared volumes are removed as well
* fixing dy-sidecar unit tests
* dv2: added volume creation and removal for dy-sidecar
* forwarding pg settings to dynamic-sidecar
* settings refactoring
* volumes are properly mounted on the spec
* adding additional setup elements
* starting directory watcher
* adding module to properly manage volumes
* fixed some tests
* created new directory
* using correct password
* better error message
* codestyle
* extended tests for the API
* refactor renames
* attached nodeports pulling
* adding permission changes
* simplified interface for saving and restoring the service state
* using new API interface
* is no longer set to fail
* refactored
* raising error now
* fixing tests
* added data_manager integration
* added tests for mounted_fs
* added missing env var and refactor
* fixed API call to save and restore state
* pylint
* fixing tests
* fix mounted_fs and volumes generated
* refactored volume resolve and mounting
* added missing env paths
* fixed an issue with initialization
* fixed issues with permission changing
* added more comments
* rename refactor
* changed debug message
* minor refactor
* added utility to run on threadpools
* using non-blocking calls
* adds API for nodeports push and pull
* fix codestyle
* updated openapi specs for new entrypoint
* updated webserver openapi spec
* always return size in bytes of the transferred data
* updated openapi.json
* exposed retrieve to frontend
* ensures clean shutdown in all cases
* split nodeports from state saving
* fixing codestyle
* refactor
* codeclimate
* codestyle
* fixed version parsing
* Feature/dynamic retrieve (#8)
* minor
* refactoring
* bumped version
* bumped service version
* composing correct url
* version bumped
* revert change
* computeServiceV2RetrieveUrl (#9)
* computeServiceV2RetrieveUrl
* minor
* updated openapi specs
* updated requirements
* added new webserver entrypoint
* added retrieve api to director-v2
* fixed director-v2 tests
* fix bug
* inverted order
* sending in the correct format
* faster booting of dy-sidecar
* removed todos
* update policy timeout
* fixed dependencies after merge
* using appropriate images for testing
* pylint
* storage is now connected for this test
* fix status url
* fixed port forwarding issues
* fixed broken test
* removed dangerous code
* updated openapi.json
* minor refactor
* further refactoring
* refactor
* further refactoring
* shared functions refactored
* refactor
* added save_to option to datamanager.pull
* refactor
* added dependencies for testing
* added nodeports integration test
* removed unused plugin
* trying to fix test in CI
* refactor API interface
* updated API interface after change
* moving to fixture
* added more information
* using nodeports exception
* rename function
* mocked api removed, no requests are forwarded here
* refactor
* renaming function
* updated docstring
* updated description
* update comment
* fix comment
* updated _meta.py
* revert to old version
* adding watchdog
* added missing requirement
* added missing requirement
* removed test uncertainty
* enhanced test slightly
* fixed broken endpoint
* adding some debug messages
* updated openapi specification
* expanding fixture to support multiple versions
* added better description
* typing
* removed comment
* added timeouts
* using ByteSize
* renamed
* renamed save_state to can_save
* reverting change
* more renaming
* renaming restore
* renamed endpoints
* updated openapi.json
* @sanderegg fixing in all Dockerfiles
* migrated to dataclass
* better debug messages
* fixed docstring
* [skip ci] no ci when pushing
* making it easier to debug
* trying to give nodeports more time
* put back missing parameter
* putting back option
* restored missing API
* fixed typing
* replaced with 3.8 version
* typo
* using statuses from starlette
* making it run in development
* fixed import issue
* only log keys
* added status retrying
* adding retries on status and retrieve
* refactor
* renaming and refactoring
* renamed again
* removing dependency bump
* reverting
* added minio
* added extra comments
* added more assertions
* added more debug prints
* adding more information in logs
* more explicit debug message
* added sleep to make sure data is available
* adding more information to make it easier to debug
* trying to trigger CI again
* refactoring
* refactor
* used for testing, leaving it in
* added new container logs dump for simpler debugging
* dumping logs twice
* adding logs debug
* updated return type
* logs from containers on fail
* using version with more debug options
* bumped expected version in workbench
* better error message and explanation
* even more debug information
* put back original check
* added separators
* removed space
* trigger CI again
* bumping timeout
* printing data for all containers
* fix issue
* bumping dy_static_file_server version
* updated versions in workbench
* fixing python-linting CI
* reverted changes
* debug help
* addresses an issue with stopping
* fix error
* replaced logging. with logger.
* adding more debug messages
* adding healthcheck to traefik
* enabling logs from traefik
* adding more logging
* adding more logs
* raising attempts, 15 minute timeout
* adding better debug messages
* changed the defaults to something else
* added logging when data is not found to be uploaded
* updating logs on function
* fixes a race condition when saving ports in parallel
* reverting changes
* changed endpoint to make it more readable
* removed client-sdk references
* migrated nodeports implementation
* archiving folders is now done in parallel
* fix import issue
* some missing types
* typing
* fix pylint
* checking how much the test lasts
* fixing timeout for test
* added faster bailout
* making bigger case start first
* adding debug flags to detect hangs
* reduced timeout to a reasonable value
* added port forward for proxy to test
* making tests more reliable
* extended logging
* pylint

Co-authored-by: Andrei Neagu <[email protected]>
Co-authored-by: Odei Maiz <[email protected]>
1 parent 5ef4cd9 commit c6246c3

File tree

96 files changed: +5236, −1319 lines


.github/workflows/ci-testing-deploy.yml

Lines changed: 1 addition & 1 deletion
@@ -1919,7 +1919,7 @@ jobs:
           path: codeclimate.${{ github.job }}_coverage.json

   integration-test-director-v2-02:
-    timeout-minutes: 20 # if this timeout gets too small, then split the tests
+    timeout-minutes: 30 # if this timeout gets too small, then split the tests
     name: "[int] director-v2 02"
     needs: [build-test-images]
     runs-on: ${{ matrix.os }}

api/specs/webserver/openapi-projects.yaml

Lines changed: 49 additions & 0 deletions
@@ -395,6 +395,55 @@ paths:
       default:
         $ref: "./openapi.yaml#/components/responses/DefaultErrorResponse"

+  /projects/{project_id}/nodes/{node_id}/retrieve:
+    parameters:
+      - name: project_id
+        in: path
+        required: true
+        schema:
+          type: string
+      - name: node_id
+        in: path
+        required: true
+        schema:
+          type: string
+
+    post:
+      tags:
+        - project
+      description: Triggers service retrieve
+      operationId: post_retrieve
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              properties:
+                port_keys:
+                  description: list of port keys to be retrieved
+                  type: array
+                  items:
+                    type: string
+      responses:
+        "200":
+          description: Returns the amount of transferred bytes when pulling data via nodeports
+          content:
+            application/json:
+              schema:
+                type: object
+                properties:
+                  data:
+                    type: object
+                    description: response payload
+                    properties:
+                      size_bytes:
+                        type: integer
+                        description: amount of transferred bytes
+
+        default:
+          $ref: "#/components/responses/DefaultErrorResponse"
+
   /projects/{study_uuid}/tags/{tag_id}:
     parameters:
       - name: tag_id
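The added retrieve entrypoint takes a JSON body listing the `port_keys` to pull and answers with the number of transferred bytes. A minimal client-side sketch of building such a request; the base URL, project id, and node id are placeholder assumptions, and real deployments call this through the authenticated webserver API:

```python
import json
import urllib.request

# placeholder deployment URL (an assumption, not part of this commit)
BASE_URL = "http://localhost:9081/v0"


def build_retrieve_request(project_id: str, node_id: str, port_keys) -> urllib.request.Request:
    # POST /projects/{project_id}/nodes/{node_id}:retrieve with {"port_keys": [...]}
    url = f"{BASE_URL}/projects/{project_id}/nodes/{node_id}:retrieve"
    body = json.dumps({"port_keys": list(port_keys)}).encode()
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


req = build_retrieve_request("my-project", "my-node", ["input_1"])
assert req.get_method() == "POST"
assert json.loads(req.data) == {"port_keys": ["input_1"]}
```

The response payload, per the schema above, nests the byte count under `data.size_bytes`.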

api/specs/webserver/openapi.yaml

Lines changed: 3 additions & 0 deletions
@@ -200,6 +200,9 @@ paths:
   /projects/{project_id}/nodes/{node_id}:
     $ref: "./openapi-projects.yaml#/paths/~1projects~1{project_id}~1nodes~1{node_id}"

+  /projects/{project_id}/nodes/{node_id}:retrieve:
+    $ref: "./openapi-projects.yaml#/paths/~1projects~1{project_id}~1nodes~1{node_id}~1retrieve"
+
   /nodes/{nodeInstanceUUID}/outputUi/{outputKey}:
     $ref: "./openapi-node-v0.0.1.yaml#/paths/~1nodes~1{nodeInstanceUUID}~1outputUi~1{outputKey}"

ci/github/integration-testing/director-v2.bash

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ test() {
   echo "testing in services/director-v2/tests/integration/$1"
   pytest --cov=simcore_service_director_v2 --durations=10 --cov-append \
     --color=yes --cov-report=term-missing --cov-report=xml --cov-config=.coveragerc \
+    -vvv -s --log-cli-level=DEBUG \
     -v -m "not travis" "services/director-v2/tests/integration/$1" --log-level=DEBUG
 }

packages/pytest-simcore/src/pytest_simcore/docker_registry.py

Lines changed: 9 additions & 7 deletions
@@ -230,52 +230,54 @@ def jupyter_service(docker_registry: str, node_meta_schema: Dict) -> Dict[str, s
     )


-DY_STATIC_FILE_SERVER_VERSION = "1.0.5"
+@pytest.fixture(scope="session", params=["2.0.2"])
+def dy_static_file_server_version(request):
+    return request.param


 @pytest.fixture(scope="session")
 def dy_static_file_server_service(
-    docker_registry: str, node_meta_schema: Dict
+    docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
 ) -> Dict[str, str]:
     """
     Adds the below service in docker registry
     itisfoundation/dy-static-file-server
     """
     return _pull_push_service(
         "itisfoundation/dy-static-file-server",
-        DY_STATIC_FILE_SERVER_VERSION,
+        dy_static_file_server_version,
         docker_registry,
         node_meta_schema,
     )


 @pytest.fixture(scope="session")
 def dy_static_file_server_dynamic_sidecar_service(
-    docker_registry: str, node_meta_schema: Dict
+    docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
 ) -> Dict[str, str]:
     """
     Adds the below service in docker registry
     itisfoundation/dy-static-file-server-dynamic-sidecar
     """
     return _pull_push_service(
         "itisfoundation/dy-static-file-server-dynamic-sidecar",
-        DY_STATIC_FILE_SERVER_VERSION,
+        dy_static_file_server_version,
         docker_registry,
         node_meta_schema,
     )


 @pytest.fixture(scope="session")
 def dy_static_file_server_dynamic_sidecar_compose_spec_service(
-    docker_registry: str, node_meta_schema: Dict
+    docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
 ) -> Dict[str, str]:
     """
     Adds the below service in docker registry
     itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec
     """
     return _pull_push_service(
         "itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec",
-        DY_STATIC_FILE_SERVER_VERSION,
+        dy_static_file_server_version,
         docker_registry,
         node_meta_schema,
     )
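This change replaces the hard-coded `DY_STATIC_FILE_SERVER_VERSION` constant with a session-scoped fixture parametrized over service versions, so every fixture depending on it is exercised once per entry in `params`. A simplified model of how `request.param` feeds such a fixture; the `FakeRequest` class and the extra version string are illustrations of the mechanism, not pytest internals:

```python
# pytest invokes a parametrized fixture once per entry in `params`,
# exposing the current entry as `request.param`; modeled here with a stub
class FakeRequest:
    def __init__(self, param):
        self.param = param


def dy_static_file_server_version(request):
    return request.param


# simulate pytest iterating over a hypothetical two-version params list
versions = [dy_static_file_server_version(FakeRequest(v)) for v in ["2.0.2", "2.0.3"]]
assert versions == ["2.0.2", "2.0.3"]
```

Adding a version to `params` in the real fixture would re-run all dependent registry fixtures (and their tests) against that version as well.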

packages/service-library/src/servicelib/async_utils.py

Lines changed: 25 additions & 8 deletions
@@ -2,12 +2,25 @@
 import logging
 from collections import deque
 from functools import wraps
-from typing import Dict, List, Optional
+from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Optional

 import attr

 logger = logging.getLogger(__name__)

+if TYPE_CHECKING:
+    Queue = asyncio.Queue
+else:
+
+    class FakeGenericMeta(type):
+        def __getitem__(self, item):
+            return self
+
+    class Queue(
+        asyncio.Queue, metaclass=FakeGenericMeta
+    ):  # pylint: disable=function-redefined
+        pass
+

 @attr.s(auto_attribs=True)
 class Context:
@@ -30,7 +43,9 @@ async def stop_sequential_workers() -> None:
     logger.info("All run_sequentially_in_context pending workers stopped")


-def run_sequentially_in_context(target_args: List[str] = None):
+def run_sequentially_in_context(
+    target_args: List[str] = None,
+) -> Callable[[Any], Any]:
     """All request to function with same calling context will be run sequentially.

     Example:
@@ -68,15 +83,17 @@ async def func(param1, param2, param3):
     """
     target_args = [] if target_args is None else target_args

-    def internal(decorated_function):
-        def get_context(args, kwargs: Dict) -> Context:
+    def internal(
+        decorated_function: Callable[[Any], Optional[Any]]
+    ) -> Callable[[Any], Optional[Any]]:
+        def get_context(args: Any, kwargs: Dict[Any, Any]) -> Context:
             arg_names = decorated_function.__code__.co_varnames[
                 : decorated_function.__code__.co_argcount
             ]
             search_args = dict(zip(arg_names, args))
             search_args.update(kwargs)

-            key_parts = deque()
+            key_parts: Deque[str] = deque()
             for arg in target_args:
                 sub_args = arg.split(".")
                 main_arg = sub_args[0]
@@ -108,13 +125,13 @@ def get_context(args, kwargs: Dict) -> Context:
             return _sequential_jobs_contexts[key]

         @wraps(decorated_function)
-        async def wrapper(*args, **kwargs):
+        async def wrapper(*args: Any, **kwargs: Any) -> Any:
             context: Context = get_context(args, kwargs)

             if not context.initialized:
                 context.initialized = True

-                async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue):
+                async def worker(in_q: Queue, out_q: Queue) -> None:
                     while True:
                         awaitable = await in_q.get()
                         in_q.task_done()
@@ -137,7 +154,7 @@ async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue):
                 worker(context.in_queue, context.out_queue)
             )

-            await context.in_queue.put(decorated_function(*args, **kwargs))
+            await context.in_queue.put(decorated_function(*args, **kwargs))  # type: ignore

             wrapped_result = await context.out_queue.get()
             if isinstance(wrapped_result, Exception):
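The `FakeGenericMeta` trick in this diff exists because `asyncio.Queue` is not subscriptable at runtime before Python 3.9: type checkers (which evaluate the `TYPE_CHECKING` branch) see the real `asyncio.Queue`, while at runtime `Queue[...]` simply returns the class. A self-contained sketch of the pattern:

```python
import asyncio
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # type checkers resolve Queue to the real asyncio.Queue
    Queue = asyncio.Queue
else:

    class FakeGenericMeta(type):
        # accept any subscript (Queue[str], Queue[int], ...) and return the class
        def __getitem__(self, item):
            return self

    class Queue(asyncio.Queue, metaclass=FakeGenericMeta):
        pass


# subscripting plain asyncio.Queue raises TypeError under Python 3.8;
# with the fake generic metaclass the annotation is harmless at runtime
assert Queue["anything"] is Queue
```

The metaclass `__getitem__` takes precedence over `__class_getitem__`, so the class still behaves as an ordinary `asyncio.Queue` everywhere else.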

packages/service-library/src/servicelib/pools.py

Lines changed: 9 additions & 0 deletions
@@ -1,5 +1,7 @@
+import asyncio
 from concurrent.futures import ProcessPoolExecutor
 from contextlib import contextmanager
+from typing import Any, Callable

 # only gets created on use and is guaranteed to be the same
 # for the entire lifetime of the application
@@ -35,3 +37,10 @@ def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
     # FIXME: uncomment below line when the issue is fixed
     # executor.shutdown(wait=False)
     pass
+
+
+async def async_on_threadpool(callable_function: Callable, *args: Any) -> Any:
+    """Ensures blocking operation runs on shared thread pool"""
+    return await asyncio.get_event_loop().run_in_executor(
+        None, callable_function, *args
+    )
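The new `async_on_threadpool` helper dispatches a blocking callable to the event loop's default thread-pool executor. A minimal usage sketch of the same shape; `blocking_add` is an illustrative stand-in for real blocking work, not part of the commit:

```python
import asyncio
import time


async def on_threadpool(callable_function, *args):
    # run a blocking callable on the loop's default thread pool,
    # keeping the event loop free while it executes
    return await asyncio.get_event_loop().run_in_executor(None, callable_function, *args)


def blocking_add(a: int, b: int) -> int:
    time.sleep(0.01)  # stand-in for blocking I/O
    return a + b


result = asyncio.run(on_threadpool(blocking_add, 2, 3))
assert result == 5
```

Passing `None` as the executor uses the loop's shared default `ThreadPoolExecutor`, so callers do not manage pool lifetimes themselves.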

packages/service-library/src/servicelib/utils.py

Lines changed: 6 additions & 7 deletions
@@ -7,7 +7,6 @@
 import asyncio
 import logging
 import os
-
 from pathlib import Path
 from typing import Any, Awaitable, Coroutine, List, Optional, Union

@@ -79,8 +78,11 @@ def log_exception_callback(fut: asyncio.Future):

 # // tasks
 async def logged_gather(
-    *tasks, reraise: bool = True, log: logging.Logger = logger, max_concurrency: int = 0
-) -> List[Any]:
+    *tasks: Awaitable[Any],
+    reraise: bool = True,
+    log: logging.Logger = logger,
+    max_concurrency: int = 0,
+) -> List[Optional[Any]]:
     """
     Thin wrapper around asyncio.gather that allows excuting ALL tasks concurently until the end
     even if any of them fail. Finally, all errors are logged and the first raised (if reraise=True)
@@ -91,18 +93,15 @@ async def logged_gather(
     use directly asyncio.gather(*tasks, return_exceptions=True).

     :param reraise: reraises first exception (in order the tasks were passed) concurrent tasks, defaults to True
-    :type reraise: bool, optional
     :param log: passing the logger gives a chance to identify the origin of the gather call, defaults to current submodule's logger
-    :type log: logging.Logger, optional
     :return: list of tasks results and errors e.g. [1, 2, ValueError("task3 went wrong"), 33, "foo"]
-    :rtype: List[Any]
     """

     wrapped_tasks = tasks
     if max_concurrency > 0:
         semaphore = asyncio.Semaphore(max_concurrency)

-        async def sem_task(task):
+        async def sem_task(task: Awaitable[Any]) -> Any:
             async with semaphore:
                 return await task
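The `max_concurrency` option of `logged_gather` wraps each awaitable in a semaphore-guarded task, bounding how many run at once while `asyncio.gather` still returns results in submission order. A standalone sketch of the same pattern; the function names here are illustrative, not the servicelib API:

```python
import asyncio


async def bounded_gather(*tasks, max_concurrency: int = 0):
    # when max_concurrency > 0, at most that many awaitables run concurrently
    if max_concurrency > 0:
        semaphore = asyncio.Semaphore(max_concurrency)

        async def sem_task(task):
            async with semaphore:
                return await task

        tasks = [sem_task(t) for t in tasks]
    return await asyncio.gather(*tasks)


async def square(x: int) -> int:
    await asyncio.sleep(0)
    return x * x


results = asyncio.run(bounded_gather(square(1), square(2), square(3), max_concurrency=2))
assert results == [1, 4, 9]
```

With `max_concurrency=0` (the default) the wrapper degenerates to a plain `asyncio.gather`.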

packages/service-library/tests/test_pools.py

Lines changed: 5 additions & 2 deletions
@@ -1,8 +1,7 @@
 from asyncio import BaseEventLoop
 from concurrent.futures import ProcessPoolExecutor

-
-from servicelib.pools import non_blocking_process_pool_executor
+from servicelib.pools import async_on_threadpool, non_blocking_process_pool_executor


 def return_int_one() -> int:
@@ -32,3 +31,7 @@ async def test_different_pool_instances() -> None:
         max_workers=1
     ) as first, non_blocking_process_pool_executor() as second:
         assert first != second
+
+
+async def test_run_on_thread_pool() -> None:
+    assert await async_on_threadpool(return_int_one) == 1

packages/simcore-sdk/requirements/_test.in

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ pytest-mock
 pytest-runner
 pytest-sugar
 pytest-xdist
+pytest-lazy-fixture

 # mockups/fixtures
 alembic

packages/simcore-sdk/requirements/_test.txt

Lines changed: 3 additions & 0 deletions
@@ -131,6 +131,7 @@ pytest==6.2.5
     # pytest-forked
     # pytest-icdiff
     # pytest-instafail
+    # pytest-lazy-fixture
     # pytest-mock
     # pytest-sugar
     # pytest-xdist
@@ -144,6 +145,8 @@ pytest-icdiff==0.5
     # via -r requirements/_test.in
 pytest-instafail==0.4.2
     # via -r requirements/_test.in
+pytest-lazy-fixture==0.6.3
+    # via -r requirements/_test.in
 pytest-mock==3.6.1
     # via -r requirements/_test.in
 pytest-runner==5.3.1

packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py

Lines changed: 24 additions & 10 deletions
@@ -66,36 +66,50 @@ async def push(
     return await _push_file(user_id, project_id, node_uuid, archive_file_path, None)


-async def _pull_file(user_id: int, project_id: str, node_uuid: str, file_path: Path):
+async def _pull_file(
+    user_id: int,
+    project_id: str,
+    node_uuid: str,
+    file_path: Path,
+    save_to: Optional[Path] = None,
+):
+    destination_path = file_path if save_to is None else save_to
     s3_object = _create_s3_object(project_id, node_uuid, file_path)
     log.info("pulling data from %s to %s...", s3_object, file_path)
     downloaded_file = await filemanager.download_file_from_s3(
         user_id=user_id,
         store_id="0",
         s3_object=s3_object,
-        local_folder=file_path.parent,
+        local_folder=destination_path.parent,
     )
-    if downloaded_file != file_path:
-        if file_path.exists():
-            file_path.unlink()
-        move(f"{downloaded_file}", file_path)
-    log.info("%s successfuly pulled", file_path)
+    if downloaded_file != destination_path:
+        destination_path.unlink(missing_ok=True)
+        move(f"{downloaded_file}", destination_path)
+    log.info("%s successfuly pulled", destination_path)


 def _get_archive_name(path: Path) -> str:
     return f"{path.stem}.zip"


-async def pull(user_id: int, project_id: str, node_uuid: str, file_or_folder: Path):
+async def pull(
+    user_id: int,
+    project_id: str,
+    node_uuid: str,
+    file_or_folder: Path,
+    save_to: Optional[Path] = None,
+):
     if file_or_folder.is_file():
-        return await _pull_file(user_id, project_id, node_uuid, file_or_folder)
+        return await _pull_file(user_id, project_id, node_uuid, file_or_folder, save_to)
     # we have a folder, so we need somewhere to extract it to
     with TemporaryDirectory() as tmp_dir_name:
         archive_file = Path(tmp_dir_name) / _get_archive_name(file_or_folder)
         await _pull_file(user_id, project_id, node_uuid, archive_file)
         log.info("extracting data from %s", archive_file)
+
+        destination_folder = file_or_folder if save_to is None else save_to
         await unarchive_dir(
-            archive_to_extract=archive_file, destination_folder=file_or_folder
+            archive_to_extract=archive_file, destination_folder=destination_folder
         )
         log.info("extraction completed")
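The new `save_to` argument only changes where pulled data lands: when it is `None`, the original behavior of pulling to `file_or_folder` is kept. The destination selection reduces to the following one-liner (a trivial sketch with an illustrative helper name, not the data_manager API):

```python
from pathlib import Path
from typing import Optional


def resolve_destination(file_or_folder: Path, save_to: Optional[Path] = None) -> Path:
    # mirrors the commit's rule: use the original location unless save_to overrides it
    return file_or_folder if save_to is None else save_to


assert resolve_destination(Path("/work/state")) == Path("/work/state")
assert resolve_destination(Path("/work/state"), Path("/tmp/out")) == Path("/tmp/out")
```

The S3 object key is still derived from `file_path`, so the override redirects only the local download and extraction target.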
