@@ -67,7 +67,6 @@ def _process_task_input(self, port, input_ports):
         #parse the link assuming it is link.id.file.ending
         _parts = port_value.split(".")
         object_name = os.path.join(str(self._task.pipeline_id), _parts[1], ".".join(_parts[2:]))
-        #object_name = os.path.join(str(self._task.pipeline_id),*port_value.split(".")[1:])
         input_file = os.path.join(self._executor.in_dir, port_name)
         _LOGGER.debug('Downloading from S3 %s/%s', self._s3.bucket, object_name)
         success = False
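For illustration (all values hypothetical), the link.id.file.ending convention parsed above maps to an S3 object key like so:

    port_value = "link.1234.output.csv"    # hypothetical link string
    _parts = port_value.split(".")         # ["link", "1234", "output", "csv"]
    object_name = os.path.join("77", _parts[1], ".".join(_parts[2:]))
    # with a hypothetical pipeline_id of 77 this yields "77/1234/output.csv"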
@@ -138,7 +137,17 @@ def _pull_image(self):

         self._docker.client.images.pull(self._docker.image_name, tag=self._docker.image_tag)

-    def _bg_job(self, task, log_file):
+    def _log(self, channel, msg):
+        log_data = {"Channel": "Log", "Node": self._task.node_id, "Message": msg}
+        log_body = json.dumps(log_data)
+        channel.basic_publish(exchange=self._pika.log_channel, routing_key='', body=log_body)
+
+    def _progress(self, channel, progress):
+        prog_data = {"Channel": "Progress", "Node": self._task.node_id, "Progress": progress}
+        prog_body = json.dumps(prog_data)
+        channel.basic_publish(exchange=self._pika.progress_channel, routing_key='', body=prog_body)
+
+    def _bg_job(self, log_file):
         connection = pika.BlockingConnection(self._pika.parameters)

         channel = connection.channel()
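With the new _log and _progress helpers, every message published to the fanout exchanges is a small JSON document keyed by channel type and node. Assuming a hypothetical node_id of "node-1", the two payload shapes are:

    {"Channel": "Progress", "Node": "node-1", "Progress": "42"}
    {"Channel": "Log", "Node": "node-1", "Message": "some container output"}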
@@ -158,16 +167,11 @@ def _bg_job(self, task, log_file):
                clean_line = line.strip()
                if clean_line.lower().startswith("[progress]"):
                    progress = clean_line.lower().lstrip("[progress]").rstrip("%").strip()
-                    prog_data = {"Channel": "Progress", "Node": task.node_id, "Progress": progress}
+                    self._progress(channel, progress)
                    _LOGGER.debug('PROGRESS %s', progress)
-                    prog_body = json.dumps(prog_data)
-                    channel.basic_publish(exchange=self._pika.progress_channel, routing_key='', body=prog_body)
                else:
-                    log_data = {"Channel": "Log", "Node": task.node_id, "Message": clean_line}
+                    self._log(channel, clean_line)
                    _LOGGER.debug('LOG %s', clean_line)
-                    log_body = json.dumps(log_data)
-                    channel.basic_publish(exchange=self._pika.log_channel, routing_key='', body=log_body)
-


        connection.close()
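For context, a minimal sketch of the consuming side of these fanout exchanges, assuming pika 1.x; the connection parameters and exchange name are placeholders for the values carried by self._pika:

    import json
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    # mirror the producer's declaration; "log_channel" stands in for self._pika.log_channel
    channel.exchange_declare(exchange="log_channel", exchange_type='fanout', auto_delete=True)

    # an exclusive, server-named queue bound to the fanout exchange receives
    # a copy of every published message
    result = channel.queue_declare(queue='', exclusive=True)
    channel.queue_bind(exchange="log_channel", queue=result.method.queue)

    def on_message(ch, method, properties, body):
        payload = json.loads(body)
        print(payload["Node"], payload.get("Message") or payload.get("Progress"))

    channel.basic_consume(queue=result.method.queue, on_message_callback=on_message, auto_ack=True)
    channel.start_consuming()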
@@ -266,7 +270,7 @@ def process(self):
        log_file = os.path.join(self._executor.log_dir, "log.dat")

        Path(log_file).touch()
-        fut = self._executor.pool.submit(self._bg_job, self._task, log_file)
+        fut = self._executor.pool.submit(self._bg_job, log_file)

        try:
            docker_image = self._docker.image_name + ":" + self._docker.image_tag
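Since _bg_job now reads the task from self._task, the extra argument is dropped from the pool submission. The returned future can still be joined once the container run is over, e.g. (hypothetical):

    fut.result(timeout=10)   # wait for the log-forwarding job to drain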
@@ -292,10 +296,30 @@ def process(self):
        _LOGGER.debug('DONE Processing Pipeline %s and node %s from container', self._task.pipeline_id, self._task.internal_id)

    def run(self):
-        _LOGGER.debug("ENTERING run")
+        connection = pika.BlockingConnection(self._pika.parameters)
+
+        channel = connection.channel()
+        channel.exchange_declare(exchange=self._pika.log_channel, exchange_type='fanout', auto_delete=True)
+
+        msg = "Preprocessing start..."
+        self._log(channel, msg)
         self.preprocess()
+        msg = "...preprocessing end"
+        self._log(channel, msg)
+
+        msg = "Processing start..."
+        self._log(channel, msg)
         self.process()
+        msg = "...processing end"
+        self._log(channel, msg)
+
+        msg = "Postprocessing start..."
+        self._log(channel, msg)
         self.postprocess()
+        msg = "...postprocessing end"
+        self._log(channel, msg)
+        connection.close()
+

    def postprocess(self):
        _LOGGER.debug('Post-Processing Pipeline %s and node %s from container', self._task.pipeline_id, self._task.internal_id)
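With these additions, a consumer bound to the log exchange sees the lifecycle of a run as an ordered stream of messages (node_id hypothetical):

    {"Channel": "Log", "Node": "node-1", "Message": "Preprocessing start..."}
    {"Channel": "Log", "Node": "node-1", "Message": "...preprocessing end"}
    {"Channel": "Log", "Node": "node-1", "Message": "Processing start..."}
    {"Channel": "Log", "Node": "node-1", "Message": "...processing end"}
    {"Channel": "Log", "Node": "node-1", "Message": "Postprocessing start..."}
    {"Channel": "Log", "Node": "node-1", "Message": "...postprocessing end"}

interleaved with any container log and progress messages published by _bg_job.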