Skip to content

Commit bf01fcc

Browse files
committed
internal/workflow: add expansions
One of the upcoming use cases for relui, tagging x/ repos, has a dependency graph we can't know without doing significant work. This is in conflict with one of the workflow package's design principles, that workflows be defined ahead of time. So here's one idea for bending that rule. Introduce the concept of an expansion task. An expansion task runs the same as a normal task, but in addition to its inputs it receives (a copy of) the workflow definition and can add tasks and outputs to it as usual. Once the task finishes running, the workflow swaps in the new definition and continues. Other tasks can run concurrently. One significant difference is that the expansion task must run every time the workflow is started or resumed, so they really should be pure functions with no network access or side effects. To encourage this, they don't receive a Context. Introducing this required some refactoring of the existing Run loop; we need to allow these apparently-unused tasks to run, and deal with new tasks popping up mid-run. But overall I think it's not too huge a distortion to the design. For golang/go#48523. Change-Id: I92c164883e06474fa951abba5fa18fd78eacb8ce Reviewed-on: https://go-review.googlesource.com/c/build/+/423037 Run-TryBot: Heschi Kreinick <[email protected]> TryBot-Result: Gopher Robot <[email protected]> Reviewed-by: Jenny Rakoczy <[email protected]>
1 parent 7cf3efd commit bf01fcc

File tree

2 files changed

+285
-62
lines changed

2 files changed

+285
-62
lines changed

Diff for: internal/workflow/workflow.go

+185-55
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
//
88
// Workflows are a set of tasks and actions that produce and consume Values.
99
// Tasks don't run until the workflow is started, so Values represent data that
10-
// doesn't exist yet, and can't be used directly. Each value has a dynamic type,
11-
// which must match its uses.
10+
// doesn't exist yet, and can't be used directly.
1211
//
1312
// To wrap an existing Go object in a Value, use Const. To define a
1413
// parameter that will be set when the workflow is started, use Param.
@@ -32,6 +31,13 @@
3231
// definitions to create an ordering dependency that doesn't correspond to a
3332
// function argument.
3433
//
34+
// Expansions are a third type of function that adds to a running workflow
35+
// definition rather than producing an output. Unlike Actions and Tasks, they
36+
// execute multiple times and must produce exactly the same workflow
37+
// modifications each time. As such, they should be pure functions of their
38+
// inputs. Producing different modifications, or running multiple expansions
39+
// concurrently, is an error that will corrupt the workflow's state.
40+
//
3541
// Once a Definition is complete, call Start to set its parameters and
3642
// instantiate it into a Workflow. Call Run to execute the workflow until
3743
// completion.
@@ -75,6 +81,17 @@ func (d *Definition) name(name string) string {
7581
return d.namePrefix + name
7682
}
7783

