Skip to content

Commit 059fdce

Browse files
rafalbigaj and tekton-robot
authored and committed
Improve DAG validation for pipelines with hundreds of tasks
DAG validation has been rewritten to use Kahn's algorithm to find cycles in task dependencies. The original implementation, as pointed out in #5420, is a root cause of poor validation webhook performance: for pipelines with hundreds of tasks, validation does not finish within the default webhook timeout (10s) and therefore fails.
1 parent 9937ec3 commit 059fdce

File tree

4 files changed

+234
-31
lines changed

4 files changed

+234
-31
lines changed

pkg/apis/pipeline/v1beta1/pipeline_validation_test.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -700,9 +700,11 @@ func TestPipelineSpec_Validate_Failure_CycleDAG(t *testing.T) {
700700
name := "invalid pipeline spec with DAG having cyclic dependency"
701701
ps := &PipelineSpec{
702702
Tasks: []PipelineTask{{
703-
Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}, RunAfter: []string{"bar"},
703+
Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}, RunAfter: []string{"baz"},
704704
}, {
705705
Name: "bar", TaskRef: &TaskRef{Name: "bar-task"}, RunAfter: []string{"foo"},
706+
}, {
707+
Name: "baz", TaskRef: &TaskRef{Name: "baz-task"}, RunAfter: []string{"bar"},
706708
}},
707709
}
708710
err := ps.Validate(context.Background())
@@ -1044,9 +1046,15 @@ func TestValidateGraph_Failure(t *testing.T) {
10441046
}, {
10451047
Name: "bar", TaskRef: &TaskRef{Name: "bar-task"}, RunAfter: []string{"foo"},
10461048
}}
1047-
if err := validateGraph(tasks); err == nil {
1049+
expectedError := apis.FieldError{
1050+
Message: `invalid value: cycle detected; task "bar" depends on "foo"`,
1051+
Paths: []string{"tasks"},
1052+
}
1053+
err := validateGraph(tasks)
1054+
if err == nil {
10481055
t.Error("Pipeline.validateGraph() did not return error for invalid DAG of pipeline tasks:", desc)
1049-
1056+
} else if d := cmp.Diff(expectedError.Error(), err.Error(), cmpopts.IgnoreUnexported(apis.FieldError{})); d != "" {
1057+
t.Errorf("Pipeline.validateGraph() errors diff %s", diff.PrintWantGot(d))
10501058
}
10511059
}
10521060

pkg/reconciler/pipeline/dag/dag.go

+63-28
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package dag
1919
import (
2020
"errors"
2121
"fmt"
22+
"sort"
2223
"strings"
2324

2425
"github.com/tektoncd/pipeline/pkg/list"
@@ -79,6 +80,11 @@ func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
7980
}
8081
}
8182

83+
// Ensure no cycles in the graph
84+
if err := findCyclesInDependencies(deps); err != nil {
85+
return nil, fmt.Errorf("cycle detected; %w", err)
86+
}
87+
8288
// Process all from and runAfter constraints to add task dependency
8389
for pt, taskDeps := range deps {
8490
for _, previousTask := range taskDeps {
@@ -120,41 +126,72 @@ func GetCandidateTasks(g *Graph, doneTasks ...string) (sets.String, error) {
120126
return d, nil
121127
}
122128

123-
func linkPipelineTasks(prev *Node, next *Node) error {
124-
// Check for self cycle
125-
if prev.Task.HashKey() == next.Task.HashKey() {
126-
return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
127-
}
128-
// Check if we are adding cycles.
129-
path := []string{next.Task.HashKey(), prev.Task.HashKey()}
130-
if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
131-
return fmt.Errorf("cycle detected: %w", err)
132-
}
129+
func linkPipelineTasks(prev *Node, next *Node) {
133130
next.Prev = append(next.Prev, prev)
134131
prev.Next = append(prev.Next, next)
135-
return nil
136132
}
137133

138-
func lookForNode(nodes []*Node, path []string, next string) error {
139-
for _, n := range nodes {
140-
path = append(path, n.Task.HashKey())
141-
if n.Task.HashKey() == next {
142-
return errors.New(getVisitedPath(path))
134+
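// findCyclesInDependencies uses Kahn's algorithm to detect cycles in the task dependency map; it returns nil when every task can be ordered, or an error naming a task that still has unresolved dependencies.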
135+
func findCyclesInDependencies(deps map[string][]string) error {
136+
independentTasks := sets.NewString()
137+
dag := make(map[string]sets.String, len(deps))
138+
childMap := make(map[string]sets.String, len(deps))
139+
for task, taskDeps := range deps {
140+
if len(taskDeps) == 0 {
141+
continue
143142
}
144-
if err := lookForNode(n.Prev, path, next); err != nil {
145-
return err
143+
dag[task] = sets.NewString(taskDeps...)
144+
for _, dep := range taskDeps {
145+
if len(deps[dep]) == 0 {
146+
independentTasks.Insert(dep)
147+
}
148+
if children, ok := childMap[dep]; ok {
149+
children.Insert(task)
150+
} else {
151+
childMap[dep] = sets.NewString(task)
152+
}
146153
}
147154
}
148-
return nil
155+
156+
for {
157+
parent, ok := independentTasks.PopAny()
158+
if !ok {
159+
break
160+
}
161+
children := childMap[parent]
162+
for {
163+
child, ok := children.PopAny()
164+
if !ok {
165+
break
166+
}
167+
dag[child].Delete(parent)
168+
if dag[child].Len() == 0 {
169+
independentTasks.Insert(child)
170+
delete(dag, child)
171+
}
172+
}
173+
}
174+
175+
return getInterdependencyError(dag)
149176
}
150177

151-
func getVisitedPath(path []string) string {
152-
// Reverse the path since we traversed the Graph using prev pointers.
153-
for i := len(path)/2 - 1; i >= 0; i-- {
154-
opp := len(path) - 1 - i
155-
path[i], path[opp] = path[opp], path[i]
178+
func getInterdependencyError(dag map[string]sets.String) error {
179+
if len(dag) == 0 {
180+
return nil
156181
}
157-
return strings.Join(path, " -> ")
182+
firstChild := ""
183+
for task := range dag {
184+
if firstChild == "" || firstChild > task {
185+
firstChild = task
186+
}
187+
}
188+
deps := dag[firstChild].List()
189+
depNames := make([]string, 0, len(deps))
190+
sort.Strings(deps)
191+
for _, dep := range deps {
192+
depNames = append(depNames, fmt.Sprintf("%q", dep))
193+
}
194+
return fmt.Errorf("task %q depends on %s", firstChild, strings.Join(depNames, ", "))
158195
}
159196

160197
func addLink(pt string, previousTask string, nodes map[string]*Node) error {
@@ -163,9 +200,7 @@ func addLink(pt string, previousTask string, nodes map[string]*Node) error {
163200
return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, previousTask)
164201
}
165202
next := nodes[pt]
166-
if err := linkPipelineTasks(prev, next); err != nil {
167-
return fmt.Errorf("couldn't create link from %s to %s: %w", prev.Task.HashKey(), next.Task.HashKey(), err)
168-
}
203+
linkPipelineTasks(prev, next)
169204
return nil
170205
}
171206

pkg/reconciler/pipeline/dag/dag_test.go

+139
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package dag_test
1818

1919
import (
20+
"fmt"
2021
"strings"
2122
"testing"
2223

@@ -535,6 +536,78 @@ func TestBuild_InvalidDAG(t *testing.T) {
535536
}
536537
}
537538

539+
func TestBuildGraphWithHundredsOfTasks_Success(t *testing.T) {
540+
var tasks []v1beta1.PipelineTask
541+
// separate branches with sequential tasks and redundant links (each task explicitly depends on all predecessors)
542+
// b00 - 000 - 001 - ... - 100
543+
// b01 - 000 - 001 - ... - 100
544+
// ..
545+
// b04 - 000 - 001 - ... - 100
546+
nBranches, nTasks := 5, 100
547+
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
548+
var taskDeps []string
549+
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
550+
firstTask := v1beta1.PipelineTask{
551+
Name: firstTaskName,
552+
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
553+
RunAfter: taskDeps,
554+
}
555+
tasks = append(tasks, firstTask)
556+
taskDeps = append(taskDeps, firstTaskName)
557+
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
558+
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
559+
task := v1beta1.PipelineTask{
560+
Name: taskName,
561+
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
562+
RunAfter: taskDeps,
563+
}
564+
tasks = append(tasks, task)
565+
taskDeps = append(taskDeps, taskName)
566+
}
567+
}
568+
569+
_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
570+
if err != nil {
571+
t.Error(err)
572+
}
573+
}
574+
575+
func TestBuildGraphWithHundredsOfTasks_InvalidDAG(t *testing.T) {
576+
var tasks []v1beta1.PipelineTask
577+
// branches with circular interdependencies
578+
nBranches, nTasks := 5, 100
579+
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
580+
depBranchIdx := branchIdx + 1
581+
if depBranchIdx == nBranches {
582+
depBranchIdx = 0
583+
}
584+
taskDeps := []string{fmt.Sprintf("b%02d", depBranchIdx)}
585+
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
586+
firstTask := v1beta1.PipelineTask{
587+
Name: firstTaskName,
588+
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
589+
RunAfter: taskDeps,
590+
}
591+
tasks = append(tasks, firstTask)
592+
taskDeps = append(taskDeps, firstTaskName)
593+
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
594+
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
595+
task := v1beta1.PipelineTask{
596+
Name: taskName,
597+
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
598+
RunAfter: taskDeps,
599+
}
600+
tasks = append(tasks, task)
601+
taskDeps = append(taskDeps, taskName)
602+
}
603+
}
604+
605+
_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
606+
if err == nil {
607+
t.Errorf("Pipeline.Validate() did not return error for invalid pipeline with cycles")
608+
}
609+
}
610+
538611
func testGraph(t *testing.T) *dag.Graph {
539612
// b a
540613
// | / \
@@ -616,3 +689,69 @@ func assertSameDAG(t *testing.T, l, r *dag.Graph) {
616689
}
617690
}
618691
}
692+
693+
func TestFindCyclesInDependencies(t *testing.T) {
694+
deps := map[string][]string{
695+
"a": {},
696+
"b": {"c", "d"},
697+
"c": {},
698+
"d": {},
699+
}
700+
701+
err := dag.FindCyclesInDependencies(deps)
702+
if err != nil {
703+
t.Error(err)
704+
}
705+
706+
tcs := []struct {
707+
name string
708+
deps map[string][]string
709+
err string
710+
}{{
711+
name: "valid-empty-deps",
712+
deps: map[string][]string{
713+
"a": {},
714+
"b": {"c", "d"},
715+
"c": {},
716+
"d": {},
717+
},
718+
}, {
719+
name: "self-link",
720+
deps: map[string][]string{
721+
"a": {"a"},
722+
},
723+
err: `task "a" depends on "a"`,
724+
}, {
725+
name: "interdependent-tasks",
726+
deps: map[string][]string{
727+
"a": {"b"},
728+
"b": {"a"},
729+
},
730+
err: `task "a" depends on "b"`,
731+
}, {
732+
name: "multiple-cycles",
733+
deps: map[string][]string{
734+
"a": {"b", "c"},
735+
"b": {"a"},
736+
"c": {"d"},
737+
"d": {"a", "b"},
738+
},
739+
err: `task "a" depends on "b", "c"`,
740+
},
741+
}
742+
for _, tc := range tcs {
743+
t.Run(tc.name, func(t *testing.T) {
744+
err := dag.FindCyclesInDependencies(tc.deps)
745+
if tc.err == "" {
746+
if err != nil {
747+
t.Errorf("expected to see no error for valid DAG but had: %v", err)
748+
}
749+
} else {
750+
if err == nil || !strings.Contains(err.Error(), tc.err) {
751+
t.Errorf("expected to see an error: %q for invalid DAG but had: %v", tc.err, err)
752+
}
753+
}
754+
})
755+
}
756+
757+
}
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2022 The Tekton Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dag
18+
19+
// exports for tests
20+
21+
var FindCyclesInDependencies = findCyclesInDependencies

0 commit comments

Comments
 (0)