Skip to content

Commit ac2bd83

Browse files
heschigopherbot
authored andcommitted
internal/workflow: add native retry support
Waiting for the workflow to stop before retrying is very annoying. Add support for retrying while the workflow runs. Previously, the workflow stopped when no more tasks were runnable. This raised difficult questions: does the workflow accept retry commands while it's stopped? Should there be a way to force it to do at least one task before giving up? Rather than deal with those, change the behavior: the Run function now returns only on completion or context cancelation. Update the tests to match. As far as I know, relui itself doesn't care. Note, though: there is now no way to resume a stopped workflow. I don't think there's much reason for us to stop them now so I don't think it's a big problem? Fixes golang/go#54304. Change-Id: I4de56a6c50d71dddf0eaafce2e9c135c65e4cfec Reviewed-on: https://go-review.googlesource.com/c/build/+/422098 Reviewed-by: Jenny Rakoczy <[email protected]> TryBot-Result: Gopher Robot <[email protected]> Auto-Submit: Heschi Kreinick <[email protected]> Run-TryBot: Heschi Kreinick <[email protected]>
1 parent aa54a98 commit ac2bd83

File tree

9 files changed

+221
-271
lines changed

9 files changed

+221
-271
lines changed

internal/relui/buildrelease_test.go

+41-7
Original file line numberDiff line numberDiff line change
@@ -300,14 +300,14 @@ func testSecurity(t *testing.T, mergeFixes bool) {
300300
if err != nil {
301301
t.Fatal(err)
302302
}
303-
_, err = w.Run(deps.ctx, &verboseListener{t, deps.outputListener})
304-
if mergeFixes && err != nil {
305-
t.Fatal(err)
306-
}
307-
if !mergeFixes {
308-
if err == nil {
309-
t.Fatal("release succeeded without merging fixes to the public repository")
303+
304+
if mergeFixes {
305+
_, err = w.Run(deps.ctx, &verboseListener{t, deps.outputListener})
306+
if err != nil {
307+
t.Fatal(err)
310308
}
309+
} else {
310+
runToFailure(t, deps.ctx, w, "Check branch state matches source archive", &verboseListener{t, deps.outputListener})
311311
return
312312
}
313313
checkTGZ(t, deps.buildTasks.DownloadURL, deps.publishedFiles, "src.tar.gz", &WebsiteFile{
@@ -818,6 +818,40 @@ func (l *testLogger) Printf(format string, v ...interface{}) {
818818
l.t.Logf("task %-10v: LOG: %s", l.task, fmt.Sprintf(format, v...))
819819
}
820820

821+
func runToFailure(t *testing.T, ctx context.Context, w *workflow.Workflow, task string, wrap workflow.Listener) string {
822+
ctx, cancel := context.WithCancel(ctx)
823+
defer cancel()
824+
t.Helper()
825+
var message string
826+
listener := &errorListener{
827+
taskName: task,
828+
callback: func(m string) {
829+
message = m
830+
cancel()
831+
},
832+
Listener: wrap,
833+
}
834+
_, err := w.Run(ctx, listener)
835+
if err == nil {
836+
t.Fatalf("workflow unexpectedly succeeded")
837+
}
838+
return message
839+
}
840+
841+
type errorListener struct {
842+
taskName string
843+
callback func(string)
844+
workflow.Listener
845+
}
846+
847+
func (l *errorListener) TaskStateChanged(id uuid.UUID, taskID string, st *workflow.TaskState) error {
848+
if st.Name == l.taskName && st.Finished && st.Error != "" {
849+
l.callback(st.Error)
850+
}
851+
l.Listener.TaskStateChanged(id, taskID, st)
852+
return nil
853+
}
854+
821855
// fakeSign acts like a human running the signbinaries job periodically.
822856
func fakeSign(ctx context.Context, t *testing.T, dir string) {
823857
seen := map[string]bool{}

internal/relui/templates/task_list.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
name="task.reset"
8585
type="submit"
8686
value="Retry"
87-
onclick="return this.form.reportValidity() && confirm('This will retry the task and clear workflow errors.\n\nReady to proceed?')" />
87+
onclick="return this.form.reportValidity() && confirm('This will retry the task.\n\nReady to proceed?')" />
8888
</form>
8989
</div>
9090
{{else if and (not .ApprovedAt.Valid) (.ReadyForApproval)}}

internal/relui/web.go

+4-37
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (s *Server) homeHandler(w http.ResponseWriter, r *http.Request) {
154154
}
155155
hr := &homeResponse{SiteHeader: s.header}
156156
for _, w := range ws {
157-
if s.w.running[w.ID.String()] != nil {
157+
if _, ok := s.w.running[w.ID.String()]; ok {
158158
hr.ActiveWorkflows = append(hr.ActiveWorkflows, w)
159159
continue
160160
}
@@ -300,43 +300,10 @@ func (s *Server) retryTaskHandler(w http.ResponseWriter, r *http.Request, params
300300
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
301301
return
302302
}
303-
if err := s.retryTask(r.Context(), id, params.ByName("name")); err != nil {
304-
log.Printf("s.retryTask(_, %q, %q): %v", id, params.ByName("id"), err)
305-
if errors.Is(err, sql.ErrNoRows) || errors.Is(err, pgx.ErrNoRows) {
306-
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
307-
return
308-
}
309-
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
310-
return
303+
if err := s.w.RetryTask(r.Context(), id, params.ByName("name")); err != nil {
304+
log.Printf("s.w.RetryTask(_, %q): %v", id, err)
311305
}
312-
if err := s.w.Resume(r.Context(), id); err != nil {
313-
log.Printf("s.w.Resume(_, %q): %v", id, err)
314-
}
315-
http.Redirect(w, r, s.BaseLink("/"), http.StatusSeeOther)
316-
}
317-
318-
func (s *Server) retryTask(ctx context.Context, id uuid.UUID, name string) error {
319-
return s.db.BeginFunc(ctx, func(tx pgx.Tx) error {
320-
q := db.New(tx)
321-
wf, err := q.Workflow(ctx, id)
322-
if err != nil {
323-
return fmt.Errorf("q.Workflow: %w", err)
324-
}
325-
task, err := q.Task(ctx, db.TaskParams{WorkflowID: id, Name: name})
326-
if err != nil {
327-
return fmt.Errorf("q.Task: %w", err)
328-
}
329-
if _, err := q.ResetTask(ctx, db.ResetTaskParams{WorkflowID: id, Name: name, UpdatedAt: time.Now()}); err != nil {
330-
return fmt.Errorf("q.ResetTask: %w", err)
331-
}
332-
if _, err := q.ResetWorkflow(ctx, db.ResetWorkflowParams{ID: id, UpdatedAt: time.Now()}); err != nil {
333-
return fmt.Errorf("q.ResetWorkflow: %w", err)
334-
}
335-
l := s.w.l.Logger(id, name)
336-
l.Printf("task reset. Previous state: %#v", task)
337-
l.Printf("workflow reset. Previous state: %#v", wf)
338-
return nil
339-
})
306+
http.Redirect(w, r, s.BaseLink("/workflows", id.String()), http.StatusSeeOther)
340307
}
341308

342309
func (s *Server) approveTaskHandler(w http.ResponseWriter, r *http.Request, params httprouter.Params) {

internal/relui/web_test.go

-155
Original file line numberDiff line numberDiff line change
@@ -437,161 +437,6 @@ func TestServerBaseLink(t *testing.T) {
437437
}
438438
}
439439

440-
func TestServerRetryTaskHandler(t *testing.T) {
441-
ctx, cancel := context.WithCancel(context.Background())
442-
defer cancel()
443-
444-
hourAgo := time.Now().Add(-1 * time.Hour)
445-
wfID := uuid.New()
446-
unchangedWorkflow := db.Workflow{
447-
ID: wfID,
448-
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
449-
Name: nullString(`echo`),
450-
Finished: true,
451-
Output: `{"some": "thing"}`,
452-
Error: "internal explosion",
453-
CreatedAt: hourAgo, // cmpopts.EquateApproxTime
454-
UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
455-
}
456-
457-
cases := []struct {
458-
desc string
459-
params map[string]string
460-
wantCode int
461-
wantHeaders map[string]string
462-
wantWorkflows []db.Workflow
463-
}{
464-
{
465-
desc: "no params",
466-
wantCode: http.StatusNotFound,
467-
wantWorkflows: []db.Workflow{unchangedWorkflow},
468-
},
469-
{
470-
desc: "invalid workflow id",
471-
params: map[string]string{"id": "invalid", "name": "greeting"},
472-
wantCode: http.StatusBadRequest,
473-
wantWorkflows: []db.Workflow{unchangedWorkflow},
474-
},
475-
{
476-
desc: "wrong workflow id",
477-
params: map[string]string{"id": uuid.New().String(), "name": "greeting"},
478-
wantCode: http.StatusNotFound,
479-
wantWorkflows: []db.Workflow{unchangedWorkflow},
480-
},
481-
{
482-
desc: "invalid task name",
483-
params: map[string]string{"id": wfID.String(), "name": "invalid"},
484-
wantCode: http.StatusNotFound,
485-
wantWorkflows: []db.Workflow{
486-
{
487-
ID: wfID,
488-
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
489-
Name: nullString(`echo`),
490-
Finished: true,
491-
Output: `{"some": "thing"}`,
492-
Error: "internal explosion",
493-
CreatedAt: hourAgo, // cmpopts.EquateApproxTime
494-
UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
495-
},
496-
},
497-
},
498-
{
499-
desc: "successful reset",
500-
params: map[string]string{"id": wfID.String(), "name": "greeting"},
501-
wantCode: http.StatusSeeOther,
502-
wantHeaders: map[string]string{
503-
"Location": "/",
504-
},
505-
wantWorkflows: []db.Workflow{
506-
{
507-
ID: wfID,
508-
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
509-
Name: nullString(`echo`),
510-
Output: "{}",
511-
CreatedAt: hourAgo, // cmpopts.EquateApproxTime
512-
UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
513-
},
514-
},
515-
},
516-
}
517-
for _, c := range cases {
518-
t.Run(c.desc, func(t *testing.T) {
519-
p := testDB(ctx, t)
520-
q := db.New(p)
521-
522-
wf := db.CreateWorkflowParams{
523-
ID: wfID,
524-
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
525-
Name: nullString(`echo`),
526-
CreatedAt: hourAgo,
527-
UpdatedAt: hourAgo,
528-
}
529-
if _, err := q.CreateWorkflow(ctx, wf); err != nil {
530-
t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err)
531-
}
532-
wff := db.WorkflowFinishedParams{
533-
ID: wf.ID,
534-
Finished: true,
535-
Output: `{"some": "thing"}`,
536-
Error: "internal explosion",
537-
UpdatedAt: hourAgo,
538-
}
539-
if _, err := q.WorkflowFinished(ctx, wff); err != nil {
540-
t.Fatalf("WorkflowFinished(_, %v) = _, %v, wanted no error", wff, err)
541-
}
542-
gtg := db.CreateTaskParams{
543-
WorkflowID: wf.ID,
544-
Name: "greeting",
545-
Finished: true,
546-
Error: nullString("internal explosion"),
547-
CreatedAt: hourAgo,
548-
UpdatedAt: hourAgo,
549-
}
550-
if _, err := q.CreateTask(ctx, gtg); err != nil {
551-
t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", gtg, err)
552-
}
553-
fw := db.CreateTaskParams{
554-
WorkflowID: wf.ID,
555-
Name: "farewell",
556-
Finished: true,
557-
Error: nullString("internal explosion"),
558-
CreatedAt: hourAgo,
559-
UpdatedAt: hourAgo,
560-
}
561-
if _, err := q.CreateTask(ctx, fw); err != nil {
562-
t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", fw, err)
563-
}
564-
565-
req := httptest.NewRequest(http.MethodPost, path.Join("/workflows/", c.params["id"], "tasks", c.params["name"], "retry"), nil)
566-
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
567-
rec := httptest.NewRecorder()
568-
s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{}, nil)
569-
570-
s.m.ServeHTTP(rec, req)
571-
resp := rec.Result()
572-
573-
if resp.StatusCode != c.wantCode {
574-
t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
575-
}
576-
for k, v := range c.wantHeaders {
577-
if resp.Header.Get(k) != v {
578-
t.Errorf("resp.Header.Get(%q) = %q, wanted %q", k, resp.Header.Get(k), v)
579-
}
580-
}
581-
if c.wantCode == http.StatusBadRequest {
582-
return
583-
}
584-
wfs, err := q.Workflows(ctx)
585-
if err != nil {
586-
t.Fatalf("q.Workflows() = %v, %v, wanted no error", wfs, err)
587-
}
588-
if diff := cmp.Diff(c.wantWorkflows, wfs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute)); diff != "" {
589-
t.Fatalf("q.Workflows() mismatch (-want +got):\n%s", diff)
590-
}
591-
})
592-
}
593-
}
594-
595440
func TestServerApproveTaskHandler(t *testing.T) {
596441
ctx, cancel := context.WithCancel(context.Background())
597442
defer cancel()

internal/relui/worker.go

+21-7
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ type Listener interface {
2727
WorkflowFinished(ctx context.Context, workflowID uuid.UUID, outputs map[string]interface{}, err error) error
2828
}
2929

30-
type stopFunc func()
31-
3230
// Worker runs workflows, and persists their state.
3331
type Worker struct {
3432
dh *DefinitionHolder
@@ -43,7 +41,12 @@ type Worker struct {
4341
// running is a set of currently running Workflow ids. Run uses
4442
// this set to prevent starting a simultaneous execution of a
4543
// currently running Workflow.
46-
running map[string]stopFunc
44+
running map[string]runningWorkflow
45+
}
46+
47+
type runningWorkflow struct {
48+
w *workflow.Workflow
49+
stop func()
4750
}
4851

4952
// NewWorker returns a Worker ready to accept and run workflows.
@@ -54,7 +57,7 @@ func NewWorker(dh *DefinitionHolder, db db.PGDBTX, l Listener) *Worker {
5457
l: l,
5558
done: make(chan struct{}),
5659
pending: make(chan *workflow.Workflow, 1),
57-
running: make(map[string]stopFunc),
60+
running: make(map[string]runningWorkflow),
5861
}
5962
}
6063

@@ -98,7 +101,7 @@ func (w *Worker) markRunning(wf *workflow.Workflow, stop func()) error {
98101
if _, ok := w.running[wf.ID.String()]; ok {
99102
return fmt.Errorf("workflow %q already running", wf.ID)
100103
}
101-
w.running[wf.ID.String()] = stop
104+
w.running[wf.ID.String()] = runningWorkflow{wf, stop}
102105
return nil
103106
}
104107

@@ -111,11 +114,11 @@ func (w *Worker) markStopped(wf *workflow.Workflow) {
111114
func (w *Worker) cancelWorkflow(id uuid.UUID) bool {
112115
w.mu.Lock()
113116
defer w.mu.Unlock()
114-
stop, ok := w.running[id.String()]
117+
rwf, ok := w.running[id.String()]
115118
if !ok {
116119
return ok
117120
}
118-
stop()
121+
rwf.stop()
119122
return ok
120123
}
121124

@@ -236,3 +239,14 @@ func (w *Worker) Resume(ctx context.Context, id uuid.UUID) error {
236239
}
237240
return w.run(res)
238241
}
242+
243+
// RetryTask retries a task in a running workflow.
244+
func (w *Worker) RetryTask(ctx context.Context, id uuid.UUID, name string) error {
245+
w.mu.Lock()
246+
rwf, ok := w.running[id.String()]
247+
w.mu.Unlock()
248+
if ok {
249+
return fmt.Errorf("no workflow with id %q", id)
250+
}
251+
return rwf.w.RetryTask(ctx, name)
252+
}

internal/relui/worker_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,16 @@ func TestWorkflowResumeRetry(t *testing.T) {
222222
counter++
223223
}
224224
}
225+
if counter > 4 {
226+
return "", nil
227+
}
225228
return "", errors.New("expected")
226229
})
227230
workflow.Output(wd, "nothing", nothing)
228231
dh.RegisterDefinition(t.Name(), wd)
229232

230-
// Run the workflow. It will try the task 3 times and then fail; stop the
231-
// worker during its second run, then resume it and verify the task retries.
233+
// Run the workflow. It will try the task up to 3 times. Stop the worker
234+
// during its second run, then resume it and verify the task retries.
232235
go func() {
233236
for i := 0; i < 3; i++ {
234237
<-blockingChan
@@ -267,9 +270,6 @@ func TestWorkflowResumeRetry(t *testing.T) {
267270
t.Fatalf("w.Resume(_, %v) = %v, wanted no error", wfid, err)
268271
}
269272
<-wfDone
270-
if counter-3 != 2 {
271-
t.Fatalf("task sent %v more times, wanted 2", counter-3)
272-
}
273273
}
274274

275275
func newTestEchoWorkflow() *workflow.Definition {

0 commit comments

Comments
 (0)