@@ -308,7 +308,9 @@ def requires(*tasks_to_require):


class Workflow(object):
-
+    """
+    The class is used to orchestrate tasks and define a task pipeline.
+    """

    def __init__(self, task=None, params=None, path=None):
        self.params = {} if params is None else params
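For orientation, a minimal usage sketch of the class introduced above; the task class TaskTrain and the parameter name do_preprocess are hypothetical placeholders and not part of this diff:

# Hypothetical d6tflow/luigi-style task assumed to be defined elsewhere.
flow = Workflow(task=TaskTrain, params={'do_preprocess': False})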
@@ -317,26 +319,71 @@ def __init__(self, task = None, params=None, path=None):


    def preview(self, tasks=None, indent='', last=True, show_params=True, clip_params=False):
+        """
+        Preview task flows with the workflow parameters
+
+        Args:
+            tasks (class, list): task class or list of task classes
+        """
        if not isinstance(tasks, (list,)):
            tasks = [tasks]
        tasks_inst = [self.get_task(x) for x in tasks]
        return preview(tasks=tasks_inst, indent=indent, last=last, show_params=show_params, clip_params=clip_params)
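# Illustrative sketch, not part of this diff: previewing the pipeline with the
# hypothetical workflow object from the earlier sketch.
flow.preview(TaskTrain)                  # single hypothetical task class
flow.preview([TaskGetData, TaskTrain])   # or a list of task classes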


    def run(self, tasks=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=True, workers=1, abort=True, execution_summary=None, **kwargs):
+        """
+        Run tasks with the workflow parameters. See luigi.build for additional details
+
+        Args:
+            tasks (class, list): task class or list of task classes
+            forced (list): list of forced tasks
+            forced_all (bool): force all tasks
+            forced_all_upstream (bool): force all tasks including upstream
+            confirm (bool): confirm invalidating tasks
+            workers (int): number of workers
+            abort (bool): raise exception on errors
+            execution_summary (bool): print execution summary
+            kwargs: keywords to pass to luigi.build
+
+        """
        if not isinstance(tasks, (list,)):
            tasks = [tasks]
        tasks_inst = [self.get_task(x) for x in tasks]
        return run(tasks_inst, forced=forced, forced_all=forced_all, forced_all_upstream=forced_all_upstream, confirm=confirm, workers=workers, abort=abort, execution_summary=execution_summary, **kwargs)
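# Illustrative sketch, not part of this diff: running the hypothetical pipeline,
# forcing one task to re-run and using two luigi workers.
flow.run(TaskTrain, forced=[TaskGetData], workers=2)
# Force everything, including upstream tasks, skipping the confirmation prompt.
flow.run(TaskTrain, forced_all_upstream=True, confirm=False)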


    def outputLoad(self, task=None, keys=None, as_dict=False, cached=False):
+        """
+        Load output from task with the workflow parameters
+
+        Args:
+            task (class): task class
+            keys (list): list of data to load
+            as_dict (bool): return output as a dict
+            cached (bool): cache data in memory
+
+        Returns: list or dict of task output
+        """
        return self.get_task(task).outputLoad(keys=keys, as_dict=as_dict, cached=cached)
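# Illustrative sketch, not part of this diff: loading persisted output for the
# hypothetical TaskTrain; the key name 'df_train' is also an assumption.
df = flow.outputLoad(TaskTrain)
data = flow.outputLoad(TaskTrain, keys=['df_train'], as_dict=True)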

+
    def outputLoadMeta(self, task=None):
        return self.get_task(task).outputLoadMeta()

+
    def outputLoadAll(self, task=None, keys=None, as_dict=False, cached=False):
+        """
+        Load all output from task with the workflow parameters
+
+        Args:
+            task (class): task class
+            keys (list): list of data to load
+            as_dict (bool): return output as a dict
+            cached (bool): cache data in memory
+
+        Returns: list or dict of all task output
+        """
        task_inst = self.get_task(task)
        data_dict = {}
        tasks = taskflow_upstream(task_inst)
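Unlike outputLoad, outputLoadAll walks the upstream task flow (taskflow_upstream above) and loads output for every task in it. A minimal sketch, reusing the hypothetical workflow and task from the earlier examples:

all_data = flow.outputLoadAll(TaskTrain, as_dict=True)
# expected to gather output for TaskTrain and its upstream tasks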
@@ -351,6 +398,16 @@ def reset(self, task, confirm=True):


    def reset_downstream(self, task, task_downstream, confirm=True):
+        """
+        Invalidate all downstream tasks in a flow.
+
+        For example, suppose you have 3 dependent tasks. Normally you run Task3, but you have changed parameters for Task1. Invalidating Task3 will check the full DAG, see that Task1 needs to be invalidated, and therefore invalidate Task2 and Task3 as well.
+
+        Args:
+            task (obj): task to invalidate. This should be an upstream task for which you want to check downstream dependencies for invalidation conditions
+            task_downstream (obj): downstream task target
+            confirm (bool): confirm operation
+        """
        task_inst = self.get_task(task)
        task_downstream_inst = self.get_task(task_downstream)
        return taskflow_downstream(task_inst, task_downstream_inst, confirm)
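A sketch of reset_downstream following the docstring's scenario; Task1, Task2 and Task3 are hypothetical chained tasks and the argument order follows the signature above:

# Task1's parameters changed: invalidate it and its downstream tasks up to Task3.
flow.reset_downstream(Task1, Task3)
flow.reset_downstream(Task1, Task3, confirm=False)  # skip the confirmation prompt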
@@ -360,10 +417,26 @@ def reset_upstream(self, task, confirm=True):
        task_inst = self.get_task(task)
        return invalidate_upstream(task_inst, confirm)

+
    def set_default(self, task):
+        """
+        Set default task for the workflow object
+
+        Args:
+            task (obj): The task to be set as the default task
+        """
        self.default_task = task

+
    def get_task(self, task=None):
+        """
+        Get task with the workflow parameters
+
+        Args:
+            task (class): task class
+
+        Returns: An instance of the task class with the workflow parameters
+        """
        if task is None:
            if self.default_task is None:
                raise RuntimeError('no default tasks set')
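A sketch combining set_default and get_task; once a default task is set, methods such as run, preview and outputLoad can be called without naming a task. The task class is still hypothetical:

flow.set_default(TaskTrain)   # hypothetical task class
task_inst = flow.get_task()   # instance of the default task with the workflow params
flow.run()                    # runs the default task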
@@ -373,6 +446,10 @@ def get_task(self, task = None):


class WorkflowMulti(object):
+    """
+    A multi-experiment workflow. It can be defined with multiple flows, separate parameters for each flow, and a default task. Defining the flows and the parameters for each flow is mandatory.
+    """

    def __init__(self, task=None, params=None, path=None):
        self.params = params
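For orientation, a sketch of constructing a WorkflowMulti; the experiment names and parameter values are hypothetical, but the shape (one params dict per flow) follows the class description above:

flow2 = WorkflowMulti(
    task=TaskTrain,  # hypothetical default task
    params={'experiment1': {'do_preprocess': False},
            'experiment2': {'do_preprocess': True}})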
@@ -386,6 +463,23 @@ def __init__(self, task = None, params = None, path=None):


    def run(self, flow=None, tasks=None, forced=None, forced_all=False, forced_all_upstream=False, confirm=True, workers=1, abort=True, execution_summary=None, **kwargs):
+        """
+        Run tasks with the workflow parameters for a flow. See luigi.build for additional details
+
+        Args:
+            flow (string): name of the experiment for which the flow is to be run. If nothing is passed, all the flows are run
+            tasks (class, list): task class or list of task classes
+            forced (list): list of forced tasks
+            forced_all (bool): force all tasks
+            forced_all_upstream (bool): force all tasks including upstream
+            confirm (bool): confirm invalidating tasks
+            workers (int): number of workers
+            abort (bool): raise exception on errors
+            execution_summary (bool): print execution summary
+            kwargs: keywords to pass to luigi.build
+
+        """
+
        if flow is not None:
            return self.workflow_objs[flow].run(tasks=tasks, forced=forced, forced_all=forced_all,
                                                forced_all_upstream=forced_all_upstream, confirm=confirm, workers=workers,
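Per the docstring, run executes a single named flow when flow is given and every flow otherwise; a sketch using the object from the previous example:

flow2.run(tasks=TaskTrain, flow='experiment1')  # run one experiment
flow2.run(tasks=TaskTrain)                      # flow=None: run all experiments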
@@ -400,6 +494,18 @@ def run(self, flow = None, tasks=None, forced=None, forced_all=False, forced_all


    def outputLoad(self, flow=None, task=None, keys=None, as_dict=False, cached=False):
+        """
+        Load output from task with the workflow parameters for a flow
+
+        Args:
+            flow (string): name of the experiment for which the output is to be loaded. If nothing is passed, output for all the flows is loaded
+            task (class): task class
+            keys (list): list of data to load
+            as_dict (bool): return output as a dict
+            cached (bool): cache data in memory
+
+        Returns: list or dict of task output
+        """
        if flow is not None:
            return self.workflow_objs[flow].outputLoad(task, keys, as_dict, cached)
        data = {}
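A sketch of per-flow output loading; when flow is omitted the method appears to collect output across all flows (the data = {} accumulator above):

df1 = flow2.outputLoad(flow='experiment1', task=TaskTrain)
all_flows = flow2.outputLoad(task=TaskTrain)  # output gathered per flow (assumed)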
@@ -417,6 +523,18 @@ def outputLoadMeta(self, flow = None, task=None):


    def outputLoadAll(self, flow=None, task=None, keys=None, as_dict=False, cached=False):
+        """
+        Load all output from task with the workflow parameters for a flow
+
+        Args:
+            flow (string): name of the experiment for which the output is to be loaded. If nothing is passed, output for all the flows is loaded
+            task (class): task class
+            keys (list): list of data to load
+            as_dict (bool): return output as a dict
+            cached (bool): cache data in memory
+
+        Returns: list or dict of all task output
+        """
        if flow is not None:
            return self.workflow_objs[flow].outputLoadAll(task, keys, as_dict, cached)
        data = {}
@@ -437,6 +555,13 @@ def reset_upstream(self, flow = None, task=None, confirm=True):


    def preview(self, flow=None, tasks=None, indent='', last=True, show_params=True, clip_params=False):
+        """
+        Preview task flows with the workflow parameters for a flow
+
+        Args:
+            flow (string): name of the experiment for which the flow is to be previewed. If nothing is passed, all the flows are previewed
+            tasks (class, list): task class or list of task classes
+        """
        if not isinstance(tasks, (list,)):
            tasks = [tasks]
        if flow is not None:
@@ -448,12 +573,27 @@ def preview(self, flow = None, tasks = None, indent='', last=True, show_params=T


    def set_default(self, task):
+        """
+        Set default task for the workflow. The default task is set for all the experiments
+
+        Args:
+            task (obj): The task to be set as the default task
+        """
        self.default_task = task
        for exp_name in self.params.keys():
            self.workflow_objs[exp_name].set_default(task)


    def get_task(self, flow=None, task=None):
+        """
+        Get task with the workflow parameters for a flow
+
+        Args:
+            flow (string): name of the experiment for which the task is to be retrieved. If nothing is passed, tasks for all the flows are retrieved
+            task (class): task class
+
+        Returns: An instance of the task class with the workflow parameters
+        """
        if task is None:
            if self.default_task is None:
                raise RuntimeError('no default tasks set')