Skip to content

Commit 52c1356

Browse files
committed
improving test
1 parent aaca58c commit 52c1356

File tree

3 files changed

+106
-22
lines changed

3 files changed

+106
-22
lines changed

packages/pytest-fixtures/docker_registry.py

+20-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# pylint:disable=redefined-outer-name
66

77
import json
8+
import os
89
import time
910
from typing import Dict
1011

@@ -24,7 +25,7 @@ def docker_registry(keepdockerup: bool) -> str:
2425
container = None
2526
try:
2627
docker_client.login(registry=url, username="simcore")
27-
container = docker_client.containers.list({"name":"pytest_registry"})[0]
28+
container = docker_client.containers.list({"name": "pytest_registry"})[0]
2829
except Exception:
2930
print("Warning: docker registry is already up!")
3031
container = docker_client.containers.run(
@@ -35,7 +36,7 @@ def docker_registry(keepdockerup: bool) -> str:
3536
restart_policy={"Name": "always"},
3637
detach=True,
3738
)
38-
39+
3940
# Wait until we can connect
4041
assert _wait_till_registry_is_responsive(url)
4142

@@ -55,6 +56,11 @@ def docker_registry(keepdockerup: bool) -> str:
5556
private_image = docker_client.images.pull(repo)
5657
docker_client.images.remove(image=private_image.id)
5758

59+
# necessary for old school configs
60+
os.environ["REGISTRY_URL"] = url
61+
os.environ["REGISTRY_USER"] = "simcore"
62+
os.environ["REGISTRY_PW"] = ""
63+
5864
yield url
5965
if not keepdockerup:
6066
container.stop()
@@ -88,9 +94,12 @@ def sleeper_service(docker_registry: str) -> Dict[str, str]:
8894
image_labels = image.labels
8995

9096
yield {
91-
key[len("io.simcore."):]: json.loads(value)[key[len("io.simcore."):]]
92-
for key, value in image_labels.items()
93-
if key.startswith("io.simcore.")
97+
"schema": {
98+
key[len("io.simcore.") :]: json.loads(value)[key[len("io.simcore.") :]]
99+
for key, value in image_labels.items()
100+
if key.startswith("io.simcore.")
101+
},
102+
"image": repo,
94103
}
95104

96105

@@ -120,7 +129,10 @@ def jupyter_service(docker_registry: str) -> Dict[str, str]:
120129
image_labels = image.labels
121130

122131
yield {
123-
key[len("io.simcore."):]: json.loads(value)[key[len("io.simcore."):]]
124-
for key, value in image_labels.items()
125-
if key.startswith("io.simcore.")
132+
"schema": {
133+
key[len("io.simcore.") :]: json.loads(value)[key[len("io.simcore.") :]]
134+
for key, value in image_labels.items()
135+
if key.startswith("io.simcore.")
136+
},
137+
"image": repo,
126138
}

services/sidecar/src/sidecar/core.py

+11-8
Original file line numberDiff line numberDiff line change
@@ -299,15 +299,15 @@ async def initialize(self, task, user_id: str):
299299
task.internal_id, user_id)
300300
self._task = task
301301
self._user_id = user_id
302-
302+
import pdb; pdb.set_trace()
303303
self._docker.image_name = self._docker.registry_name + \
304-
"/" + task.image['name']
305-
self._docker.image_tag = task.image['tag']
304+
"/" + task.schema["key"]
305+
self._docker.image_tag = task.schema["version"]
306306

307307
# volume paths for side-car container
308-
self._executor.in_dir = Path.home() / "input" / task.job_id
309-
self._executor.out_dir = Path.home() / 'output' / task.job_id
310-
self._executor.log_dir = Path.home() / 'log' / task.job_id
308+
self._executor.in_dir = Path.home() / f"input/{task.job_id}"
309+
self._executor.out_dir = Path.home() / f"output/{task.job_id}"
310+
self._executor.log_dir = Path.home() / f"log/{task.job_id}"
311311

312312
# volume paths for car container (w/o prefix)
313313
self._docker.env = [
@@ -324,6 +324,7 @@ async def initialize(self, task, user_id: str):
324324
task.internal_id, user_id)
325325

326326
async def preprocess(self):
327+
import pdb; pdb.set_trace()
327328
log.debug('Pre-Processing Pipeline %s:node %s:internal id %s from container',
328329
self._task.project_id, self._task.node_id, self._task.internal_id)
329330
await self._create_shared_folders()
@@ -333,6 +334,7 @@ async def preprocess(self):
333334
self._task.project_id, self._task.node_id, self._task.internal_id)
334335

