Skip to content

Commit 666cc8f

Browse files
rafalbigaj authored and tekton-robot committed
Improve DAG validation for pipelines with hundreds of tasks
DAG validation is rewritten to use Kahn's algorithm for finding cycles in task dependencies. The original implementation, as pointed out in #5420, is the root cause of poor validation webhook performance: for pipelines with hundreds of tasks the webhook fails by exceeding the default timeout (10s).
1 parent f316a2d commit 666cc8f

File tree

3 files changed

+223
-28
lines changed

3 files changed

+223
-28
lines changed

pkg/reconciler/pipeline/dag/dag.go

+63-28
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package dag
1919
import (
2020
"errors"
2121
"fmt"
22+
"sort"
2223
"strings"
2324

2425
"github.com/tektoncd/pipeline/pkg/list"
@@ -79,6 +80,11 @@ func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
7980
}
8081
}
8182

83+
// Ensure no cycles in the graph
84+
if err := findCyclesInDependencies(deps); err != nil {
85+
return nil, fmt.Errorf("cycle detected; %w", err)
86+
}
87+
8288
// Process all from and runAfter constraints to add task dependency
8389
for pt, taskDeps := range deps {
8490
for _, previousTask := range taskDeps {
@@ -120,41 +126,72 @@ func GetCandidateTasks(g *Graph, doneTasks ...string) (sets.String, error) {
120126
return d, nil
121127
}
122128

123-
func linkPipelineTasks(prev *Node, next *Node) error {
124-
// Check for self cycle
125-
if prev.Task.HashKey() == next.Task.HashKey() {
126-
return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
127-
}
128-
// Check if we are adding cycles.
129-
path := []string{next.Task.HashKey(), prev.Task.HashKey()}
130-
if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
131-
return fmt.Errorf("cycle detected: %w", err)
132-
}
129+
func linkPipelineTasks(prev *Node, next *Node) {
133130
next.Prev = append(next.Prev, prev)
134131
prev.Next = append(prev.Next, next)
135-
return nil
136132
}
137133

138-
func lookForNode(nodes []*Node, path []string, next string) error {
139-
for _, n := range nodes {
140-
path = append(path, n.Task.HashKey())
141-
if n.Task.HashKey() == next {
142-
return errors.New(getVisitedPath(path))
134+
// use Kahn's algorithm to find cycles in dependencies
135+
func findCyclesInDependencies(deps map[string][]string) error {
136+
independentTasks := sets.NewString()
137+
dag := make(map[string]sets.String, len(deps))
138+
childMap := make(map[string]sets.String, len(deps))
139+
for task, taskDeps := range deps {
140+
if len(taskDeps) == 0 {
141+
continue
143142
}
144-
if err := lookForNode(n.Prev, path, next); err != nil {
145-
return err
143+
dag[task] = sets.NewString(taskDeps...)
144+
for _, dep := range taskDeps {
145+
if len(deps[dep]) == 0 {
146+
independentTasks.Insert(dep)
147+
}
148+
if children, ok := childMap[dep]; ok {
149+
children.Insert(task)
150+
} else {
151+
childMap[dep] = sets.NewString(task)
152+
}
146153
}
147154
}
148-
return nil
155+
156+
for {
157+
parent, ok := independentTasks.PopAny()
158+
if !ok {
159+
break
160+
}
161+
children := childMap[parent]
162+
for {
163+
child, ok := children.PopAny()
164+
if !ok {
165+
break
166+
}
167+
dag[child].Delete(parent)
168+
if dag[child].Len() == 0 {
169+
independentTasks.Insert(child)
170+
delete(dag, child)
171+
}
172+
}
173+
}
174+
175+
return getInterdependencyError(dag)
149176
}
150177

151-
func getVisitedPath(path []string) string {
152-
// Reverse the path since we traversed the Graph using prev pointers.
153-
for i := len(path)/2 - 1; i >= 0; i-- {
154-
opp := len(path) - 1 - i
155-
path[i], path[opp] = path[opp], path[i]
178+
func getInterdependencyError(dag map[string]sets.String) error {
179+
if len(dag) == 0 {
180+
return nil
156181
}
157-
return strings.Join(path, " -> ")
182+
firstChild := ""
183+
for task := range dag {
184+
if firstChild == "" || firstChild > task {
185+
firstChild = task
186+
}
187+
}
188+
deps := dag[firstChild].List()
189+
depNames := make([]string, 0, len(deps))
190+
sort.Strings(deps)
191+
for _, dep := range deps {
192+
depNames = append(depNames, fmt.Sprintf("%q", dep))
193+
}
194+
return fmt.Errorf("task %q depends on %s", firstChild, strings.Join(depNames, ", "))
158195
}
159196

160197
func addLink(pt string, previousTask string, nodes map[string]*Node) error {
@@ -163,9 +200,7 @@ func addLink(pt string, previousTask string, nodes map[string]*Node) error {
163200
return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, previousTask)
164201
}
165202
next := nodes[pt]
166-
if err := linkPipelineTasks(prev, next); err != nil {
167-
return fmt.Errorf("couldn't create link from %s to %s: %w", prev.Task.HashKey(), next.Task.HashKey(), err)
168-
}
203+
linkPipelineTasks(prev, next)
169204
return nil
170205
}
171206

pkg/reconciler/pipeline/dag/dag_test.go

+139
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package dag_test
1818

1919
import (
20+
"fmt"
2021
"strings"
2122
"testing"
2223

@@ -665,6 +666,78 @@ func TestBuild_InvalidDAG(t *testing.T) {
665666
}
666667
}
667668

669+
func TestBuildGraphWithHundredsOfTasks_Success(t *testing.T) {
670+
var tasks []v1beta1.PipelineTask
671+
// separate branches with sequential tasks and redundant links (each task explicitly depends on all predecessors)
672+
// b00 - 000 - 001 - ... - 100
673+
// b01 - 000 - 001 - ... - 100
674+
// ..
675+
// b04 - 000 - 001 - ... - 100
676+
nBranches, nTasks := 5, 100
677+
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
678+
var taskDeps []string
679+
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
680+
firstTask := v1beta1.PipelineTask{
681+
Name: firstTaskName,
682+
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
683+
RunAfter: taskDeps,
684+
}
685+
tasks = append(tasks, firstTask)
686+
taskDeps = append(taskDeps, firstTaskName)
687+
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
688+
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
689+
task := v1beta1.PipelineTask{
690+
Name: taskName,
691+
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
692+
RunAfter: taskDeps,
693+
}
694+
tasks = append(tasks, task)
695+
taskDeps = append(taskDeps, taskName)
696+
}
697+
}
698+
699+
_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
700+
if err != nil {
701+
t.Error(err)
702+
}
703+
}
704+
705+
func TestBuildGraphWithHundredsOfTasks_InvalidDAG(t *testing.T) {
706+
var tasks []v1beta1.PipelineTask
707+
// branches with circular interdependencies
708+
nBranches, nTasks := 5, 100
709+
for branchIdx := 0; branchIdx < nBranches; branchIdx++ {
710+
depBranchIdx := branchIdx + 1
711+
if depBranchIdx == nBranches {
712+
depBranchIdx = 0
713+
}
714+
taskDeps := []string{fmt.Sprintf("b%02d", depBranchIdx)}
715+
firstTaskName := fmt.Sprintf("b%02d", branchIdx)
716+
firstTask := v1beta1.PipelineTask{
717+
Name: firstTaskName,
718+
TaskRef: &v1beta1.TaskRef{Name: firstTaskName + "-task"},
719+
RunAfter: taskDeps,
720+
}
721+
tasks = append(tasks, firstTask)
722+
taskDeps = append(taskDeps, firstTaskName)
723+
for taskIdx := 0; taskIdx < nTasks; taskIdx++ {
724+
taskName := fmt.Sprintf("%s-%03d", firstTaskName, taskIdx)
725+
task := v1beta1.PipelineTask{
726+
Name: taskName,
727+
TaskRef: &v1beta1.TaskRef{Name: taskName + "-task"},
728+
RunAfter: taskDeps,
729+
}
730+
tasks = append(tasks, task)
731+
taskDeps = append(taskDeps, taskName)
732+
}
733+
}
734+
735+
_, err := dag.Build(v1beta1.PipelineTaskList(tasks), v1beta1.PipelineTaskList(tasks).Deps())
736+
if err == nil {
737+
t.Errorf("Pipeline.Validate() did not return error for invalid pipeline with cycles")
738+
}
739+
}
740+
668741
func testGraph(t *testing.T) *dag.Graph {
669742
// b a
670743
// | / \
@@ -746,3 +819,69 @@ func assertSameDAG(t *testing.T, l, r *dag.Graph) {
746819
}
747820
}
748821
}
822+
823+
func TestFindCyclesInDependencies(t *testing.T) {
824+
deps := map[string][]string{
825+
"a": {},
826+
"b": {"c", "d"},
827+
"c": {},
828+
"d": {},
829+
}
830+
831+
err := dag.FindCyclesInDependencies(deps)
832+
if err != nil {
833+
t.Error(err)
834+
}
835+
836+
tcs := []struct {
837+
name string
838+
deps map[string][]string
839+
err string
840+
}{{
841+
name: "valid-empty-deps",
842+
deps: map[string][]string{
843+
"a": {},
844+
"b": {"c", "d"},
845+
"c": {},
846+
"d": {},
847+
},
848+
}, {
849+
name: "self-link",
850+
deps: map[string][]string{
851+
"a": {"a"},
852+
},
853+
err: `task "a" depends on "a"`,
854+
}, {
855+
name: "interdependent-tasks",
856+
deps: map[string][]string{
857+
"a": {"b"},
858+
"b": {"a"},
859+
},
860+
err: `task "a" depends on "b"`,
861+
}, {
862+
name: "multiple-cycles",
863+
deps: map[string][]string{
864+
"a": {"b", "c"},
865+
"b": {"a"},
866+
"c": {"d"},
867+
"d": {"a", "b"},
868+
},
869+
err: `task "a" depends on "b", "c"`,
870+
},
871+
}
872+
for _, tc := range tcs {
873+
t.Run(tc.name, func(t *testing.T) {
874+
err := dag.FindCyclesInDependencies(tc.deps)
875+
if tc.err == "" {
876+
if err != nil {
877+
t.Errorf("expected to see no error for valid DAG but had: %v", err)
878+
}
879+
} else {
880+
if err == nil || !strings.Contains(err.Error(), tc.err) {
881+
t.Errorf("expected to see an error: %q for invalid DAG but had: %v", tc.err, err)
882+
}
883+
}
884+
})
885+
}
886+
887+
}
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2022 The Tekton Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package dag
18+
19+
// Exports of unexported identifiers so that tests in the external dag_test package can call them.
20+
21+
var FindCyclesInDependencies = findCyclesInDependencies

0 commit comments

Comments
 (0)