Skip to content

♻️✨adding nodeports support to dynamic-sidecar #2509

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 277 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
277 commits
Select commit Hold shift + click to select a range
bae45d3
there seems to be an issue elsewere
Sep 10, 2021
c099f60
fixed issue when booting
Sep 10, 2021
28ce8dd
enabling debug boot
Sep 10, 2021
ac37f2b
injecting pahts to all containers
Sep 10, 2021
c2a7264
all declared volumes are removed as well
Sep 14, 2021
165172a
fixing dy-sidecar unit tests
Sep 14, 2021
f7a66d8
dv2 added volume creation and removal for dy sidecar
Sep 14, 2021
c9b4815
forarding pgsettings to dybamic-sidecar
Sep 14, 2021
d301d6e
setting refactoring
Sep 14, 2021
840a601
volumes are properly mounted on the spec
Sep 14, 2021
d530d2b
adding additional setup elements
Sep 14, 2021
2c79d1d
starting direcotry watcher
Sep 14, 2021
2dbbd56
adding module to properly manage volumes
Sep 14, 2021
3657390
fixed some tests
Sep 14, 2021
c81650d
created new directory
Sep 14, 2021
63c5239
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Sep 14, 2021
40971e3
using correct password
Sep 15, 2021
94132ec
better error message
Sep 15, 2021
fadeb23
codestyle
Sep 15, 2021
5cc7fe6
extended tests for the API
Sep 15, 2021
792e969
refactor renames
Sep 15, 2021
4ff580a
attached nodeports pulling
Sep 15, 2021
cde9880
adding permissin changes
Sep 15, 2021
98ba781
semplified interface for saving and restoring the service state
Sep 15, 2021
4f50f4f
using new api interface
Sep 15, 2021
3e53aca
is no longer set to fail
Sep 15, 2021
732119c
refactored
Sep 15, 2021
4648a15
raising error now
Sep 15, 2021
6947fcf
fixing tests
Sep 16, 2021
47ead54
added data_manager integration
Sep 16, 2021
899a7f7
added tests for mounted_fs
Sep 16, 2021
749e30b
added missing env var and refactor
Sep 16, 2021
7f1f337
fixed API call to save and restore state
Sep 16, 2021
1447f24
pylint
Sep 16, 2021
99cf768
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Sep 16, 2021
cd323cb
fixing tests
Sep 16, 2021
0d3038d
fix mounted_fs and volumes generated
Sep 16, 2021
44d0021
refactored volume resolve and mounting
Sep 16, 2021
3b042a2
added missing env paths
Sep 17, 2021
fcbc481
fixed an issue with initialization
Sep 17, 2021
d4011f7
fixed issues with permission changing
Sep 17, 2021
e2ae770
added more comments
Sep 17, 2021
061abd7
rename refactor
Sep 17, 2021
07b42b1
changed debug message
Sep 17, 2021
5a549a0
minor refactor
Sep 17, 2021
69d6ec3
added utility to run on threadppols
Sep 17, 2021
28fcb2c
using not blocking calls
Sep 17, 2021
b49fead
adds API for nodeports push and pull
Sep 20, 2021
79a6b1f
fix codestyle
Sep 20, 2021
f8a5443
updated opeanpi specs for new entrypoint
Sep 20, 2021
25d1d2b
updated webserver ospeanpi spec
Sep 20, 2021
63e2e02
always return size in bytes of the trasnferred data
Sep 20, 2021
82fc27d
updated openapi.json
Sep 20, 2021
f37c6ed
exposed retrieve to frontend
Sep 20, 2021
eb1c914
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Sep 20, 2021
ba362be
ensures clean shutdown in call cases
Sep 20, 2021
428dddc
split nodeports from state saving
Sep 20, 2021
47f61f2
fixing codestyle
Sep 20, 2021
8205e87
refactor
Sep 20, 2021
42f4ea9
codeclimate
Sep 20, 2021
828e762
codestyle
Sep 20, 2021
b7205dc
fixed version parsing
Sep 20, 2021
c518171
Feature/dynamic retrieve (#8)
odeimaiz Sep 20, 2021
7833b8f
bumped version
Sep 21, 2021
3c55459
bumped service version
Sep 21, 2021
6daf8ed
composing correct url
Sep 21, 2021
9584bc0
version bumped
Sep 21, 2021
009811f
revert change
Sep 21, 2021
70e9bbe
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Sep 21, 2021
b029168
computeServiceV2RetrieveUrl (#9)
odeimaiz Sep 21, 2021
4786e1e
updated openaipi specs
Sep 21, 2021
6c48725
updated requirements
Sep 23, 2021
4f5e041
added new webserver entrypoint
Sep 23, 2021
fb4586e
added retrieve api to director-v2
Sep 23, 2021
087a107
fixed director-v2 tests
Sep 23, 2021
f4f2607
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Sep 23, 2021
81457f5
fix bug
Sep 23, 2021
4b1e584
inverted oder
Sep 23, 2021
d67aa15
sending in the correct format
Sep 23, 2021
bfd853a
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Sep 23, 2021
72d828c
faster booting of dy-sidecar
Sep 23, 2021
7d4de28
removed todos
Sep 23, 2021
9c49828
update policy timeout
Sep 27, 2021
1c7b617
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Sep 27, 2021
b0b1d79
fixed depenencies after merge
Sep 27, 2021
be5c20c
using appropriate images for testing
Sep 27, 2021
dd8171c
pylint
Sep 27, 2021
35e19f6
storage is now connected for this test
Sep 27, 2021
f2733cf
fix status url
Sep 27, 2021
9f24cdf
fixed port forwarding issues
Sep 27, 2021
4b962ca
fixed broken test
Sep 27, 2021
2d97af7
removed dangerous code
Sep 28, 2021
678162b
updated openapi.json
Sep 28, 2021
63b9315
minor refactor
Sep 28, 2021
9071db6
further refactoring
Sep 28, 2021
c11e4fa
refactor
Sep 28, 2021
b9eec99
further refactoring
Sep 28, 2021
3ee22e0
shared functions refactored
Sep 28, 2021
72ba380
refactor
Sep 29, 2021
16353e6
added save_to option to datamanager.pull
Sep 30, 2021
c2847de
refactor
Sep 30, 2021
16231a8
added dependencies for testing
Sep 30, 2021
900e754
aded nodeports integration test
Sep 30, 2021
7dc8173
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Sep 30, 2021
8be09cc
removed unsued plugin
Sep 30, 2021
cfe613e
trying to fix test in CI
Oct 1, 2021
54e53a8
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 1, 2021
3fa89c9
refactor API interface
Oct 1, 2021
6e6fb6c
updated API inteface after change
Oct 1, 2021
06f6c52
moving to fixure
Oct 1, 2021
cd22dbe
added more information
Oct 1, 2021
670854a
using nodeports execption
Oct 1, 2021
ab08489
rename function
Oct 1, 2021
7789141
mocked api removed, no requests are fowarded here
Oct 1, 2021
9095c96
refactor
Oct 1, 2021
c30fae7
renaming function
Oct 1, 2021
08800da
updated docstring
Oct 1, 2021
dc59515
updated description
Oct 1, 2021
99666d3
update comment
Oct 1, 2021
75556ea
fix comment
Oct 1, 2021
ed3e9e1
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 1, 2021
0e620fc
updated _meta.py
Oct 1, 2021
fba79b4
revert to old version
Oct 1, 2021
11ad5a5
adding watchdog
Oct 1, 2021
49be5c8
added missing requirement
Oct 1, 2021
0fa5782
added missing requirement
Oct 1, 2021
e5f11bc
removed test uncertanty
Oct 1, 2021
0f61c45
enhanced test slightly
Oct 1, 2021
5488edd
fixed broken endpoint
Oct 1, 2021
63f51b6
adding some debug messages
Oct 1, 2021
c3c90fa
updated openapi specification
Oct 4, 2021
ab9ea54
expadning fixture to support multiple versions
Oct 4, 2021
2d77b37
added better description
Oct 4, 2021
6824a12
typing
Oct 4, 2021
b44a6f7
removed comment
Oct 4, 2021
aa45576
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 4, 2021
4db40c1
added timeouts
Oct 4, 2021
407df77
using ByteSize
Oct 4, 2021
0141f50
renamed
Oct 4, 2021
3797e42
renmed save_state to can_save
Oct 4, 2021
579424a
reverting change
Oct 4, 2021
26a2ffe
more renaming
Oct 4, 2021
f045dd6
renaming restore
Oct 4, 2021
52a87e9
renamed endpoints
Oct 4, 2021
cba2b5a
updated openapi.json
Oct 4, 2021
8248a1a
@sanderegg fixing in all Dockerfiles
Oct 4, 2021
0c6e9c9
migrated to dataclass
Oct 4, 2021
4b838f7
better debug messages
Oct 4, 2021
72a39ba
fixed docstring
Oct 4, 2021
4ca60ed
[skip ci] no ci when pushing
Oct 4, 2021
0bd7831
making it easier to debug
Oct 4, 2021
33ac837
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 4, 2021
e613817
trying to give nodeports more time
Oct 4, 2021
a9ff010
put back missing parameter
Oct 5, 2021
7d1ca3c
putting back option
Oct 5, 2021
9654a66
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 5, 2021
75770f6
restored missing API
Oct 5, 2021
335a608
fixed typing
Oct 5, 2021
24656aa
replaced with 3.8 version
Oct 5, 2021
8da91d1
typo
Oct 5, 2021
4e34430
using statues from starlette
Oct 5, 2021
f469779
making it run in development
Oct 5, 2021
7d47845
fixed import issue
Oct 5, 2021
57f8416
only log keys
Oct 5, 2021
9a500dd
aded status retrying
Oct 5, 2021
45c063b
ading retries on status and retrieve
Oct 5, 2021
617b673
refactor
Oct 5, 2021
4293b1e
renaming and refactoring
Oct 5, 2021
2d36167
renamed again
Oct 5, 2021
a9c2d2e
removing dependency bump
Oct 5, 2021
8e1b2cd
reverting
Oct 5, 2021
60b5c25
added minio
Oct 5, 2021
66ff2e9
added extra comments
Oct 5, 2021
5c1ed26
added more assersions
Oct 5, 2021
5c53050
added more debug prints
Oct 5, 2021
a8e39d0
adding more infomation in logs
Oct 5, 2021
b6ce757
more explicit debug message
Oct 5, 2021
a19240f
added sleep to make sure data is available
Oct 5, 2021
445b618
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 5, 2021
61ffddd
adding more information to make it easier to debug
Oct 5, 2021
f469512
tring to trigger CI again
Oct 6, 2021
95245b4
reafactoring
Oct 6, 2021
f8e5071
refactor
Oct 6, 2021
2c0ca49
used for testing, leaving it in
Oct 6, 2021
4515be6
added new container logs dump for simpler debugging
Oct 6, 2021
ca55af8
dumping logs twice
Oct 6, 2021
7d984af
adding logs debug
Oct 6, 2021
e6c4a44
updated return type
Oct 6, 2021
b7c9d94
logs from containers on fail
Oct 6, 2021
6381f3c
using version with more debug options
Oct 6, 2021
81867c6
bumped expected version in workbench
Oct 6, 2021
82b3fde
better error message and explaniation
Oct 6, 2021
624ce69
even more debug information
Oct 6, 2021
974b4ae
put back original check
Oct 6, 2021
432a6c7
added separators
Oct 6, 2021
f5b5c12
removed space
Oct 6, 2021
71bce09
trigger CI again
Oct 6, 2021
0ab0836
bumping timeout
Oct 6, 2021
19dfa8c
prining data for all containers
Oct 7, 2021
d1e8adc
fix issue
Oct 7, 2021
6e12de2
bumping dy_static_file_server version
Oct 7, 2021
cd56c78
updated versions in workbench
Oct 7, 2021
91c76ee
fixing python-lining CI
Oct 7, 2021
0d514d5
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 8, 2021
d5579a4
reverted changes
Oct 8, 2021
1cc8e3e
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 8, 2021
679aadb
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 11, 2021
6ce12e9
debug help
Oct 14, 2021
d6924a9
addresses an issue with stopping
Oct 14, 2021
7707ab6
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 14, 2021
eaf98f8
fix error
Oct 14, 2021
e3bcc76
replaced logging. with logger.
Oct 14, 2021
a0f2caf
adding more debug messages
Oct 14, 2021
4e84bbc
adding healthcheck to traefik
Oct 14, 2021
b529cec
enabling logs from traefik
Oct 14, 2021
c9f8e32
adding more logging
Oct 14, 2021
14204d2
adding more logs
Oct 14, 2021
b682c82
raising attempts 15 minute timeout
Oct 14, 2021
4a27482
adding better debug messages
Oct 15, 2021
dbd7c5c
changed the dafults to something else
Oct 15, 2021
a3ab392
added logging when data is not found to be upoaded
Oct 15, 2021
0359265
updating logs on function
Oct 15, 2021
3b3fd75
fixes a race condition when saving ports in parallel
Oct 15, 2021
21fafd9
reverting changes
Oct 18, 2021
67b7b1d
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 19, 2021
4feecf8
changed endpoint to make it more readable
Oct 19, 2021
a12379b
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 20, 2021
89cd203
removed client-sdk references
Oct 20, 2021
32f59de
migrated nodeports implementaiton
Oct 20, 2021
7984e44
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 20, 2021
510a89f
archiving folders is now done in parallel
Oct 20, 2021
bc8e8b2
fix import issue
Oct 20, 2021
4e31959
some missing types
Oct 20, 2021
c1ceb8a
typing
Oct 20, 2021
97110b1
fix pylint
Oct 20, 2021
fb73d84
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 21, 2021
9f37123
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 22, 2021
f8cb77b
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 26, 2021
2dced01
checking how much the test lasts
Oct 26, 2021
5e6ac0d
fixing timeout for test
Oct 26, 2021
b53bf16
added faster bailout
Oct 26, 2021
7c1410b
making bigger case start first
Oct 26, 2021
b62a9fe
adding debug flags to detect hangs
Oct 26, 2021
2aab907
reduced timeout to a reasonable value
Oct 27, 2021
093e1d2
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 27, 2021
7c4570c
added port forward for proxy to test
Oct 27, 2021
1972f68
making tests more reliable
Oct 27, 2021
702262f
extended logging
Oct 27, 2021
b54198f
Merge remote-tracking branch 'upstream/master' into adding-nodeports
Oct 27, 2021
23e9b21
pylint
Oct 27, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-testing-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1919,7 +1919,7 @@ jobs:
path: codeclimate.${{ github.job }}_coverage.json

integration-test-director-v2-02:
timeout-minutes: 20 # if this timeout gets too small, then split the tests
timeout-minutes: 30 # if this timeout gets too small, then split the tests
name: "[int] director-v2 02"
needs: [build-test-images]
runs-on: ${{ matrix.os }}
Expand Down
49 changes: 49 additions & 0 deletions api/specs/webserver/openapi-projects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,55 @@ paths:
default:
$ref: "./openapi.yaml#/components/responses/DefaultErrorResponse"

/projects/{project_id}/nodes/{node_id}/retrieve:
parameters:
- name: project_id
in: path
required: true
schema:
type: string
- name: node_id
in: path
required: true
schema:
type: string

post:
tags:
- project
description: Triggers service retrieve
operationId: post_retrieve
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
port_keys:
description: list of por keys to be retrieved
type: array
items:
type: string
responses:
"200":
description: Returns the amount of transferred bytes when pulling data via nodeports
content:
application/json:
schema:
type: object
properties:
data:
type: object
description: response payload
properties:
size_bytes:
type: integer
description: amount of transferred bytes

default:
$ref: "#/components/responses/DefaultErrorResponse"

/projects/{study_uuid}/tags/{tag_id}:
parameters:
- name: tag_id
Expand Down
3 changes: 3 additions & 0 deletions api/specs/webserver/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ paths:
/projects/{project_id}/nodes/{node_id}:
$ref: "./openapi-projects.yaml#/paths/~1projects~1{project_id}~1nodes~1{node_id}"

/projects/{project_id}/nodes/{node_id}:retrieve:
$ref: "./openapi-projects.yaml#/paths/~1projects~1{project_id}~1nodes~1{node_id}~1retrieve"

/nodes/{nodeInstanceUUID}/outputUi/{outputKey}:
$ref: "./openapi-node-v0.0.1.yaml#/paths/~1nodes~1{nodeInstanceUUID}~1outputUi~1{outputKey}"

Expand Down
1 change: 1 addition & 0 deletions ci/github/integration-testing/director-v2.bash
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ test() {
echo "testing in services/director-v2/tests/integration/$1"
pytest --cov=simcore_service_director_v2 --durations=10 --cov-append \
--color=yes --cov-report=term-missing --cov-report=xml --cov-config=.coveragerc \
-vvv -s --log-cli-level=DEBUG \
-v -m "not travis" "services/director-v2/tests/integration/$1" --log-level=DEBUG
}

Expand Down
16 changes: 9 additions & 7 deletions packages/pytest-simcore/src/pytest_simcore/docker_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,52 +230,54 @@ def jupyter_service(docker_registry: str, node_meta_schema: Dict) -> Dict[str, s
)


DY_STATIC_FILE_SERVER_VERSION = "1.0.5"
@pytest.fixture(scope="session", params=["2.0.2"])
def dy_static_file_server_version(request):
return request.param


@pytest.fixture(scope="session")
def dy_static_file_server_service(
docker_registry: str, node_meta_schema: Dict
docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
) -> Dict[str, str]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server
"""
return _pull_push_service(
"itisfoundation/dy-static-file-server",
DY_STATIC_FILE_SERVER_VERSION,
dy_static_file_server_version,
docker_registry,
node_meta_schema,
)


@pytest.fixture(scope="session")
def dy_static_file_server_dynamic_sidecar_service(
docker_registry: str, node_meta_schema: Dict
docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
) -> Dict[str, str]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server-dynamic-sidecar
"""
return _pull_push_service(
"itisfoundation/dy-static-file-server-dynamic-sidecar",
DY_STATIC_FILE_SERVER_VERSION,
dy_static_file_server_version,
docker_registry,
node_meta_schema,
)


@pytest.fixture(scope="session")
def dy_static_file_server_dynamic_sidecar_compose_spec_service(
docker_registry: str, node_meta_schema: Dict
docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
) -> Dict[str, str]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec
"""
return _pull_push_service(
"itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec",
DY_STATIC_FILE_SERVER_VERSION,
dy_static_file_server_version,
docker_registry,
node_meta_schema,
)
33 changes: 25 additions & 8 deletions packages/service-library/src/servicelib/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,25 @@
import logging
from collections import deque
from functools import wraps
from typing import Dict, List, Optional
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Optional

import attr

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
Queue = asyncio.Queue
else:

class FakeGenericMeta(type):
def __getitem__(self, item):
return self

class Queue(
asyncio.Queue, metaclass=FakeGenericMeta
): # pylint: disable=function-redefined
pass


@attr.s(auto_attribs=True)
class Context:
Expand All @@ -30,7 +43,9 @@ async def stop_sequential_workers() -> None:
logger.info("All run_sequentially_in_context pending workers stopped")


def run_sequentially_in_context(target_args: List[str] = None):
def run_sequentially_in_context(
target_args: List[str] = None,
) -> Callable[[Any], Any]:
"""All request to function with same calling context will be run sequentially.

Example:
Expand Down Expand Up @@ -68,15 +83,17 @@ async def func(param1, param2, param3):
"""
target_args = [] if target_args is None else target_args

def internal(decorated_function):
def get_context(args, kwargs: Dict) -> Context:
def internal(
decorated_function: Callable[[Any], Optional[Any]]
) -> Callable[[Any], Optional[Any]]:
def get_context(args: Any, kwargs: Dict[Any, Any]) -> Context:
arg_names = decorated_function.__code__.co_varnames[
: decorated_function.__code__.co_argcount
]
search_args = dict(zip(arg_names, args))
search_args.update(kwargs)

key_parts = deque()
key_parts: Deque[str] = deque()
for arg in target_args:
sub_args = arg.split(".")
main_arg = sub_args[0]
Expand Down Expand Up @@ -108,13 +125,13 @@ def get_context(args, kwargs: Dict) -> Context:
return _sequential_jobs_contexts[key]

@wraps(decorated_function)
async def wrapper(*args, **kwargs):
async def wrapper(*args: Any, **kwargs: Any) -> Any:
context: Context = get_context(args, kwargs)

if not context.initialized:
context.initialized = True

async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue):
async def worker(in_q: Queue, out_q: Queue) -> None:
while True:
awaitable = await in_q.get()
in_q.task_done()
Expand All @@ -137,7 +154,7 @@ async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue):
worker(context.in_queue, context.out_queue)
)

await context.in_queue.put(decorated_function(*args, **kwargs))
await context.in_queue.put(decorated_function(*args, **kwargs)) # type: ignore

wrapped_result = await context.out_queue.get()
if isinstance(wrapped_result, Exception):
Expand Down
9 changes: 9 additions & 0 deletions packages/service-library/src/servicelib/pools.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
from concurrent.futures import ProcessPoolExecutor
from contextlib import contextmanager
from typing import Any, Callable

# only gets created on use and is guaranteed to be the s
# ame for the entire lifetime of the application
Expand Down Expand Up @@ -35,3 +37,10 @@ def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
# FIXME: uncomment below line when the issue is fixed
# executor.shutdown(wait=False)
pass


async def async_on_threadpool(callable_function: Callable, *args: Any) -> Any:
"""Ensures blocking operation runs on shared thread pool"""
return await asyncio.get_event_loop().run_in_executor(
None, callable_function, *args
)
Comment on lines +42 to +46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: not sure this brings anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a small wrapper method. I could move it from servicelib to the dynamic-sidecar, but I would still keep it

13 changes: 6 additions & 7 deletions packages/service-library/src/servicelib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import asyncio
import logging
import os

from pathlib import Path
from typing import Any, Awaitable, Coroutine, List, Optional, Union

Expand Down Expand Up @@ -79,8 +78,11 @@ def log_exception_callback(fut: asyncio.Future):

# // tasks
async def logged_gather(
*tasks, reraise: bool = True, log: logging.Logger = logger, max_concurrency: int = 0
) -> List[Any]:
*tasks: Awaitable[Any],
reraise: bool = True,
log: logging.Logger = logger,
max_concurrency: int = 0,
) -> List[Optional[Any]]:
"""
Thin wrapper around asyncio.gather that allows excuting ALL tasks concurently until the end
even if any of them fail. Finally, all errors are logged and the first raised (if reraise=True)
Expand All @@ -91,18 +93,15 @@ async def logged_gather(
use directly asyncio.gather(*tasks, return_exceptions=True).

:param reraise: reraises first exception (in order the tasks were passed) concurrent tasks, defaults to True
:type reraise: bool, optional
:param log: passing the logger gives a chance to identify the origin of the gather call, defaults to current submodule's logger
:type log: logging.Logger, optional
:return: list of tasks results and errors e.g. [1, 2, ValueError("task3 went wrong"), 33, "foo"]
:rtype: List[Any]
"""

wrapped_tasks = tasks
if max_concurrency > 0:
semaphore = asyncio.Semaphore(max_concurrency)

async def sem_task(task):
async def sem_task(task: Awaitable[Any]) -> Any:
async with semaphore:
return await task

Expand Down
7 changes: 5 additions & 2 deletions packages/service-library/tests/test_pools.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from asyncio import BaseEventLoop
from concurrent.futures import ProcessPoolExecutor


from servicelib.pools import non_blocking_process_pool_executor
from servicelib.pools import async_on_threadpool, non_blocking_process_pool_executor


def return_int_one() -> int:
Expand Down Expand Up @@ -32,3 +31,7 @@ async def test_different_pool_instances() -> None:
max_workers=1
) as first, non_blocking_process_pool_executor() as second:
assert first != second


async def test_run_on_thread_pool() -> None:
assert await async_on_threadpool(return_int_one) == 1
1 change: 1 addition & 0 deletions packages/simcore-sdk/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pytest-mock
pytest-runner
pytest-sugar
pytest-xdist
pytest-lazy-fixture

# mockups/fixtures
alembic
Expand Down
3 changes: 3 additions & 0 deletions packages/simcore-sdk/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pytest==6.2.5
# pytest-forked
# pytest-icdiff
# pytest-instafail
# pytest-lazy-fixture
# pytest-mock
# pytest-sugar
# pytest-xdist
Expand All @@ -144,6 +145,8 @@ pytest-icdiff==0.5
# via -r requirements/_test.in
pytest-instafail==0.4.2
# via -r requirements/_test.in
pytest-lazy-fixture==0.6.3
# via -r requirements/_test.in
pytest-mock==3.6.1
# via -r requirements/_test.in
pytest-runner==5.3.1
Expand Down
34 changes: 24 additions & 10 deletions packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,36 +66,50 @@ async def push(
return await _push_file(user_id, project_id, node_uuid, archive_file_path, None)


async def _pull_file(user_id: int, project_id: str, node_uuid: str, file_path: Path):
async def _pull_file(
user_id: int,
project_id: str,
node_uuid: str,
file_path: Path,
save_to: Optional[Path] = None,
):
destination_path = file_path if save_to is None else save_to
s3_object = _create_s3_object(project_id, node_uuid, file_path)
log.info("pulling data from %s to %s...", s3_object, file_path)
downloaded_file = await filemanager.download_file_from_s3(
user_id=user_id,
store_id="0",
s3_object=s3_object,
local_folder=file_path.parent,
local_folder=destination_path.parent,
)
if downloaded_file != file_path:
if file_path.exists():
file_path.unlink()
move(f"{downloaded_file}", file_path)
log.info("%s successfuly pulled", file_path)
if downloaded_file != destination_path:
destination_path.unlink(missing_ok=True)
move(f"{downloaded_file}", destination_path)
log.info("%s successfuly pulled", destination_path)


def _get_archive_name(path: Path) -> str:
return f"{path.stem}.zip"


async def pull(user_id: int, project_id: str, node_uuid: str, file_or_folder: Path):
async def pull(
user_id: int,
project_id: str,
node_uuid: str,
file_or_folder: Path,
save_to: Optional[Path] = None,
):
if file_or_folder.is_file():
return await _pull_file(user_id, project_id, node_uuid, file_or_folder)
return await _pull_file(user_id, project_id, node_uuid, file_or_folder, save_to)
# we have a folder, so we need somewhere to extract it to
with TemporaryDirectory() as tmp_dir_name:
archive_file = Path(tmp_dir_name) / _get_archive_name(file_or_folder)
await _pull_file(user_id, project_id, node_uuid, archive_file)
log.info("extracting data from %s", archive_file)

destination_folder = file_or_folder if save_to is None else save_to
await unarchive_dir(
archive_to_extract=archive_file, destination_folder=file_or_folder
archive_to_extract=archive_file, destination_folder=destination_folder
)
log.info("extraction completed")

Expand Down
Loading