Skip to content

Commit ce9c916

Browse files
rafalbigajtekton-robot
authored andcommitted
Improve DAG validation for pipelines with hundreds of tasks
DAG validation rewritten using Kahn's algorithm to find cycles in task dependencies. Original implementation, as pointed at #5420 is a root cause of poor validation webhook performance, which fails on default timeout (10s).
1 parent 2760764 commit ce9c916

File tree

4 files changed

+234
-31
lines changed

4 files changed

+234
-31
lines changed

pkg/apis/pipeline/v1beta1/pipeline_validation_test.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -700,9 +700,11 @@ func TestPipelineSpec_Validate_Failure_CycleDAG(t *testing.T) {
700700
name := "invalid pipeline spec with DAG having cyclic dependency"
701701
ps := &PipelineSpec{
702702
Tasks: []PipelineTask{{
703-
Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}, RunAfter: []string{"bar"},
703+
Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}, RunAfter: []string{"baz"},
704704
}, {
705705
Name: "bar", TaskRef: &TaskRef{Name: "bar-task"}, RunAfter: []string{"foo"},
706+
}, {
707+
Name: "baz", TaskRef: &TaskRef{Name: "baz-task"}, RunAfter: []string{"bar"},
706708
}},
707709
}
708710
err := ps.Validate(context.Background())
@@ -1086,9 +1088,15 @@ func TestValidateGraph_Failure(t *testing.T) {
10861088
}, {
10871089
Name: "bar", TaskRef: &TaskRef{Name: "bar-task"}, RunAfter: []string{"foo"},
10881090
}}
1089-
if err := validateGraph(tasks); err == nil {
1091+
expectedError := apis.FieldError{
1092+
Message: `invalid value: cycle detected; task "bar" depends on "foo"`,
1093+
Paths: []string{"tasks"},
1094+
}
1095+
err := validateGraph(tasks)
1096+
if err == nil {
10901097
t.Error("Pipeline.validateGraph() did not return error for invalid DAG of pipeline tasks:", desc)
1091-
1098+
} else if d := cmp.Diff(expectedError.Error(), err.Error(), cmpopts.IgnoreUnexported(apis.FieldError{})); d != "" {
1099+
t.Errorf("Pipeline.validateGraph() errors diff %s", diff.PrintWantGot(d))
10921100
}
10931101
}
10941102

pkg/reconciler/pipeline/dag/dag.go

+63-28
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package dag
1919
import (
2020
"errors"
2121
"fmt"
22+
"sort"
2223
"strings"
2324

2425
"github.com/tektoncd/pipeline/pkg/list"
@@ -79,6 +80,11 @@ func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
7980
}
8081
}
8182

