Skip to content

Commit def0e9f

Browse files
committed
WIP job scheduling process
1 parent 735a682 commit def0e9f

File tree

4 files changed

+68
-36
lines changed

4 files changed

+68
-36
lines changed

gaia.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ type PipelineType string
1414
// can have.
1515
type PipelineRunStatus string
1616

17+
// JobStatus represents the different status a job can have.
18+
type JobStatus string
19+
1720
const (
1821
// UNKNOWN plugin type
1922
UNKNOWN PipelineType = "unknown"
@@ -35,6 +38,15 @@ const (
3538

3639
// RunRunning status
3740
RunRunning PipelineRunStatus = "running"
41+
42+
// JobWaitingExec status
43+
JobWaitingExec JobStatus = "waiting for execution"
44+
45+
// JobSuccess status
46+
JobSuccess JobStatus = "success"
47+
48+
// JobFailed status
49+
JobFailed JobStatus = "failed"
3850
)
3951

4052
// User is the user object
@@ -71,11 +83,11 @@ type GitRepo struct {
7183

7284
// Job represents a single job of a pipeline
7385
type Job struct {
74-
ID string `json:"id,omitempty"`
75-
Title string `json:"title,omitempty"`
76-
Description string `json:"desc,omitempty"`
77-
Priority int32 `json:"priority"`
78-
Success bool `json:"success"`
86+
ID string `json:"id,omitempty"`
87+
Title string `json:"title,omitempty"`
88+
Description string `json:"desc,omitempty"`
89+
Priority int64 `json:"priority"`
90+
Status JobStatus `json:"status,omitempty"`
7991
}
8092

8193
// CreatePipeline represents a pipeline which is not yet

proto/plugin.pb.go

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/plugin.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ message Job {
1111
string unique_id = 1;
1212
string title = 2;
1313
string description = 3;
14-
int32 priority = 4;
14+
int64 priority = 4;
1515
map<string, string> args = 5;
1616
Question question = 6;
1717
}

scheduler/scheduler.go

+46-26
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (s *Scheduler) Init() {
7373
func (s *Scheduler) work() {
7474
// This worker never stops working.
7575
for {
76-
// Take one scheduled run
76+
// Take one scheduled run, block if there are no scheduled pipelines
7777
r := <-s.scheduledRuns
7878

7979
// Mark the scheduled run as running
@@ -175,61 +175,81 @@ func (s *Scheduler) executePipeline(p *gaia.Pipeline, r *gaia.PipelineRun) {
175175
return
176176
}
177177

178+
// Check if this pipeline has jobs declared
179+
if len(r.Jobs) == 0 {
180+
return
181+
}
182+
183+
// Schedule jobs and execute them.
184+
// Also update the run in the store.
185+
s.scheduleJobsByPriority(r)
178186
}
179187

180188
func executeJob(job *gaia.Job, wg *sync.WaitGroup) {
181189
// TODO
182190
wg.Done()
183191
}
184192

185-
func executeJobs(jobs []*gaia.Job) {
186-
// We finished all jobs, exit recursive execution.
187-
if len(jobs) == 0 {
188-
return
189-
}
190-
193+
// scheduleJobsByPriority schedules the given jobs by their respective
194+
// priority. This method is designed to be recursive and blocking.
195+
// If jobs have the same priority, they will be executed in parallel.
196+
func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun) {
191197
// Find the job with the lowest priority
192-
var lowestPrio int32
193-
for id, job := range jobs {
194-
if job.Priority < lowestPrio || id == 0 {
198+
var lowestPrio int64
199+
for _, job := range r.Jobs {
200+
if job.Priority < lowestPrio && job.Status == gaia.JobWaitingExec {
195201
lowestPrio = job.Priority
196202
}
197203
}
198204

199-
// We allocate a new slice for jobs with higher priority.
200-
// And also a slice for jobs which we execute now.
201-
var nextJobs []*gaia.Job
202-
var execJobs []*gaia.Job
203-
204205
// We might have multiple jobs with the same priority.
205206
// It means these jobs should be started in parallel.
206207
var wg sync.WaitGroup
207-
for _, job := range jobs {
208+
for _, job := range r.Jobs {
208209
if job.Priority == lowestPrio {
209210
// Increase wait group by one
210211
wg.Add(1)
211-
execJobs = append(execJobs, job)
212212

213213
// Execute this job in a separate goroutine
214-
go executeJob(job, &wg)
215-
} else {
216-
// We add this job to the next list
217-
nextJobs = append(nextJobs, job)
214+
go executeJob(&job, &wg)
218215
}
219216
}
220217

221-
// Wait until all jobs has been finished
218+
// Create channel for storing job run results and spawn results routine
219+
results := make(chan gaia.Job)
220+
go s.getJobResultsAndStore(results, r)
221+
222+
// Wait until all jobs has been finished and close results channel
222223
wg.Wait()
224+
close(results)
223225

224226
// Check if a job has been failed. If so, stop execution.
225-
for _, job := range execJobs {
226-
if !job.Success {
227+
// We also check if all jobs has been executed.
228+
var notExecJob bool
229+
for _, job := range r.Jobs {
230+
switch job.Status {
231+
case gaia.JobFailed:
227232
return
233+
case gaia.JobWaitingExec:
234+
notExecJob = true
228235
}
229236
}
230237

231-
// Run executeJobs again until all jobs have been executed
232-
executeJobs(nextJobs)
238+
// All jobs has been executed
239+
if !notExecJob {
240+
return
241+
}
242+
243+
// Run scheduleJobsByPriority again until all jobs have been executed
244+
s.scheduleJobsByPriority(r)
245+
}
246+
247+
// getJobResultsAndStore
248+
func (s *Scheduler) getJobResultsAndStore(results chan gaia.Job, r *gaia.PipelineRun) {
249+
for _ = range results {
250+
// Store update
251+
s.storeService.PipelinePutRun(r)
252+
}
233253
}
234254

235255
// getPipelineJobs uses the plugin system to get all jobs from the given pipeline.

0 commit comments

Comments
 (0)