-
Notifications
You must be signed in to change notification settings - Fork 439
/
Copy pathdao_node_run_job.go
395 lines (356 loc) · 11.7 KB
/
dao_node_run_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
package workflow
import (
"context"
"database/sql"
"strconv"
"time"
"github.com/go-gorp/gorp"
"github.com/lib/pq"
"github.com/rockbears/log"
"github.com/ovh/cds/engine/api/database/gorpmapping"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/telemetry"
)
// QueueFilter contains all criteria used to fetch queue
type QueueFilter struct {
	// ModelType restricts the queue to jobs whose model_type is one of these
	// values; jobs with a NULL or empty model_type always match (see the SQL
	// in LoadNodeJobRunQueue).
	ModelType []string
	// Rights is the minimum permission level required when filtering by group
	// IDs; compared against the computed workflow role in
	// LoadNodeJobRunQueueByGroupIDs ("role >= $7").
	Rights int
	// Since is the inclusive lower bound on the job "queued" timestamp.
	// NOTE(review): the Load* functions dereference this without a nil check,
	// so callers must set it — NewQueueFilter provides defaults.
	Since *time.Time
	// Until is the inclusive upper bound on the job "queued" timestamp.
	// Same non-nil requirement as Since.
	Until *time.Time
	// Limit, when non-nil and > 0, caps the number of rows returned.
	Limit *int
	// Statuses restricts results to jobs in one of these statuses.
	Statuses []string
	// Regions restricts results to these regions; an empty-string entry
	// matches jobs with a NULL region, and an empty slice matches everything.
	Regions []string
}
// NewQueueFilter returns a QueueFilter with default criteria: every available
// worker model type, read permission, a time window spanning from the zero
// time up to now, and only jobs still in the Waiting status.
func NewQueueFilter() QueueFilter {
	var (
		since time.Time // zero value: no effective lower bound
		until = time.Now()
	)
	f := QueueFilter{
		ModelType: sdk.AvailableWorkerModelType,
		Rights:    sdk.PermissionRead,
		Statuses:  []string{sdk.StatusWaiting},
	}
	f.Since = &since
	f.Until = &until
	return f
}
// CountNodeJobRunQueue count all workflow_node_run_job accessible
// matching the given filter; the returned count also echoes back the
// Since/Until window that was applied.
func CountNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter) (sdk.WorkflowNodeJobRunCount, error) {
	var count sdk.WorkflowNodeJobRunCount
	jobs, err := LoadNodeJobRunQueue(ctx, db, store, filter)
	if err != nil {
		return count, sdk.WrapError(err, "unable to load queue")
	}
	count.Count = int64(len(jobs))
	if s := filter.Since; s != nil {
		count.Since = *s
	}
	if u := filter.Until; u != nil {
		count.Until = *u
	}
	return count, nil
}
// CountNodeJobRunQueueByGroupIDs counts the workflow_node_run_job entries
// visible to the given groups under the filter's criteria; the returned
// count echoes back the Since/Until window that was applied.
func CountNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter, groupIDs []int64) (sdk.WorkflowNodeJobRunCount, error) {
	var count sdk.WorkflowNodeJobRunCount
	jobs, err := LoadNodeJobRunQueueByGroupIDs(ctx, db, store, filter, groupIDs)
	if err != nil {
		return count, sdk.WrapError(err, "unable to load queue")
	}
	count.Count = int64(len(jobs))
	if s := filter.Since; s != nil {
		count.Since = *s
	}
	if u := filter.Until; u != nil {
		count.Until = *u
	}
	return count, nil
}
// LoadNodeJobRunQueue load all workflow_node_run_job accessible
//
// Jobs are filtered by the queued time window, statuses, model types and
// regions from filter, ordered by queued time ascending. A NULL/empty
// model_type matches any requested model type; a NULL region matches an
// empty-string entry in filter.Regions; an empty Regions slice matches all.
// filter.Since and filter.Until must be non-nil — they are dereferenced
// below without a check (NewQueueFilter supplies suitable defaults).
func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter) ([]sdk.WorkflowNodeJobRun, error) {
	ctx, end := telemetry.Span(ctx, "workflow.LoadNodeJobRunQueue")
	defer end()
	query := gorpmapping.NewQuery(`select distinct workflow_node_run_job.*
	from workflow_node_run_job
	where workflow_node_run_job.queued >= $1
	and workflow_node_run_job.queued <= $2
	and workflow_node_run_job.status = ANY($3)
	AND (model_type is NULL OR model_type = '' OR model_type = ANY($4))
	AND (
		workflow_node_run_job.region = ANY($5)
		OR
		(workflow_node_run_job.region is NULL AND '' = ANY($5))
		OR
		array_length($5, 1) is NULL
	)
	ORDER BY workflow_node_run_job.queued ASC
	`).Args(
		*filter.Since,                    // $1
		*filter.Until,                    // $2
		pq.StringArray(filter.Statuses),  // $3
		pq.StringArray(filter.ModelType), // $4
		pq.StringArray(filter.Regions),   // $5
	)
	return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit)
}
// LoadNodeJobRunQueueByGroupIDs load all workflow_node_run_job accessible
//
// On top of the time-window / status / model-type / region criteria (same
// semantics as LoadNodeJobRunQueue), results are restricted to jobs whose
// workflow grants at least filter.Rights to one of groupIDs and whose
// exec_groups JSON contains one of groupIDs. Membership of the shared-infra
// group ($6) short-circuits the permission computation to the maximum role
// (7). filter.Since and filter.Until must be non-nil — they are
// dereferenced below without a check (NewQueueFilter supplies defaults).
func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, store cache.Store, filter QueueFilter, groupIDs []int64) ([]sdk.WorkflowNodeJobRun, error) {
	ctx, end := telemetry.Span(ctx, "workflow.LoadNodeJobRunQueueByGroups")
	defer end()
	query := gorpmapping.NewQuery(`
	-- Parameters:
	-- $1: Queue since
	-- $2: Queue until
	-- $3: List of status
	-- $4: List of model types
	-- $5: Comman separated list of groups ID
	-- $6: shared infra group ID
	-- $7: minimum level of permission
	-- $8: List of regions
	WITH workflow_id_with_permissions AS (
		SELECT workflow_perm.workflow_id,
			CASE WHEN $6 = ANY(string_to_array($5, ',')::int[]) THEN 7
				ELSE max(workflow_perm.role)
			END as "role"
		FROM workflow_perm
		JOIN project_group ON project_group.id = workflow_perm.project_group_id
		WHERE
			project_group.group_id = ANY(string_to_array($5, ',')::int[])
			OR
			$6 = ANY(string_to_array($5, ',')::int[])
		GROUP BY workflow_perm.workflow_id
	), workflow_node_run_job_exec_groups AS (
		SELECT id, jsonb_array_elements_text(exec_groups)::jsonb->'id' AS exec_group_id
		FROM workflow_node_run_job
	), workflow_node_run_job_matching_exec_groups AS (
		SELECT id
		FROM workflow_node_run_job_exec_groups
		WHERE exec_group_id::text = ANY(string_to_array($5, ','))
	)
	SELECT DISTINCT workflow_node_run_job.*
	FROM workflow_node_run_job
	JOIN workflow_node_run ON workflow_node_run.id = workflow_node_run_job.workflow_node_run_id
	JOIN workflow_run ON workflow_run.id = workflow_node_run.workflow_run_id
	JOIN workflow ON workflow.id = workflow_run.workflow_id
	WHERE workflow.id IN (
		SELECT workflow_id
		FROM workflow_id_with_permissions
		WHERE role >= $7
	)
	AND workflow_node_run_job.id IN (
		SELECT id
		FROM workflow_node_run_job_matching_exec_groups
	)
	AND workflow_node_run_job.queued >= $1
	AND workflow_node_run_job.queued <= $2
	AND workflow_node_run_job.status = ANY($3)
	AND (
		workflow_node_run_job.model_type is NULL
		OR
		model_type = '' OR model_type = ANY($4)
	)
	AND (
		workflow_node_run_job.region = ANY($8)
		OR
		(workflow_node_run_job.region is NULL AND '' = ANY($8))
		OR
		array_length($8, 1) is NULL
	)
	ORDER BY workflow_node_run_job.queued ASC
	`).Args(
		*filter.Since,                          // $1
		*filter.Until,                          // $2
		pq.StringArray(filter.Statuses),        // $3
		pq.StringArray(filter.ModelType),       // $4
		gorpmapping.IDsToQueryString(groupIDs), // $5
		group.SharedInfraGroup.ID,              // $6
		filter.Rights,                          // $7
		pq.StringArray(filter.Regions),         // $8
	)
	return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit)
}
// loadNodeJobRunQueue runs the prepared queue query, optionally capped by
// limit, decorates each row with its booking hatchery from the cache, and
// converts the rows to sdk.WorkflowNodeJobRun. Rows that fail conversion are
// logged and skipped rather than aborting the whole load.
func loadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, query gorpmapping.Query, limit *int) ([]sdk.WorkflowNodeJobRun, error) {
	ctx, end := telemetry.Span(ctx, "workflow.loadNodeJobRunQueue")
	defer end()

	if limit != nil && *limit > 0 {
		query = query.Limit(*limit)
	}

	var rows []JobRun
	if err := gorpmapping.GetAll(ctx, db, query, &rows); err != nil {
		return nil, sdk.WrapError(err, "Unable to load job runs (Select)")
	}

	result := make([]sdk.WorkflowNodeJobRun, 0, len(rows))
	for i := range rows {
		row := &rows[i]
		getHatcheryInfo(ctx, store, row)
		converted, convErr := row.WorkflowNodeRunJob()
		if convErr != nil {
			log.Error(ctx, "LoadNodeJobRunQueue> WorkflowNodeRunJob error: %v", convErr)
			continue
		}
		result = append(result, converted)
	}
	return result, nil
}
// LoadNodeJobRunIDByNodeRunID Load node run job id by node run id.
// It returns the IDs of every workflow_node_run_job attached to the given
// workflow_node_run; the slice is nil (not empty) when there are no rows.
func LoadNodeJobRunIDByNodeRunID(db gorp.SqlExecutor, runNodeID int64) ([]int64, error) {
	query := `SELECT workflow_node_run_job.id FROM workflow_node_run_job WHERE workflow_node_run_id = $1`
	rows, err := db.Query(query, runNodeID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var ids []int64
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	// rows.Next can stop early because of an iteration error; surface it
	// instead of silently returning a truncated ID list.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return ids, nil
}
// LoadNodeJobRun load a NodeJobRun given its ID. A missing row maps to
// ErrWorkflowNodeRunJobNotFound and a PostgreSQL lock_not_available error
// (code 55P03) maps to ErrJobLocked. When store is non-nil the result is
// decorated with its booking hatchery from the cache.
func LoadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
	var dbJob JobRun
	query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1`
	selErr := db.SelectOne(&dbJob, query, id)
	switch {
	case selErr == sql.ErrNoRows:
		return nil, sdk.WithStack(sdk.ErrWorkflowNodeRunJobNotFound)
	case selErr != nil:
		if pqErr, ok := selErr.(*pq.Error); ok && pqErr.Code == "55P03" {
			return nil, sdk.WithStack(sdk.ErrJobLocked)
		}
		return nil, sdk.WithStack(selErr)
	}
	if store != nil {
		getHatcheryInfo(ctx, store, &dbJob)
	}
	run, convErr := dbJob.WorkflowNodeRunJob()
	if convErr != nil {
		return nil, convErr
	}
	return &run, nil
}
// LoadDeadNodeJobRun load a NodeJobRun which is Building but without worker:
// every workflow_node_run_job whose worker_id is NULL. When store is non-nil
// each result is decorated with its booking hatchery from the cache.
func LoadDeadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store) ([]sdk.WorkflowNodeJobRun, error) {
	query := `SELECT workflow_node_run_job.* FROM workflow_node_run_job WHERE worker_id IS NULL`
	var rows []JobRun
	if _, err := db.Select(&rows, query); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, err
	}
	result := make([]sdk.WorkflowNodeJobRun, len(rows))
	for i := range rows {
		if store != nil {
			getHatcheryInfo(ctx, store, &rows[i])
		}
		converted, err := rows[i].WorkflowNodeRunJob()
		if err != nil {
			return nil, err
		}
		result[i] = converted
	}
	return result, nil
}
// LoadAndLockNodeJobRunWait load for update a NodeJobRun given its ID.
// The SELECT ... FOR UPDATE blocks until the row lock is acquired; the lock
// is held until the surrounding transaction ends.
func LoadAndLockNodeJobRunWait(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
	query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1 for update`
	var dbJob JobRun
	if err := db.SelectOne(&dbJob, query, id); err != nil {
		return nil, err
	}
	getHatcheryInfo(ctx, store, &dbJob)
	run, err := dbJob.WorkflowNodeRunJob()
	if err != nil {
		return nil, err
	}
	return &run, nil
}
// LoadAndLockNodeJobRunSkipLocked load for update a NodeJobRun given its ID.
// SKIP LOCKED makes an already-locked row look like "no rows", which is
// surfaced to the caller as sdk.ErrLocked instead of blocking.
func LoadAndLockNodeJobRunSkipLocked(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
	_, end := telemetry.Span(ctx, "workflow.LoadAndLockNodeJobRunSkipLocked")
	defer end()

	query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1 for update SKIP LOCKED`
	var dbJob JobRun
	err := db.SelectOne(&dbJob, query, id)
	if err == sql.ErrNoRows {
		return nil, sdk.WithStack(sdk.ErrLocked)
	}
	if err != nil {
		return nil, err
	}
	getHatcheryInfo(ctx, store, &dbJob)
	run, convErr := dbJob.WorkflowNodeRunJob()
	if convErr != nil {
		return nil, convErr
	}
	return &run, nil
}
// insertWorkflowNodeJobRun persists j as a new workflow_node_run_job row and
// copies the database-generated ID back onto j.
func insertWorkflowNodeJobRun(db gorp.SqlExecutor, j *sdk.WorkflowNodeJobRun) error {
	var dbj JobRun
	if err := dbj.ToJobRun(j); err != nil {
		return err
	}
	if err := db.Insert(&dbj); err != nil {
		return err
	}
	j.ID = dbj.ID
	return nil
}
// DeleteNodeJobRuns deletes all workflow_node_run_job for a given workflow_node_run.
func DeleteNodeJobRuns(db gorp.SqlExecutor, nodeID int64) error {
	if _, err := db.Exec(`delete from workflow_node_run_job where workflow_node_run_id = $1`, nodeID); err != nil {
		return err
	}
	return nil
}
// DeleteNodeJobRun deletes the given workflow_node_run_job by its ID.
func DeleteNodeJobRun(db gorp.SqlExecutor, nodeRunJob int64) error {
	if _, err := db.Exec(`delete from workflow_node_run_job where id = $1`, nodeRunJob); err != nil {
		return err
	}
	return nil
}
// UpdateNodeJobRun updates a workflow_node_run_job row from the given
// sdk.WorkflowNodeJobRun.
func UpdateNodeJobRun(ctx context.Context, db gorp.SqlExecutor, j *sdk.WorkflowNodeJobRun) error {
	_, end := telemetry.Span(ctx, "workflow.UpdateNodeJobRun")
	defer end()

	var dbj JobRun
	if err := dbj.ToJobRun(j); err != nil {
		return err
	}
	_, err := db.Update(&dbj)
	return err
}
// keyBookJob builds the cache key under which the hatchery booking a job is
// stored ("book:job:<id>" in the cache's key format).
func keyBookJob(id int64) string {
	jobID := strconv.FormatInt(id, 10)
	return cache.Key("book", "job", jobID)
}
// getHatcheryInfo looks up the hatchery that booked job j in the cache and,
// when found, fills j.BookedBy with its identity. Cache errors are logged
// and otherwise ignored: booking info is best-effort decoration.
func getHatcheryInfo(ctx context.Context, store cache.Store, j *JobRun) {
	var hatchery sdk.Service
	cacheKey := keyBookJob(j.ID)
	found, err := store.Get(cacheKey, &hatchery)
	if err != nil {
		log.Error(ctx, "cannot get from cache %s: %v", cacheKey, err)
	}
	if !found {
		return
	}
	j.BookedBy = sdk.BookedBy{
		ID:   hatchery.ID,
		Name: hatchery.Name,
	}
}
// replaceWorkflowJobRunInQueue restart workflow node job: the job is put back
// to Waiting with its retry counter bumped and detached from its worker, and
// the worker previously attached to it is disabled.
func replaceWorkflowJobRunInQueue(db gorp.SqlExecutor, wNodeJob sdk.WorkflowNodeJobRun) error {
	const requeueJob = "UPDATE workflow_node_run_job SET status = $1, retry = $2, worker_id = NULL WHERE id = $3"
	if _, err := db.Exec(requeueJob, sdk.StatusWaiting, wNodeJob.Retry+1, wNodeJob.ID); err != nil {
		return sdk.WrapError(err, "Unable to set workflow_node_run_job id %d with status %s", wNodeJob.ID, sdk.StatusWaiting)
	}

	const releaseWorker = "UPDATE worker SET status = $2, job_run_id = NULL where job_run_id = $1"
	if _, err := db.Exec(releaseWorker, wNodeJob.ID, sdk.StatusDisabled); err != nil {
		return sdk.WrapError(err, "Unable to set workers")
	}
	return nil
}