83+
// Ensure no cycles in the graph
84+
if err := findCyclesInDependencies(deps); err != nil {
85+
return nil, fmt.Errorf("cycle detected; %w", err)
86+
}
87+
8288
// Process all from and runAfter constraints to add task dependency
8389
for pt, taskDeps := range deps {
8490
for _, previousTask := range taskDeps {
@@ -120,41 +126,72 @@ func GetCandidateTasks(g *Graph, doneTasks ...string) (sets.String, error) {
120126
return d, nil
121127
}
122128

123-
func linkPipelineTasks(prev *Node, next *Node) error {
124-
// Check for self cycle
125-
if prev.Task.HashKey() == next.Task.HashKey() {
126-
return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
127-
}
128-
// Check if we are adding cycles.
129-
path := []string{next.Task.HashKey(), prev.Task.HashKey()}
130-
if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
131-
return fmt.Errorf("cycle detected: %w", err)
132-
}
129+
func linkPipelineTasks(prev *Node, next *Node) {
133130
next.Prev = append(next.Prev, prev)
134131
prev.Next = append(prev.Next, next)
135-
return nil
136132
}
137133

138-
func lookForNode(nodes []*Node, path []string, next string) error {
139-
for _, n := range nodes {
140-
path = append(path, n.Task.HashKey())
141-
if n.Task.HashKey() == next {
142-
return errors.New(getVisitedPath(path))
134+
// use Kahn's algorithm to find cycles in dependencies
135+
func findCyclesInDependencies(deps map[string][]string) error {
136+
independentTasks := sets.NewString()
137+
dag := make(map[string]sets.String, len(deps))
138+
childMap := make(map[string]sets.String, len(deps))
139+
for task, taskDeps := range deps {
140+
if len(taskDeps) == 0 {
141+
continue
143142
}
144-
if err := lookForNode(n.Prev, path, next); err != nil {
145-
return err
143+
dag[task] = sets.NewString(taskDeps...)
144+
for _, dep := range taskDeps {
145+
if len(deps[dep]) == 0 {
146+
independentTasks.Insert(dep)
147+
}
148+
if children, ok := childMap[dep]; ok {
149+
children.Insert(task)
150+
} else {
151+
childMap[dep] = sets.NewString(task)
152+
}
146153
}
147154
}
148-
return nil
155+
156+
for {
157+
parent, ok := independentTasks.PopAny()
158+
if !ok {
159+
break
160+
}
161+
children := childMap[parent]
162+
for {
163+
child, ok := children.PopAny()
164+
if !ok {
165+
break
166+
}
167+
dag[child].Delete(parent)
168+
if dag[child].Len() == 0 {
169+
independentTasks.Insert(child)
170+
delete(dag, child)
171+
}
172+
}
173+
}
174+
175+
return getInterdependencyError(dag)
149176
}
150177

151-
func getVisitedPath(path []string) string {
152-
// Reverse the path since we traversed the Graph using prev pointers.
153-
for i := len(path)/2 - 1; i >= 0; i-- {
154-
opp := len(path) - 1 - i
155-
path[i], path[opp] = path[opp], path[i]
178+
func getInterdependencyError(dag map[string]sets.String) error {
179+
if len(dag) == 0 {
180+
return nil
156181
}
157-
return strings.Join(path, " -> ")
182+
firstChild := ""
183+
for task := range dag {
184+
if firstChild == "" || firstChild > task {
185+
firstChild = task
186+
}
187+
}
188+
deps := dag[firstChild].List()
189+
depNames := make([]string, 0, len(deps))
190+
sort.Strings(deps)
191+
for _, dep := range deps {
192+
depNames = append(depNames, fmt.Sprintf("%q", dep))
193+
}
194+
return fmt.Errorf("task %q depends on %s", firstChild, strings.Join(depNames, ", "))
158195
}
159196

160197
func addLink(pt string, previousTask string, nodes map[string]*Node) error {
@@ -163,9 +200,7 @@ func addLink(pt string, previousTask string, nodes map[string]*Node) error {
163200
return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, previousTask)
164201
}
165202
next := nodes[pt]
166-
if err := linkPipelineTasks(prev, next); err != nil {
167-
return fmt.Errorf("couldn't create link from %s to %s: %w", prev.Task.HashKey(), next.Task.HashKey(), err)
168-
}
203+
linkPipelineTasks(prev, next)
169204
return nil
170205
}
171206

pkg/reconciler/pipeline/dag/dag_test.go

+139
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package dag_test
1818

1919
import (
20+
"fmt"
2021
"strings"
2122
"testing"
2223

@@ -549,6 +550,78 @@ func TestBuild_InvalidDAG(t *testing.T) {
549550
}
550551
}
551552

553+
func TestBuildGraphWithHundredsOfTasks_Success(t *testing.T) {
554+
var tasks []v1beta1.PipelineTask
555+
// separate branches with sequential tasks and redundant links (each task explicitly depends on all predecessors)
556+
// b00 - 000 - 001 - ... - 100
557+
// b01 - 000 - 001 - ... - 100
558+
// ..
559+
// b04 - 000 - 001 - ... - 100
560+
nBranches, nTasks := 5, 100
561+
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
562+
var taskDeps []string
563+
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
564+
firstTask := v1beta1.PipelineTask{
565+
Name: firstTaskName,
566+
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
567+
RunAfter: taskDeps,
568+
}
569+
tasks = append(tasks, firstTask)
570+
taskDeps = append(taskDeps, firstTaskName)
571+
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
572+
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
573+
task := v1beta1.PipelineTask{
574+
Name: taskName,
575+
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
576+
RunAfter: taskDeps,
577+
}
578+
tasks = append(tasks, task)
579+
taskDeps = append(taskDeps, taskName)
580+
}
581+
}
582+
583+
_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
584+
if err != nil {
585+
t.Error(err)
586+
}
587+
}
588+
589+
func TestBuildGraphWithHundredsOfTasks_InvalidDAG(t *testing.T) {
590+
var tasks []v1beta1.PipelineTask
591+
// branches with circular interdependencies
592+
nBranches, nTasks := 5, 100
593+
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
594+
depBranchIdx := branchIdx + 1
595+
if depBranchIdx == nBranches {
596+
depBranchIdx = 0
597+
}
598+
taskDeps := []string{fmt.Sprintf("b%02d", depBranchIdx)}
599+
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
600+
firstTask := v1beta1.PipelineTask{
601+
Name: firstTaskName,
602+
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
603+
RunAfter: taskDeps,
604+
}
605+
tasks = append(tasks, firstTask)
606+
taskDeps = append(taskDeps, firstTaskName)
607+
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
608+
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
609+
task := v1beta1.PipelineTask{
610+
Name: taskName,
611+
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
612+
RunAfter: taskDeps,
613+
}
614+
tasks = append(tasks, task)
615+
taskDeps = append(taskDeps, taskName)
616+
}
617+
}
618+
619+
_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
620+
if err == nil {
621+
t.Errorf("Pipeline.Validate() did not return error for invalid pipeline with cycles")
622+
}
623+
}
624+
552625
func testGraph(t *testing.T) *dag.Graph {
553626
// b a
554627
// | / \
@@ -630,3 +703,69 @@ func assertSameDAG(t *testing.T, l, r *dag.Graph) {
630703
}
631704
}
632705
}
706+
707+
func TestFindCyclesInDependencies(t *testing.T) {
708+
deps := map[string][]string{
709+
"a": {},
710+
"b": {"c", "d"},
711+
"c": {},
712+
"d": {},
713+
}
714+
715+
err := dag.FindCyclesInDependencies(deps)
716+
if err != nil {
717+
t.Error(err)
718+
}
719+
720+
tcs := []struct {
721+
name string
722+
deps map[string][]string
723+
err string
724+
}{{
725+
name: "valid-empty-deps",
726+
deps: map[string][]string{
727+
"a": {},
728+
"b": {"c", "d"},
729+
"c": {},
730+
"d": {},
731+
},
732+
}, {
733+
name: "self-link",
734+
deps: map[string][]string{
735+
"a": {"a"},
736+
},
737+
err: `task "a" depends on "a"`,
738+
}, {
739+
name: "interdependent-tasks",
740+
deps: map[string][]string{
741+
"a": {"b"},
742+
"b": {"a"},
743+
},
744+
err: `task "a" depends on "b"`,
745+
}, {
746+
name: "multiple-cycles",
747+
deps: map[string][]string{
748+
"a": {"b", "c"},
749+
"b": {"a"},
750+
"c": {"d"},
751+
"d": {"a", "b"},
752+
},
753+
err: `task "a" depends on "b", "c"`,
754+
},
755+
}
756+
for _, tc := range tcs {
757+
t.Run(tc.name, func(t *testing.T) {
758+
err := dag.FindCyclesInDependencies(tc.deps)
759+
if tc.err == "" {
760+
if err != nil {
761+
t.Errorf("expected to see no error for valid DAG but had: %v", err)
762+
}
763+
} else {
764+
if err == nil || !strings.Contains(err.Error(), tc.err) {
765+
t.Errorf("expected to see an error: %q for invalid DAG but had: %v", tc.err, err)
766+
}
767+
}
768+
})
769+
}
770+
771+
}
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2022 The Tekton Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dag
18+
19+
// exports for tests
20+
21+
var FindCyclesInDependencies = findCyclesInDependencies

0 commit comments

Comments
 (0)