335336
async def process(self):
337+
import pdb; pdb.set_trace()
336338
log.debug('Processing Pipeline %s:node %s:internal id %s from container',
337339
self._task.project_id, self._task.node_id, self._task.internal_id)
338340

@@ -396,6 +398,7 @@ async def process(self):
396398
self._task.project_id, self._task.node_id, self._task.internal_id)
397399

398400
async def run(self):
401+
import pdb; pdb.set_trace()
399402
log.debug('Running Pipeline %s:node %s:internal id %s from container',
400403
self._task.project_id, self._task.node_id, self._task.internal_id)
401404
# NOTE: the rabbit has a timeout of 60seconds so blocking this channel for more is a no go.
@@ -448,7 +451,7 @@ async def inspect(self, celery_task, user_id: str, project_id: str, node_id: str
448451
user_id, project_id, node_id)
449452

450453
next_task_nodes = []
451-
454+
import pdb; pdb.set_trace()
452455
with session_scope(self._db.Session) as _session:
453456
_pipeline = _session.query(ComputationalPipeline).filter_by(
454457
project_id=project_id).one()
@@ -501,7 +504,7 @@ async def inspect(self, celery_task, user_id: str, project_id: str, node_id: str
501504
_session.add(task)
502505
_session.commit()
503506

504-
self.initialize(task, user_id)
507+
await self.initialize(task, user_id)
505508

506509
# now proceed actually running the task (we do that after the db session has been closed)
507510
# try to run the task, return empyt list of next nodes if anything goes wrong
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,84 @@
1-
#pylint: disable=unused-argument
1+
# pylint: disable=unused-argument
2+
# pylint: disable=redefined-outer-name
23

3-
# import pytest
4+
import os
5+
from typing import Dict
6+
from uuid import uuid4
7+
8+
import pytest
9+
import sqlalchemy as sa
10+
11+
from simcore_sdk.models.pipeline_models import (
12+
ComputationalPipeline,
13+
ComputationalTask,
14+
comp_pipeline,
15+
comp_tasks,
16+
)
417

518
# Selection of core and tool services started in this swarm fixture (integration)
619
core_services = ["storage", "postgres", "rabbit"]
720

821
ops_services = ["minio", "adminer"]
922

1023

24+
@pytest.fixture
25+
def project_id() -> str:
26+
return str(uuid4())
27+
28+
29+
@pytest.fixture
30+
def user_id() -> int:
31+
return 1
32+
33+
34+
@pytest.fixture
35+
def node_uuid() -> str:
36+
return str(uuid4())
37+
38+
39+
@pytest.fixture
40+
def pipeline_db(
41+
postgres_session: sa.orm.session.Session, project_id: str, node_uuid
42+
) -> ComputationalPipeline:
43+
pipeline = ComputationalPipeline(project_id=project_id, dag_adjacency_list={node_uuid:[]})
44+
postgres_session.add(pipeline)
45+
postgres_session.commit()
46+
yield pipeline
47+
48+
@pytest.fixture
49+
def task_db(
50+
postgres_session: sa.orm.session.Session,
51+
sleeper_service: Dict[str, str],
52+
pipeline_db: ComputationalPipeline,
53+
node_uuid: str,
54+
) -> ComputationalTask:
55+
comp_task = ComputationalTask(
56+
project_id=pipeline_db.project_id,
57+
node_id=node_uuid,
58+
schema=sleeper_service["schema"],
59+
image=sleeper_service["image"],
60+
inputs={},
61+
outputs={},
62+
)
63+
postgres_session.add(comp_task)
64+
postgres_session.commit()
65+
66+
yield comp_task
67+
68+
async def test_run_sleepers(
69+
loop,
70+
docker_stack: Dict,
71+
postgres_session: sa.orm.session.Session,
72+
sleeper_service: Dict[str, str],
73+
task_db: ComputationalTask,
74+
user_id: int,
75+
mocker,
76+
):
77+
celery_task = mocker.MagicMock()
78+
celery_task.request.id = 1
1179

12-
async def test_run_sleepers(loop, docker_stack, postgres_session, sleeper_service):
13-
import pdb
14-
pdb.set_trace()
15-
# next_task_nodes = await SIDECAR.inspect(task, user_id, pipeline_id, node_id)
80+
# Note this must happen here since DB is set already at that time
81+
from sidecar.core import SIDECAR
82+
next_task_nodes = await SIDECAR.inspect(
83+
celery_task, user_id, task_db.project_id, task_db.node_id
84+
)

0 commit comments

Comments
 (0)