84+
func (d *Definition) shallowClone() *Definition {
85+
clone := New()
86+
for k, v := range d.tasks {
87+
clone.tasks[k] = v
88+
}
89+
for k, v := range d.outputs {
90+
clone.outputs[k] = v
91+
}
92+
return clone
93+
}
94+
7895
type definitionState struct {
7996
parameters []MetaParameter // Ordered according to registration, unique parameter names.
8097
tasks map[string]*taskDefinition
@@ -324,7 +341,7 @@ func Task5[C context.Context, I1, I2, I3, I4, I5, O1 any](d *Definition, name st
324341
return addTask[O1](d, name, f, []metaValue{i1, i2, i3, i4, i5}, opts)
325342
}
326343

327-
func addTask[O1 any](d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) *taskResult[O1] {
344+
func addFunc(d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) *taskDefinition {
328345
name = d.name(name)
329346
td := &taskDefinition{name: name, f: f, args: inputs}
330347
for _, input := range inputs {
@@ -334,12 +351,22 @@ func addTask[O1 any](d *Definition, name string, f interface{}, inputs []metaVal
334351
td.deps = append(td.deps, opt.(*after).deps...)
335352
}
336353
d.tasks[name] = td
354+
return td
355+
}
356+
357+
func addTask[O1 any](d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) *taskResult[O1] {
358+
td := addFunc(d, name, f, inputs, opts)
337359
return &taskResult[O1]{td}
338360
}
339361

340362
func addAction(d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) *dependency {
341-
tr := addTask[interface{}](d, name, f, inputs, opts)
342-
return &dependency{tr.task}
363+
td := addFunc(d, name, f, inputs, opts)
364+
return &dependency{td}
365+
}
366+
367+
func addExpansion(d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) {
368+
td := addFunc(d, name, f, inputs, opts)
369+
td.isExpansion = true
343370
}
344371

345372
// ActionN adds an Action to the workflow definition. Its behavior and
@@ -377,6 +404,39 @@ func (d *dependency) dependencies() []*taskDefinition {
377404
return []*taskDefinition{d.task}
378405
}
379406

407+
// ExpandN adds a workflow expansion task to the workflow definition.
408+
// Expansion tasks run similarly to normal tasks, but instead of computing
409+
// a result, they can add to the workflow definition.
410+
//
411+
// Unlike normal tasks, expansions may run multiple times and must produce
412+
// the exact same changes to the definition each time.
413+
//
414+
// Running more than one expansion concurrently is an error and will corrupt
415+
// the workflow.
416+
func Expand0(d *Definition, name string, f func(*Definition) error, opts ...TaskOption) {
417+
addExpansion(d, name, f, nil, opts)
418+
}
419+
420+
func Expand1[I1 any](d *Definition, name string, f func(*Definition, I1) error, i1 Value[I1], opts ...TaskOption) {
421+
addExpansion(d, name, f, []metaValue{i1}, opts)
422+
}
423+
424+
func Expand2[I1, I2 any](d *Definition, name string, f func(*Definition, I1, I2) error, i1 Value[I1], i2 Value[I2], opts ...TaskOption) {
425+
addExpansion(d, name, f, []metaValue{i1, i2}, opts)
426+
}
427+
428+
func Expand3[I1, I2, I3 any](d *Definition, name string, f func(*Definition, I1, I2, I3) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) {
429+
addExpansion(d, name, f, []metaValue{i1, i2, i3}, opts)
430+
}
431+
432+
func Expand4[I1, I2, I3, I4 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) {
433+
addExpansion(d, name, f, []metaValue{i1, i2, i3, i4}, opts)
434+
}
435+
436+
func Expand5[I1, I2, I3, I4, I5 any](d *Definition, name string, f func(*Definition, I1, I2, I3, I4, I5) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) {
437+
addExpansion(d, name, f, []metaValue{i1, i2, i3, i4, i5}, opts)
438+
}
439+
380440
// A TaskContext is a context.Context, plus workflow-related features.
381441
type TaskContext struct {
382442
disableRetries bool
@@ -439,10 +499,11 @@ type Logger interface {
439499
}
440500

441501
type taskDefinition struct {
442-
name string
443-
args []metaValue
444-
deps []*taskDefinition
445-
f interface{}
502+
name string
503+
isExpansion bool
504+
args []metaValue
505+
deps []*taskDefinition
506+
f interface{}
446507
}
447508

448509
type taskResult[T any] struct {
@@ -477,16 +538,25 @@ type Workflow struct {
477538

478539
def *Definition
479540
tasks map[*taskDefinition]*taskState
541+
// pendingStates stores states that haven't been loaded because their
542+
// tasks didn't exist at Resume time.
543+
pendingStates map[string]*TaskState
480544
}
481545

482546
type taskState struct {
483-
def *taskDefinition
484-
started bool
485-
finished bool
547+
def *taskDefinition
548+
created bool
549+
started bool
550+
finished bool
551+
err error
552+
553+
// normal tasks
486554
result interface{}
487555
serializedResult []byte
488-
err error
489556
retryCount int
557+
558+
// workflow expansion
559+
expanded *Definition
490560
}
491561

492562
func (t *taskState) toExported() *TaskState {
@@ -534,7 +604,7 @@ func (w *Workflow) validate() error {
534604
used[output] = true
535605
}
536606
for _, task := range w.def.tasks {
537-
if !used[task] {
607+
if !used[task] && !task.isExpansion {
538608
return fmt.Errorf("task %v is not referenced and should be deleted", task.name)
539609
}
540610
}
@@ -568,39 +638,55 @@ func (w *Workflow) validate() error {
568638
func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
569639
w := &Workflow{
570640
ID: state.ID,
571-
def: def,
572641
params: state.Params,
573-
tasks: map[*taskDefinition]*taskState{},
574642
retryCommands: make(chan retryCommand, len(def.tasks)),
643+
def: def,
644+
tasks: map[*taskDefinition]*taskState{},
645+
pendingStates: taskStates,
575646
}
576647
if err := w.validate(); err != nil {
577648
return nil, err
578649
}
579650
for _, taskDef := range def.tasks {
580-
tState, ok := taskStates[taskDef.name]
581-
if !ok {
582-
return nil, fmt.Errorf("task state for %q not found", taskDef.name)
651+
var err error
652+
w.tasks[taskDef], err = loadTaskState(w.pendingStates, taskDef, false)
653+
if err != nil {
654+
return nil, fmt.Errorf("loading state for %v: %v", taskDef.name, err)
583655
}
584-
state := &taskState{
585-
def: taskDef,
586-
started: tState.Finished, // Can't resume tasks, so either it's new or done.
587-
finished: tState.Finished,
588-
serializedResult: tState.SerializedResult,
589-
retryCount: tState.RetryCount,
590-
}
591-
if state.serializedResult != nil {
592-
result, err := unmarshalNew(reflect.ValueOf(taskDef.f).Type().Out(0), tState.SerializedResult)
593-
if err != nil {
594-
return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err)
595-
}
596-
state.result = result
656+
}
657+
return w, nil
658+
}
659+
660+
func loadTaskState(states map[string]*TaskState, def *taskDefinition, allowMissing bool) (*taskState, error) {
661+
tState, ok := states[def.name]
662+
if !ok {
663+
if !allowMissing {
664+
return nil, fmt.Errorf("task state not found")
597665
}
598-
if tState.Error != "" {
599-
state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
666+
tState = &TaskState{}
667+
}
668+
// Can't resume tasks, so either it's new or done.
669+
// Expansions need to run every time.
670+
finished := tState.Finished && !def.isExpansion
671+
state := &taskState{
672+
def: def,
673+
created: ok,
674+
started: finished,
675+
finished: finished,
676+
serializedResult: tState.SerializedResult,
677+
retryCount: tState.RetryCount,
678+
}
679+
if state.serializedResult != nil {
680+
result, err := unmarshalNew(reflect.ValueOf(def.f).Type().Out(0), tState.SerializedResult)
681+
if err != nil {
682+
return nil, fmt.Errorf("failed to unmarshal result: %v", err)
600683
}
601-
w.tasks[taskDef] = state
684+
state.result = result
602685
}
603-
return w, nil
686+
if tState.Error != "" {
687+
state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
688+
}
689+
return state, nil
604690
}
605691

606692
func unmarshalNew(t reflect.Type, data []byte) (interface{}, error) {
@@ -620,29 +706,25 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
620706
listener = &defaultListener{}
621707
}
622708

623-
for _, task := range w.tasks {
624-
listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
625-
}
626-
627709
stateChan := make(chan taskState, 2*len(w.def.tasks))
628710
doneOnce := ctx.Done()
629711
for {
630-
// If we have all the outputs, the workflow is done.
631-
outValues := map[string]interface{}{}
632-
for outName, outDef := range w.def.outputs {
633-
if task := w.tasks[outDef]; task.finished && task.err == nil {
634-
outValues[outName] = task.result
635-
}
636-
}
637-
if len(outValues) == len(w.def.outputs) {
638-
return outValues, nil
639-
}
640-
641712
running := 0
713+
allDone := true
642714
for _, task := range w.tasks {
715+
if !task.created {
716+
task.created = true
717+
listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
718+
}
643719
if task.started && !task.finished {
644720
running++
645721
}
722+
if !task.finished || task.err != nil {
723+
allDone = false
724+
}
725+
}
726+
if allDone {
727+
break
646728
}
647729

648730
if ctx.Err() == nil {
@@ -658,9 +740,13 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
658740
task.started = true
659741
running++
660742
listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
661-
go func(task taskState) {
662-
stateChan <- runTask(ctx, w.ID, listener, task, args)
663-
}(*task)
743+
taskCopy := *task
744+
if task.def.isExpansion {
745+
defCopy := w.def.shallowClone()
746+
go func() { stateChan <- runExpansion(defCopy, taskCopy, args) }()
747+
} else {
748+
go func() { stateChan <- runTask(ctx, w.ID, listener, taskCopy, args) }()
749+
}
664750
}
665751
}
666752

@@ -675,6 +761,9 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
675761

676762
select {
677763
case state := <-stateChan:
764+
if state.def.isExpansion && state.finished && state.err == nil {
765+
state.err = w.expand(state.expanded)
766+
}
678767
listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
679768
w.tasks[state.def] = &state
680769
case retry := <-w.retryCommands:
@@ -689,14 +778,20 @@ func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]inter
689778
break
690779
}
691780
listener.Logger(w.ID, def.name).Printf("Manual retry requested")
692-
stateChan <- taskState{def: def}
781+
stateChan <- taskState{def: def, created: true}
693782
retry.reply <- nil
694783
// Don't get stuck when cancellation comes in after all tasks have
695784
// finished, but also don't busy wait if something's still running.
696785
case <-doneOnce:
697786
doneOnce = nil
698787
}
699788
}
789+
790+
outs := map[string]interface{}{}
791+
for name, def := range w.def.outputs {
792+
outs[name] = w.tasks[def].result
793+
}
794+
return outs, nil
700795
}
701796

702797
func (w *Workflow) taskArgs(def *taskDefinition) ([]reflect.Value, bool) {
@@ -753,12 +848,47 @@ func runTask(ctx context.Context, workflowID uuid.UUID, listener Listener, state
753848
tctx.Printf("task failed, will retry (%v of %v): %v", state.retryCount+1, MaxRetries, state.err)
754849
state = taskState{
755850
def: state.def,
851+
created: true,
756852
retryCount: state.retryCount + 1,
757853
}
758854
}
759855
return state
760856
}
761857

858+
func runExpansion(d *Definition, state taskState, args []reflect.Value) taskState {
859+
in := append([]reflect.Value{reflect.ValueOf(d)}, args...)
860+
fv := reflect.ValueOf(state.def.f)
861+
out := fv.Call(in)
862+
state.finished = true
863+
if out[0].IsNil() {
864+
state.expanded = d
865+
} else {
866+
state.err = out[0].Interface().(error)
867+
}
868+
return state
869+
}
870+
871+
func (w *Workflow) expand(expanded *Definition) error {
872+
origDef := w.def
873+
w.def = expanded
874+
if err := w.validate(); err != nil {
875+
w.def = origDef
876+
return err
877+
}
878+
for _, def := range w.def.tasks {
879+
if _, ok := w.tasks[def]; ok {
880+
continue
881+
}
882+
// w.tasks[def] = &taskState{def: def}
883+
var err error
884+
w.tasks[def], err = loadTaskState(w.pendingStates, def, true)
885+
if err != nil {
886+
return err
887+
}
888+
}
889+
return nil
890+
}
891+
762892
type defaultListener struct{}
763893

764894
func (s *defaultListener) TaskStateChanged(_ uuid.UUID, _ string, _ *TaskState) error {

0 commit comments

Comments
 (0)