@@ -467,16 +467,20 @@ func (tr *taskResult[T]) dependencies() []*taskDefinition {
467
467
// A Workflow is an instantiated workflow instance, ready to run.
468
468
type Workflow struct {
469
469
ID uuid.UUID
470
- def * Definition
471
470
params map [string ]interface {}
472
471
retryCommands chan retryCommand
473
472
473
+ // Notes on ownership and concurrency:
474
+ // The taskDefinitions used below are immutable. Everything else should be
475
+ // treated as mutable, used only in the Run goroutine, and never published
476
+ // to a background goroutine.
477
+
478
+ def * Definition
474
479
tasks map [* taskDefinition ]* taskState
475
480
}
476
481
477
482
type taskState struct {
478
483
def * taskDefinition
479
- w * Workflow
480
484
started bool
481
485
finished bool
482
486
result interface {}
@@ -485,19 +489,6 @@ type taskState struct {
485
489
retryCount int
486
490
}
487
491
488
- func (t * taskState ) args () ([]reflect.Value , bool ) {
489
- for _ , dep := range t .def .deps {
490
- if depState , ok := t .w .tasks [dep ]; ! ok || ! depState .finished || depState .err != nil {
491
- return nil , false
492
- }
493
- }
494
- var args []reflect.Value
495
- for _ , v := range t .def .args {
496
- args = append (args , v .value (t .w ))
497
- }
498
- return args , true
499
- }
500
-
501
492
func (t * taskState ) toExported () * TaskState {
502
493
state := & TaskState {
503
494
Name : t .def .name ,
@@ -526,7 +517,7 @@ func Start(def *Definition, params map[string]interface{}) (*Workflow, error) {
526
517
return nil , err
527
518
}
528
519
for _ , taskDef := range def .tasks {
529
- w .tasks [taskDef ] = & taskState {def : taskDef , w : w }
520
+ w .tasks [taskDef ] = & taskState {def : taskDef }
530
521
}
531
522
return w , nil
532
523
}
@@ -592,7 +583,6 @@ func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskSt
592
583
}
593
584
state := & taskState {
594
585
def : taskDef ,
595
- w : w ,
596
586
started : tState .Finished , // Can't resume tasks, so either it's new or done.
597
587
finished : tState .Finished ,
598
588
serializedResult : tState .SerializedResult ,
@@ -661,15 +651,15 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
661
651
if task .started {
662
652
continue
663
653
}
664
- in , ready := task . args ( )
654
+ args , ready := w . taskArgs ( task . def )
665
655
if ! ready {
666
656
continue
667
657
}
668
658
task .started = true
669
659
running ++
670
660
listener .TaskStateChanged (w .ID , task .def .name , task .toExported ())
671
661
go func (task taskState ) {
672
- stateChan <- w . runTask (ctx , listener , task , in )
662
+ stateChan <- runTask (ctx , w . ID , listener , task , args )
673
663
}(* task )
674
664
}
675
665
}
@@ -699,7 +689,7 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
699
689
break
700
690
}
701
691
listener .Logger (w .ID , def .name ).Printf ("Manual retry requested" )
702
- stateChan <- taskState {def : def , w : w }
692
+ stateChan <- taskState {def : def }
703
693
retry .reply <- nil
704
694
// Don't get stuck when cancellation comes in after all tasks have
705
695
// finished, but also don't busy wait if something's still running.
@@ -709,20 +699,33 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
709
699
}
710
700
}
711
701
702
+ func (w * Workflow ) taskArgs (def * taskDefinition ) ([]reflect.Value , bool ) {
703
+ for _ , dep := range def .deps {
704
+ if depState , ok := w .tasks [dep ]; ! ok || ! depState .finished || depState .err != nil {
705
+ return nil , false
706
+ }
707
+ }
708
+ var args []reflect.Value
709
+ for _ , v := range def .args {
710
+ args = append (args , v .value (w ))
711
+ }
712
+ return args , true
713
+ }
714
+
712
715
// Maximum number of retries. This could be a workflow property.
713
716
var MaxRetries = 3
714
717
715
718
var WatchdogDelay = 10 * time .Minute
716
719
717
- func ( w * Workflow ) runTask (ctx context.Context , listener Listener , state taskState , args []reflect.Value ) taskState {
720
+ func runTask (ctx context.Context , workflowID uuid. UUID , listener Listener , state taskState , args []reflect.Value ) taskState {
718
721
ctx , cancel := context .WithCancel (ctx )
719
722
defer cancel ()
720
723
721
724
tctx := & TaskContext {
722
725
Context : ctx ,
723
- Logger : listener .Logger (w . ID , state .def .name ),
726
+ Logger : listener .Logger (workflowID , state .def .name ),
724
727
TaskName : state .def .name ,
725
- WorkflowID : w . ID ,
728
+ WorkflowID : workflowID ,
726
729
watchdogTimer : time .AfterFunc (WatchdogDelay , cancel ),
727
730
}
728
731
@@ -750,7 +753,6 @@ func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskSta
750
753
tctx .Printf ("task failed, will retry (%v of %v): %v" , state .retryCount + 1 , MaxRetries , state .err )
751
754
state = taskState {
752
755
def : state .def ,
753
- w : state .w ,
754
756
retryCount : state .retryCount + 1 ,
755
757
}
756
758
}
0 commit comments