-
Notifications
You must be signed in to change notification settings - Fork 439
/
Copy pathprocess_start.go
149 lines (125 loc) · 5.13 KB
/
process_start.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package workflow
import (
"context"
"fmt"
"github.com/rockbears/log"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
)
func processStartFromNode(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store cache.Store, proj sdk.Project,
wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, startingFromNode *int64, maxsn int64,
hookEvent *sdk.WorkflowNodeRunHookEvent, manual *sdk.WorkflowNodeRunManual) (*ProcessorReport, bool, error) {
report := new(ProcessorReport)
start := mapNodes[*startingFromNode]
if start == nil {
return nil, false, sdk.ErrWorkflowNodeNotFound
}
//Run the node : manual or from an event
nextSubNumber := maxsn
nodeRuns, ok := wr.WorkflowNodeRuns[*startingFromNode]
if ok && len(nodeRuns) > 0 {
nextSubNumber++
}
log.Debug(ctx, "processWorkflowRun> starting from node %v", startingFromNode)
// Find ancestors
nodeIds := start.Ancestors(wr.Workflow.WorkflowData)
sourceNodesRun := make([]*sdk.WorkflowNodeRun, 0, len(nodeIds))
for i := range nodeIds {
nodesRuns, ok := wr.WorkflowNodeRuns[nodeIds[i]]
if ok && len(nodesRuns) > 0 {
sourceNodesRun = append(sourceNodesRun, &nodesRuns[0])
} else {
return nil, false, sdk.ErrWorkflowNodeParentNotRun
}
}
r1, conditionOK, errP := processNodeRun(ctx, db, store, proj, wr, mapNodes, start, int(nextSubNumber), sourceNodesRun, hookEvent, manual)
if errP != nil {
return nil, conditionOK, sdk.WrapError(errP, "processWorkflowRun> Unable to processNodeRun")
}
report.Merge(ctx, r1)
wr.Status = sdk.StatusWaiting
return report, conditionOK, nil
}
func processStartFromRootNode(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store cache.Store, proj sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, hookEvent *sdk.WorkflowNodeRunHookEvent, manual *sdk.WorkflowNodeRunManual) (*ProcessorReport, bool, error) {
log.Debug(ctx, "processWorkflowRun> starting from the root: %d (pipeline %s)", wr.Workflow.WorkflowData.Node.ID, wr.Workflow.Pipelines[wr.Workflow.WorkflowData.Node.Context.PipelineID].Name)
report := new(ProcessorReport)
//Run the root: manual or from an event
AddWorkflowRunInfo(wr, sdk.SpawnMsgNew(*sdk.MsgWorkflowStarting, wr.Workflow.Name, fmt.Sprintf("%d.%d", wr.Number, 0)))
r1, conditionOK, errP := processNodeRun(ctx, db, store, proj, wr, mapNodes, &wr.Workflow.WorkflowData.Node, 0, nil, hookEvent, manual)
if errP != nil {
return nil, false, sdk.WrapError(errP, "Unable to process workflow node run")
}
report.Merge(ctx, r1)
return report, conditionOK, nil
}
func processAllNodesTriggers(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store cache.Store, proj sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node) (*ProcessorReport, error) {
report := new(ProcessorReport)
//Checks the triggers
for k := range wr.WorkflowNodeRuns {
// only check the last node run
nodeRun := &wr.WorkflowNodeRuns[k][0]
//Trigger only if the node is over (successful or not)
if sdk.StatusIsTerminated(nodeRun.Status) && nodeRun.Status != sdk.StatusNeverBuilt {
//Find the node in the workflow
node := mapNodes[nodeRun.WorkflowNodeID]
r1, _ := processNodeTriggers(ctx, db, store, proj, wr, mapNodes, []*sdk.WorkflowNodeRun{nodeRun}, node, int(nodeRun.SubNumber))
report.Merge(ctx, r1)
}
}
return report, nil
}
func processAllJoins(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store cache.Store, proj sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node) (*ProcessorReport, error) {
report := new(ProcessorReport)
//Checks the joins
for i := range wr.Workflow.WorkflowData.Joins {
j := &wr.Workflow.WorkflowData.Joins[i]
// Find node run
_, has := wr.WorkflowNodeRuns[j.ID]
if has {
continue
}
sources := make([]*sdk.WorkflowNodeRun, 0)
//we have to check noderun for every sources
for _, nodeJoin := range j.JoinContext {
if _, okF := wr.WorkflowNodeRuns[nodeJoin.ParentID]; okF {
// Get latest run on parent
sources = append(sources, &wr.WorkflowNodeRuns[nodeJoin.ParentID][0])
}
}
//now checks if all sources have been completed
var ok = true
sourcesParams := map[string]string{}
for _, nodeRun := range sources {
if nodeRun == nil {
ok = false
break
}
if !sdk.StatusIsTerminated(nodeRun.Status) {
ok = false
break
}
// If there is no conditions on join, keep default condition ( only continue on success )
if j.Context == nil || (len(j.Context.Conditions.PlainConditions) == 0 && j.Context.Conditions.LuaScript == "") {
if nodeRun.Status == sdk.StatusFail || nodeRun.Status == sdk.StatusNeverBuilt || nodeRun.Status == sdk.StatusStopped {
ok = false
break
}
}
//Merge build parameters from all sources
sourcesParams = sdk.ParametersMapMerge(sourcesParams, sdk.ParametersToMap(nodeRun.BuildParameters))
}
if len(sources) != len(j.JoinContext) {
ok = false
}
//All the sources are completed
if ok {
r1, _, err := processNodeRun(ctx, db, store, proj, wr, mapNodes, j, int(wr.LastSubNumber), sources, nil, nil)
if err != nil {
return report, sdk.WrapError(err, "unable to process join node")
}
report.Merge(ctx, r1)
}
}
return report, nil
}