Skip to content

Commit de235b1

Browse files
author
Stan Lagun
committed
Performance improvements
* Removes one second delay before scheduler starts to check if dependent resources can be created. This drastically speeds up unit tests and subsequent deployments when resources already exist and thus resource is created instantly * Caches dependencies and definitions during deployment. When one flow is consumed from another, they were all fetched from k8s for each replica of the inner flow + once for the outer flow. Now it happens only once. It also makes deployment more consistent in case if definitions were modified in k8s in the middle of deployment * Check dependency graph for cycles only once per deployment
1 parent 5730a94 commit de235b1

File tree

3 files changed

+30
-12
lines changed

3 files changed

+30
-12
lines changed

pkg/scheduler/dependency_graph.go

+23-12
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,21 @@ func (d *sortableDependencyList) Swap(i, j int) {
123123
d.Items[i], d.Items[j] = d.Items[j], d.Items[i]
124124
}
125125

126-
func (sched *scheduler) getDependencies() ([]client.Dependency, error) {
126+
func (sched *scheduler) getDependencies(silent bool) ([]client.Dependency, error) {
127+
if sched.dependencyCache != nil {
128+
return sched.dependencyCache, nil
129+
}
130+
if !silent {
131+
log.Println("Getting dependencies")
132+
}
127133
depList, err := sched.client.Dependencies().List(api.ListOptions{LabelSelector: sched.selector})
128134
if err != nil {
129135
return nil, err
130136
}
131137
sortableDepList := sortableDependencyList(*depList)
132138
sort.Stable(&sortableDepList)
133139

140+
sched.dependencyCache = sortableDepList.Items
134141
return sortableDepList.Items, nil
135142

136143
}
@@ -184,7 +191,13 @@ func getResourceName(resourceDefinition client.ResourceDefinition) (string, stri
184191
return "", ""
185192
}
186193

187-
func (sched *scheduler) getResourceDefinitions() (map[string]client.ResourceDefinition, error) {
194+
func (sched *scheduler) getResourceDefinitions(silent bool) (map[string]client.ResourceDefinition, error) {
195+
if sched.resDefsCache != nil {
196+
return sched.resDefsCache, nil
197+
}
198+
if !silent {
199+
log.Println("Getting resource definitions")
200+
}
188201
resDefList, err := sched.client.ResourceDefinitions().List(api.ListOptions{LabelSelector: sched.selector})
189202
if err != nil {
190203
return nil, err
@@ -197,6 +210,7 @@ func (sched *scheduler) getResourceDefinitions() (map[string]client.ResourceDefi
197210
}
198211
result[kind+"/"+name] = resDef
199212
}
213+
sched.resDefsCache = result
200214
return result, nil
201215
}
202216

@@ -567,10 +581,7 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
567581
options.FlowName = interfaces.DefaultFlowName
568582
}
569583

570-
if !options.Silent {
571-
log.Println("Getting resource definitions")
572-
}
573-
resDefs, err := sched.getResourceDefinitions()
584+
resDefs, err := sched.getResourceDefinitions(options.Silent)
574585
if err != nil {
575586
return nil, err
576587
}
@@ -595,19 +606,19 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
595606
return nil, err
596607
}
597608

598-
if !options.Silent {
599-
log.Println("Getting dependencies")
600-
}
601-
depList, err := sched.getDependencies()
609+
depList, err := sched.getDependencies(options.Silent)
602610
if err != nil {
603611
return nil, err
604612
}
605613

606614
if !options.Silent {
607615
log.Println("Making sure there is no cycles in the dependency graph")
608616
}
609-
if err = EnsureNoCycles(depList, resDefs); err != nil {
610-
return nil, err
617+
if !sched.graphHasNoCycles {
618+
if err = EnsureNoCycles(depList, resDefs); err != nil {
619+
return nil, err
620+
}
621+
sched.graphHasNoCycles = true
611622
}
612623

613624
dependencies := groupDependencies(depList, resDefs)

pkg/scheduler/frontend.go

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ type scheduler struct {
3030
client client.Interface
3131
selector labels.Selector
3232
concurrency int
33+
34+
resDefsCache map[string]client.ResourceDefinition
35+
dependencyCache []client.Dependency
36+
graphHasNoCycles bool
3337
}
3438

3539
var _ interfaces.Scheduler = &scheduler{}

pkg/scheduler/scheduler.go

+3
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ func createResources(toCreate chan *scheduledResource, finished chan<- *schedule
227227
for _, reqKey := range r.requiredBy {
228228
req := r.context.graph.graph[reqKey]
229229
go func(req *scheduledResource, toCreate chan<- *scheduledResource) {
230+
if req.requestCreation(toCreate) {
231+
return
232+
}
230233
ticker := time.NewTicker(CheckInterval)
231234
log.Printf("Requesting creation of dependency %v", req.Key())
232235
for {

0 commit comments

Comments
 (0)