@@ -20,7 +20,7 @@

 from . import director_sdk
 from .application_keys import APP_CONFIG_KEY
-from .comp_backend_worker import celery
+from .computation_worker import celery

 # TODO: this should be coordinated with postgres options from config/server.yaml
 #from simcore_sdk.config.db import Config as DbConfig
@@ -34,7 +34,7 @@


 db_session = None
-comp_backend_routes = web.RouteTableDef()
+computation_routes = web.RouteTableDef()

 async def init_database(_app):
     #pylint: disable=W0603
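
Side note on the rename above: an aiohttp web.RouteTableDef() only collects route declarations, so whatever application-setup code adds the table to the app's router must be updated to the new computation_routes name as well. A minimal sketch of that wiring, assuming a setup helper and import path that are not part of this diff:

from aiohttp import web

from .computation_api import computation_routes  # route table renamed in this diff; module path assumed

def setup_computation(app: web.Application) -> None:
    # hypothetical setup helper -- the diff itself only renames the route table
    app.router.add_routes(computation_routes)
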
@@ -88,7 +88,7 @@ async def _get_node_details(node_key:str, node_version:str)->dict:
     try:
         services_enveloped = await director_sdk.get_director().services_by_key_version_get(node_key, node_version)
         node_details = services_enveloped.data[0].to_dict()
-        return node_details
+        return node_details
     except ApiException as err:
         log.exception("Error could not find service %s:%s", node_key, node_version)
         raise web_exceptions.HTTPNotFound(reason=str(err))
@@ -120,7 +120,7 @@ async def _build_adjacency_list(node_uuid:str, node_schema:dict, node_inputs:dic
         if node_uuid not in dag_adjacency_list[input_node_uuid] and is_node_computational:
             dag_adjacency_list[input_node_uuid].append(node_uuid)
     return dag_adjacency_list
-
+
 async def _parse_pipeline(pipeline_data:dict): # pylint: disable=R0912
     dag_adjacency_list = dict()
     tasks = dict()
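
For orientation, the dag_adjacency_list built above maps the UUID of an upstream node to the list of downstream computational nodes that consume its outputs, while tasks gathers the per-node task data used further down. A hand-written example of the adjacency structure (UUIDs shortened, purely illustrative, not taken from this diff):

# Purely illustrative shape of the DAG adjacency list built by _build_adjacency_list:
# each key is an upstream node UUID, each value lists the computational nodes it feeds.
dag_adjacency_list = {
    "node-a": ["node-b", "node-c"],  # node-a's outputs are inputs of node-b and node-c
    "node-b": ["node-d"],            # node-b feeds node-d
}
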
@@ -133,7 +133,7 @@ async def _parse_pipeline(pipeline_data:dict): # pylint: disable=R0912
         node_key = value["key"]
         node_version = value["version"]

-        # get the task data
+        # get the task data
         node_inputs = None
         if "inputs" in value:
             node_inputs = value["inputs"]
@@ -167,20 +167,20 @@ async def _parse_pipeline(pipeline_data:dict): # pylint: disable=R0912

 async def _set_adjacency_in_pipeline_db(project_id, dag_adjacency_list):
     try:
-        pipeline = db_session.query(ComputationalPipeline).filter(ComputationalPipeline.project_id == project_id).one()
+        pipeline = db_session.query(ComputationalPipeline).filter(ComputationalPipeline.project_id == project_id).one()
         log.debug("Pipeline object found")
         pipeline.state = 0
         pipeline.dag_adjacency_list = dag_adjacency_list
     except sqlalchemy.orm.exc.NoResultFound:
         # let's create one then
-        pipeline = ComputationalPipeline(project_id=project_id, dag_adjacency_list=dag_adjacency_list, state=0)
+        pipeline = ComputationalPipeline(project_id=project_id, dag_adjacency_list=dag_adjacency_list, state=0)
         log.debug("Pipeline object created")
         db_session.add(pipeline)
     except sqlalchemy.orm.exc.MultipleResultsFound:
         log.exception("the computation pipeline %s is not unique", project_id)
         raise

-async def _set_tasks_in_tasks_db(project_id, tasks):
+async def _set_tasks_in_tasks_db(project_id, tasks):
     tasks_db = db_session.query(ComputationalTask).filter(ComputationalTask.project_id == project_id).all()
     # delete tasks that were deleted from the db
     for task_db in tasks_db:
@@ -213,7 +213,7 @@ async def _set_tasks_in_tasks_db(project_id, tasks):
         db_session.add(comp_task)

 # pylint:disable=too-many-branches, too-many-statements
-@comp_backend_routes.post("/start_pipeline")
+@computation_routes.post("/start_pipeline")
 async def start_pipeline(request):
     #pylint:disable=broad-except
     # FIXME: this should be implemented generaly using async lazy initialization of db_session??
@@ -230,7 +230,7 @@ async def start_pipeline(request):

     log.debug("Client calls start_pipeline with project id: %s, pipeline data %s", project_id, pipeline_data)
     dag_adjacency_list, tasks = await _parse_pipeline(pipeline_data)
-    log.debug("Pipeline parsed:\n list: %s\n tasks: %s", str(dag_adjacency_list), str(tasks))
+    log.debug("Pipeline parsed:\n list: %s\n tasks: %s", str(dag_adjacency_list), str(tasks))
     try:
         await _set_adjacency_in_pipeline_db(project_id, dag_adjacency_list)
         await _set_tasks_in_tasks_db(project_id, tasks)
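
Finally, per the log message above, a client starts a computation by POSTing the project id and pipeline data to the renamed /start_pipeline route. A rough client-side sketch; the host, port and payload field names are assumptions, since the request schema is not visible in this diff:

# Illustrative client call; URL, port and payload keys are assumptions, not taken from this diff.
import asyncio
from aiohttp import ClientSession

async def call_start_pipeline():
    payload = {
        "project_id": "some-project-uuid",  # hypothetical key name
        "pipeline_data": {},                # hypothetical: the pipeline/workbench description
    }
    async with ClientSession() as session:
        async with session.post("http://localhost:8080/start_pipeline", json=payload) as resp:
            print(resp.status, await resp.text())

asyncio.run(call_start_pipeline())
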