Skip to content

Commit eeb0760

Browse files
rafalbigajafrittoli
authored andcommitted
Improve DAG validation for pipelines with hundreds of tasks
DAG validation rewritten using Kahn's algorithm to find cycles in task dependencies. Original implementation, as pointed at tektoncd#5420 is a root cause of poor validation webhook performance, which fails on default timeout (10s).
1 parent f316a2d commit eeb0760

File tree

3 files changed

+223
-28
lines changed

3 files changed

+223
-28
lines changed

pkg/reconciler/pipeline/dag/dag.go

+63-28
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package dag
1919
import (
2020
"errors"
2121
"fmt"
22+
"sort"
2223
"strings"
2324

2425
"github.com/tektoncd/pipeline/pkg/list"
@@ -79,6 +80,11 @@ func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
7980
}
8081
}
8182

83+
// Ensure no cycles in the graph
84+
if err := findCyclesInDependencies(deps); err != nil {
85+
return nil, fmt.Errorf("cycle detected; %w", err)
86+
}
87+
8288
// Process all from and runAfter constraints to add task dependency
8389
for pt, taskDeps := range deps {
8490
for _, previousTask := range taskDeps {
@@ -120,41 +126,72 @@ func GetCandidateTasks(g *Graph, doneTasks ...string) (sets.String, error) {
120126
return d, nil
121127
}
122128

123-
func linkPipelineTasks(prev *Node, next *Node) error {
124-
// Check for self cycle
125-
if prev.Task.HashKey() == next.Task.HashKey() {
126-
return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
127-
}
128-
// Check if we are adding cycles.
129-
path := []string{next.Task.HashKey(), prev.Task.HashKey()}
130-
if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
131-
return fmt.Errorf("cycle detected: %w", err)
132-
}
129+
func linkPipelineTasks(prev *Node, next *Node) {
133130
next.Prev = append(next.Prev, prev)
134131
prev.Next = append(prev.Next, next)
135-
return nil
136132
}
137133

138-
func lookForNode(nodes []*Node, path []string, next string) error {
139-
for _, n := range nodes {
140-
path = append(path, n.Task.HashKey())
141-
if n.Task.HashKey() == next {
142-
return errors.New(getVisitedPath(path))
134+
// use Kahn's algorithm to find cycles in dependencies
135+
func findCyclesInDependencies(deps map[string][]string) error {
136+
independentTasks := sets.NewString()
137+
dag := make(map[string]sets.String, len(deps))
138+
childMap := make(map[string]sets.String, len(deps))
139+
for task, taskDeps := range deps {
140+
if len(taskDeps) == 0 {
141+
continue
143142
}
144-
if err := lookForNode(n.Prev, path, next); err != nil {
145-
return err
143+
dag[task] = sets.NewString(taskDeps...)
144+
for _, dep := range taskDeps {
145+
if len(deps[dep]) == 0 {
146+
independentTasks.Insert(dep)
147+
}
148+
if children, ok := childMap[dep]; ok {
149+
children.Insert(task)
150+
} else {
151+
childMap[dep] = sets.NewString(task)
152+
}
146153
}
147154
}
148-
return nil
155+
156+
for {
157+
parent, ok := independentTasks.PopAny()
158+
if !ok {
159+
break
160+
}
161+
children := childMap[parent]
162+
for {
163+
child, ok := children.PopAny()
164+
if !ok {
165+
break
166+
}
167+
dag[child].Delete(parent)
168+
if dag[child].Len() == 0 {
169+
independentTasks.Insert(child)
170+
delete(dag, child)
171+
}
172+
}
173+
}
174+
175+
return getInterdependencyError(dag)
149176
}
150177

151-
func getVisitedPath(path []string) string {
152-
// Reverse the path since we traversed the Graph using prev pointers.
153-
for i := len(path)/2 - 1; i >= 0; i-- {
154-
opp := len(path) - 1 - i
155-
path[i], path[opp] = path[opp], path[i]
178+
func getInterdependencyError(dag map[string]sets.String) error {
179+
if len(dag) == 0 {
180+
return nil
156181
}
157-
return strings.Join(path, " -> ")
182+
firstChild := ""
183+
for task := range dag {
184+
if firstChild == "" || firstChild > task {
185+
firstChild = task
186+
}
187+
}
188+
deps := dag[firstChild].List()
189+
depNames := make([]string, 0, len(deps))
190+
sort.Strings(deps)
191+
for _, dep := range deps {
192+
depNames = append(depNames, fmt.Sprintf("%q", dep))
193+
}
194+
return fmt.Errorf("task %q depends on %s", firstChild, strings.Join(depNames, ", "))
158195
}
159196

160197
func addLink(pt string, previousTask string, nodes map[string]*Node) error {
@@ -163,9 +200,7 @@ func addLink(pt string, previousTask string, nodes map[string]*Node) error {
163200
return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, previousTask)
164201
}
165202
next := nodes[pt]
166-
if err := linkPipelineTasks(prev, next); err != nil {
167-
return fmt.Errorf("couldn't create link from %s to %s: %w", prev.Task.HashKey(), next.Task.HashKey(), err)
168-
}
203+
linkPipelineTasks(prev, next)
169204
return nil
170205
}
171206

pkg/reconciler/pipeline/dag/dag_test.go

+139
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package dag_test
1818

1919
import (
20+
"fmt"
2021
"strings"
2122
"testing"
2223

@@ -665,6 +666,78 @@ func TestBuild_InvalidDAG(t *testing.T) {
665666
}
666667
}
667668

669+
func TestBuildGraphWithHundredsOfTasks_Success(t *testing.T) {
670+
var tasks []v1beta1.PipelineTask
671+
// separate branches with sequential tasks and redundant links (each task explicitly depends on all predecessors)
672+
// b00 - 000 - 001 - ... - 100
673+
// b01 - 000 - 001 - ... - 100
674+
// ..
675+
// b04 - 000 - 001 - ... - 100
676+
nBranches, nTasks := 5, 100
677+
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
678+
var taskDeps []string
679+
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
680+
firstTask := v1beta1.PipelineTask{
681+
Name: firstTaskName,
682+
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
683+
RunAfter: taskDeps,
684+
}
685+
tasks = append(tasks, firstTask)
686+
taskDeps = append(taskDeps, firstTaskName)
687+
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
688+
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
689+
task := v1beta1.PipelineTask{
690+
Name: taskName,
691+
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
692+
RunAfter: taskDeps,
693+
}
694+
tasks = append(tasks, task)
695+
taskDeps = append(taskDeps, taskName)
696+
}
697+
}
698+
699+
_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
700+
if err != nil {
701+
t.Error(err)
702+
}
703+
}
704+
705+
func TestBuildGraphWithHundredsOfTasks_InvalidDAG(t *testing.T) {
706+
var tasks []v1beta1.PipelineTask
707+
// branches with circular interdependencies
708+
nBranches, nTasks := 5, 100
709+
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
710+
depBranchIdx := branchIdx + 1
711+
if depBranchIdx == nBranches {
712+
depBranchIdx = 0
713+
}
714+
taskDeps := []string{fmt.Sprintf("b%02d", depBranchIdx)}
715+
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
716+
firstTask := v1beta1.PipelineTask{
717+
Name: firstTaskName,
718+
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
719+
RunAfter: taskDeps,
720+
}
721+
tasks = append(tasks, firstTask)
722+
taskDeps = append(taskDeps, firstTaskName)
723+
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
724+
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
725+
task := v1beta1.PipelineTask{
726+
Name: taskName,
727+
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
728+
RunAfter: taskDeps,
729+
}
730+
tasks = append(tasks, task)
731+
taskDeps = append(taskDeps, taskName)
732+
}
733+
}
734+
735+
_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
736+
if err == nil {
737+
t.Errorf("Pipeline.Validate() did not return error for invalid pipeline with cycles")
738+
}
739+
}
740+
668741
func testGraph(t *testing.T) *dag.Graph {
669742
// b a
670743
// | / \
@@ -746,3 +819,69 @@ func assertSameDAG(t *testing.T, l, r *dag.Graph) {
746819
}
747820
}
748821
}
822+
823+
func TestFindCyclesInDependencies(t *testing.T) {
824+
deps := map[string][]string{
825+
"a": {},
826+
"b": {"c", "d"},
827+
"c": {},
828+
"d": {},
829+
}
830+
831+
err := dag.FindCyclesInDependencies(deps)
832+
if err != nil {
833+
t.Error(err)
834+
}
835+
836+
tcs := []struct {
837+
name string
838+
deps map[string][]string
839+
err string
840+
}{{
841+
name: "valid-empty-deps",
842+
deps: map[string][]string{
843+
"a": {},
844+
"b": {"c", "d"},
845+
"c": {},
846+
"d": {},
847+
},
848+
}, {
849+
name: "self-link",
850+
deps: map[string][]string{
851+
"a": {"a"},
852+
},
853+
err: `task "a" depends on "a"`,
854+
}, {
855+
name: "interdependent-tasks",
856+
deps: map[string][]string{
857+
"a": {"b"},
858+
"b": {"a"},
859+
},
860+
err: `task "a" depends on "b"`,
861+
}, {
862+
name: "multiple-cycles",
863+
deps: map[string][]string{
864+
"a": {"b", "c"},
865+
"b": {"a"},
866+
"c": {"d"},
867+
"d": {"a", "b"},
868+
},
869+
err: `task "a" depends on "b", "c"`,
870+
},
871+
}
872+
for _, tc := range tcs {
873+
t.Run(tc.name, func(t *testing.T) {
874+
err := dag.FindCyclesInDependencies(tc.deps)
875+
if tc.err == "" {
876+
if err != nil {
877+
t.Errorf("expected to see no error for valid DAG but had: %v", err)
878+
}
879+
} else {
880+
if err == nil || !strings.Contains(err.Error(), tc.err) {
881+
t.Errorf("expected to see an error: %q for invalid DAG but had: %v", tc.err, err)
882+
}
883+
}
884+
})
885+
}
886+
887+
}
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2022 The Tekton Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dag
18+
19+
// exports for tests
20+
21+
var FindCyclesInDependencies = findCyclesInDependencies

0 commit comments

Comments
 (0)