-
Notifications
You must be signed in to change notification settings - Fork 439
/
Copy pathprocess_requirements.go
248 lines (223 loc) · 7.67 KB
/
process_requirements.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package workflow
import (
"context"
"fmt"
"strings"
"github.com/go-gorp/gorp"
"github.com/rockbears/log"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/workermodel"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/interpolate"
)
// processNodeJobRunRequirements returns requirements list interpolated, and true or false if at least
// one requirement is of type "Service"
func processNodeJobRunRequirements(ctx context.Context, db gorp.SqlExecutor, j sdk.Job, run *sdk.WorkflowNodeRun, execsGroupIDs []int64, integrationPlugins []sdk.GRPCPlugin, integrationsConfigs []sdk.IntegrationConfig) (sdk.RequirementList, bool, *sdk.Model, *sdk.MultiError) {
var requirements sdk.RequirementList
var errm sdk.MultiError
var containsService bool
var model string
var tmp = sdk.ParametersToMap(run.BuildParameters)
if defaultOS != "" && defaultArch != "" {
var modelFound, osArchFound bool
for _, req := range j.Action.Requirements {
if req.Type == sdk.ModelRequirement {
modelFound = true
}
if req.Type == sdk.OSArchRequirement {
osArchFound = true
}
}
if !modelFound && !osArchFound {
j.Action.Requirements = append(j.Action.Requirements, sdk.Requirement{
Name: defaultOS + "/" + defaultArch,
Type: sdk.OSArchRequirement,
Value: defaultOS + "/" + defaultArch,
})
}
}
integrationRequirements := make([]sdk.Requirement, 0)
for _, c := range integrationsConfigs {
for k, v := range c {
if v.Type != sdk.IntegrationConfigTypeRegion {
continue
}
integrationRequirements = append(integrationRequirements, sdk.Requirement{
Name: k,
Type: sdk.RegionRequirement,
Value: v.Value,
})
}
}
j.Action.Requirements = append(j.Action.Requirements, integrationRequirements...)
j.Action.Requirements = sdk.RequirementListDeduplicate(j.Action.Requirements)
for _, v := range j.Action.Requirements {
name, errName := interpolate.Do(v.Name, tmp)
if errName != nil {
errm.Append(errName)
continue
}
value, errValue := interpolate.Do(v.Value, tmp)
if errValue != nil {
errm.Append(errValue)
continue
}
if v.Type == sdk.ServiceRequirement {
containsService = true
}
if v.Type == sdk.ModelRequirement {
// It is forbidden to have more than one model requirement.
if j.Action.Enabled && model != "" {
errm.Append(sdk.ErrInvalidJobRequirementDuplicateModel)
break
}
model = value
}
sdk.AddRequirement(&requirements, v.ID, name, v.Type, value)
}
wm, err := processNodeJobRunRequirementsGetModel(ctx, db, model, execsGroupIDs)
if err != nil {
log.Error(ctx, "getNodeJobRunRequirements> error while getting worker model %s: %v", model, err)
errm.Append(err)
}
if wm != nil {
// Check that the worker model has the binaries capabilitites
// only if the worker model doesn't need registration
if !wm.NeedRegistration && !wm.CheckRegistration {
for _, req := range requirements {
if req.Type == sdk.BinaryRequirement {
var hasCapa bool
for _, capa := range wm.RegisteredCapabilities {
if capa.Value == req.Value {
hasCapa = true
break
}
}
if j.Action.Enabled && !hasCapa {
errm.Append(sdk.ErrInvalidJobRequirementWorkerModelCapabilitites)
break
}
}
}
}
}
// Add plugin requirement if needed based on the os/arch of the job
if len(integrationPlugins) > 0 {
var os, arch string
// Compute os/arch values from Model or OSArch requirement
if wm != nil {
if !wm.NeedRegistration && !wm.CheckRegistration {
os = *wm.RegisteredOS
arch = *wm.RegisteredArch
}
} else {
for i := range requirements {
if requirements[i].Type == sdk.OSArchRequirement {
osarch := strings.Split(requirements[i].Value, "/")
if len(osarch) != 2 {
errm.Append(fmt.Errorf("invalid requirement %s", requirements[i].Value))
} else {
os = strings.ToLower(osarch[0])
arch = strings.ToLower(osarch[1])
}
break
}
}
}
// If os/arch values were found adding requirements from plugin binary
if os != "" && arch != "" {
for _, p := range integrationPlugins {
for _, b := range p.Binaries {
if strings.ToLower(b.OS) == os && strings.ToLower(b.Arch) == arch {
for i := range b.Requirements {
sdk.AddRequirement(&requirements, b.Requirements[i].ID, b.Requirements[i].Name, b.Requirements[i].Type, b.Requirements[i].Value)
}
break
}
}
}
}
}
regionRequirementMap := make(map[string]struct{})
for _, r := range requirements {
if r.Type != sdk.RegionRequirement {
continue
}
if _, has := regionRequirementMap[r.Value]; !has {
regionRequirementMap[r.Value] = struct{}{}
}
}
if len(regionRequirementMap) > 1 {
errm.Append(sdk.NewErrorFrom(sdk.ErrInvalidJobRequirement, "Cannot have multiple region requirements %v", regionRequirementMap))
}
if errm.IsEmpty() {
return requirements, containsService, wm, nil
}
return requirements, containsService, wm, &errm
}
func prepareRequirementsToNodeJobRunParameters(reqs sdk.RequirementList) []sdk.Parameter {
params := make([]sdk.Parameter, 0)
for _, r := range reqs {
if r.Type == sdk.ServiceRequirement {
k := fmt.Sprintf("job.requirement.%s.%s", strings.ToLower(r.Type), strings.ToLower(r.Name))
values := strings.Split(r.Value, " ")
if len(values) > 1 {
sdk.AddParameter(¶ms, k+".image", sdk.StringParameter, values[0])
sdk.AddParameter(¶ms, k+".options", sdk.StringParameter, strings.Join(values[1:], " "))
}
}
k := fmt.Sprintf("job.requirement.%s.%s", strings.ToLower(r.Type), strings.ToLower(r.Name))
sdk.AddParameter(¶ms, k, sdk.StringParameter, r.Value)
}
return params
}
func processNodeJobRunRequirementsGetModel(ctx context.Context, db gorp.SqlExecutor, model string, execsGroupIDs []int64) (*sdk.Model, error) {
if model == "" {
return nil, nil
}
var wm *sdk.Model
modelName := strings.Split(model, " ")[0]
modelPath := strings.SplitN(modelName, "/", 2)
if len(modelPath) == 2 {
// if model contains group name (myGroup/myModel), try to find the model for the
g, err := group.LoadByName(ctx, db, modelPath[0])
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNotFound) {
return nil, sdk.NewErrorFrom(sdk.ErrNotFound, "could not find a worker model that match %s", modelName)
}
return nil, err
}
if !sdk.IsInInt64Array(g.ID, execsGroupIDs) {
return nil, sdk.NewErrorFrom(sdk.ErrInvalidJobRequirementWorkerModelPermission, "group %s should have execution permission", g.Name)
}
wm, err = workermodel.LoadByNameAndGroupID(ctx, db, modelPath[1], g.ID, workermodel.LoadOptions.Default)
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNotFound) {
return nil, sdk.NewErrorFrom(sdk.ErrNotFound, "could not find a worker model that match %s", modelName)
}
return nil, err
}
} else {
var err error
// if there is no group info, try to find a shared.infra model for given name
wm, err = workermodel.LoadByNameAndGroupID(ctx, db, modelName, group.SharedInfraGroup.ID, workermodel.LoadOptions.Default)
if err != nil && !sdk.ErrorIs(err, sdk.ErrNotFound) {
return nil, err
}
// if there is no shared.infra model we will try to find one for exec groups, backward compatibility for existing workflow runs.
if wm == nil {
wms, err := workermodel.LoadAllByNameAndGroupIDs(ctx, db, modelName, execsGroupIDs, workermodel.LoadOptions.Default)
if err != nil {
return nil, err
}
if len(wms) > 1 {
return nil, sdk.NewErrorFrom(sdk.ErrNotFound, "invalid given model name \"%s\", missing group name in requirement", modelName)
}
if len(wms) == 0 {
return nil, sdk.NewErrorFrom(sdk.ErrNotFound, "can not find a model with name \"%s\" for workflow's exec groups", modelName)
}
wm = &wms[0]
}
}
return wm, nil
}