Skip to content

Commit 9e99da6

Browse files
authored
Merge pull request #39 from michelvocks/one_pipeline_per_run
one pipeline process per pipeline run
2 parents 22375b5 + f7b139e commit 9e99da6

File tree

8 files changed

+354
-277
lines changed

8 files changed

+354
-277
lines changed

cmd/gaia/main.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/gaia-pipeline/gaia"
1414
"github.com/gaia-pipeline/gaia/handlers"
1515
"github.com/gaia-pipeline/gaia/pipeline"
16+
"github.com/gaia-pipeline/gaia/plugin"
1617
scheduler "github.com/gaia-pipeline/gaia/scheduler"
1718
"github.com/gaia-pipeline/gaia/store"
1819
hclog "github.com/hashicorp/go-hclog"
@@ -132,8 +133,11 @@ func main() {
132133
os.Exit(1)
133134
}
134135

136+
// Create new plugin system
137+
pS := &plugin.Plugin{}
138+
135139
// Initialize scheduler
136-
scheduler := scheduler.NewScheduler(store)
140+
scheduler := scheduler.NewScheduler(store, pS)
137141
err = scheduler.Init()
138142
if err != nil {
139143
gaia.Cfg.Logger.Error("cannot initialize scheduler:", "error", err.Error())

frontend/client/views/pipeline/detail.vue

+1-12
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ export default {
9090
}
9191
],
9292
runsRows: [],
93-
job: null,
9493
pipelineViewOptions: {
9594
physics: { stabilization: true },
9695
layout: {
@@ -333,11 +332,6 @@ export default {
333332
// Create vis network
334333
// We have to move out the instance out of vue because of https://github.com/almende/vis/issues/2567
335334
window.pipelineView = new Vis.Network(container, data, this.pipelineViewOptions)
336-
337-
// Create an selectNode event
338-
window.pipelineView.on('selectNode', function (params) {
339-
this.job = this.nodes.get(params.nodes[0])
340-
}.bind(this))
341335
}
342336
},
343337
@@ -358,13 +352,8 @@ export default {
358352
},
359353
360354
jobLog () {
361-
var jobid = null
362-
if (this.job) {
363-
jobid = this.job.internalID
364-
}
365-
366355
// Route
367-
this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID, jobid: jobid }})
356+
this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID }})
368357
},
369358
370359
startPipeline (pipelineid) {

frontend/client/views/pipeline/log.vue

+16-35
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ export default {
2222
logText: '',
2323
jobRunning: true,
2424
runID: null,
25-
pipelineID: null,
26-
jobID: null,
27-
currentPath: ''
25+
pipelineID: null
2826
}
2927
},
3028
@@ -36,64 +34,47 @@ export default {
3634
this.fetchData()
3735
3836
// periodically update dashboard
39-
this.intervalID = setInterval(function () {
37+
var intervalID = setInterval(function () {
4038
this.fetchData()
4139
}.bind(this), 3000)
42-
this.currentPath = this.$route.path
40+
41+
// Append interval id to store
42+
this.$store.commit('appendInterval', intervalID)
4343
},
4444
4545
watch: {
4646
'$route': 'fetchData'
4747
},
4848
49+
destroyed () {
50+
this.$store.commit('clearIntervals')
51+
},
52+
4953
components: {
5054
Message
5155
},
5256
5357
methods: {
5458
fetchData () {
55-
if (this.$route.path !== this.currentPath) {
56-
this.$store.commit('clearIntervals')
57-
}
58-
5959
// look up required url parameters
6060
this.pipelineID = this.$route.query.pipelineid
6161
this.runID = this.$route.query.runid
6262
if (!this.runID || !this.pipelineID) {
6363
return
6464
}
6565
66-
// job id is optional. If ommitted, all logs from all jobs
67-
// are displayed.
68-
this.jobID = this.$route.query.jobid
69-
7066
this.$http
71-
.get('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/log', {
72-
showProgressBar: false,
73-
params: {
74-
jobid: this.jobID
75-
}
76-
})
67+
.get('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/log', { showProgressBar: false })
7768
.then(response => {
7869
if (response.data) {
79-
// Check if we got multiple objects
80-
var finished = true
81-
this.logText = ''
82-
for (let i = 0, l = response.data.length; i < l; i++) {
83-
// We add the received log
84-
this.logText += response.data[i].log
85-
86-
// LF does not work for HTML. Replace with <br />
87-
this.logText = this.logText.replace(/\n/g, '<br />')
88-
89-
// Job not finished?
90-
if (!response.data[i].finished) {
91-
finished = false
92-
}
93-
}
70+
// We add the received log
71+
this.logText = response.data.log
72+
73+
// LF does not work for HTML. Replace with <br />
74+
this.logText = this.logText.replace(/\n/g, '<br />')
9475
9576
// All jobs finished. Stop interval.
96-
if (finished && response.data.length > 0) {
77+
if (response.data.finished) {
9778
this.jobRunning = false
9879
clearInterval(this.intervalID)
9980
}

gaia.go

+3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ const (
6666

6767
// LogsFolderName represents the Name of the logs folder in pipeline run folder
6868
LogsFolderName = "logs"
69+
70+
// LogsFileName represents the file name of the logs output
71+
LogsFileName = "output.log"
6972
)
7073

7174
// User is the user object

handlers/pipeline_run.go

+14-81
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"net/http"
66
"os"
77
"path/filepath"
8-
"sort"
98
"strconv"
109

1110
"github.com/gaia-pipeline/gaia"
@@ -84,20 +83,15 @@ func PipelineGetLatestRun(c echo.Context) error {
8483
return c.JSON(http.StatusOK, run)
8584
}
8685

87-
// GetJobLogs returns jobs for a given job.
88-
// If no jobID is given, a collection of all jobs logs will be returned.
86+
// GetJobLogs returns logs from a pipeline run.
8987
//
9088
// Required parameters:
9189
// pipelineid - Related pipeline id
9290
// pipelinerunid - Related pipeline run id
93-
//
94-
// Optional parameters:
95-
// jobid - Job id
9691
func GetJobLogs(c echo.Context) error {
9792
// Get parameters and validate
9893
pipelineID := c.Param("pipelineid")
9994
pipelineRunID := c.Param("runid")
100-
jobID := c.QueryParam("jobid")
10195

10296
// Transform pipelineid to int
10397
p, err := strconv.Atoi(pipelineID)
@@ -111,92 +105,31 @@ func GetJobLogs(c echo.Context) error {
111105
return c.String(http.StatusBadRequest, "invalid pipeline run id given")
112106
}
113107

114-
// Get pipeline run from store
115108
run, err := storeService.PipelineGetRunByPipelineIDAndID(p, r)
116109
if err != nil {
117110
return c.String(http.StatusBadRequest, "cannot find pipeline run with given pipeline id and pipeline run id")
118111
}
119112

120-
// jobID is not empty, just return the logs from this job
121-
if jobID != "" {
122-
for _, job := range run.Jobs {
123-
if strconv.FormatUint(uint64(job.ID), 10) == jobID {
124-
// Get logs
125-
jL, err := getLogs(pipelineID, pipelineRunID, jobID, false)
126-
if err != nil {
127-
return c.String(http.StatusBadRequest, err.Error())
128-
}
129-
130-
// Check if job is finished
131-
if job.Status == gaia.JobSuccess || job.Status == gaia.JobFailed {
132-
jL.Finished = true
133-
}
134-
135-
// We always return an array.
136-
// It makes a bit easier in the frontend.
137-
jobLogsList := []jobLogs{}
138-
jobLogsList = append(jobLogsList, *jL)
139-
return c.JSON(http.StatusOK, jobLogsList)
140-
}
141-
}
113+
// Create return object
114+
jL := jobLogs{}
142115

143-
// Logs for given job id not found
144-
return c.String(http.StatusBadRequest, "cannot find job with given job id")
116+
// Determine if job has been finished
117+
if run.Status == gaia.RunFailed || run.Status == gaia.RunSuccess {
118+
jL.Finished = true
145119
}
146120

147-
// Sort the slice. This is important for the order of the returned logs.
148-
sort.Slice(run.Jobs, func(i, j int) bool {
149-
return run.Jobs[i].Priority < run.Jobs[j].Priority
150-
})
151-
152-
// Return a collection of all logs
153-
jobs := []jobLogs{}
154-
for _, job := range run.Jobs {
155-
// Get logs
156-
jL, err := getLogs(pipelineID, pipelineRunID, strconv.FormatUint(uint64(job.ID), 10), true)
121+
// Check if log file exists
122+
logFilePath := filepath.Join(gaia.Cfg.WorkspacePath, pipelineID, pipelineRunID, gaia.LogsFolderName, gaia.LogsFileName)
123+
if _, err := os.Stat(logFilePath); err == nil {
124+
content, err := ioutil.ReadFile(logFilePath)
157125
if err != nil {
158-
return c.String(http.StatusBadRequest, err.Error())
159-
}
160-
161-
// No error but also no job logs. Job must be in the queue.
162-
// We skip it so no error will break things.
163-
if jL == nil {
164-
continue
165-
}
166-
167-
// Check if job is finished
168-
if job.Status == gaia.JobSuccess || job.Status == gaia.JobFailed {
169-
jL.Finished = true
126+
return c.String(http.StatusInternalServerError, "cannot read pipeline run log file")
170127
}
171128

172-
jobs = append(jobs, *jL)
129+
// Convert logs
130+
jL.Log = string(content)
173131
}
174132

175133
// Return logs
176-
return c.JSON(http.StatusOK, jobs)
177-
}
178-
179-
func getLogs(pipelineID, pipelineRunID, jobID string, getAllJobLogs bool) (*jobLogs, error) {
180-
// Lookup log file
181-
logFilePath := filepath.Join(gaia.Cfg.WorkspacePath, pipelineID, pipelineRunID, gaia.LogsFolderName, jobID)
182-
183-
// We only check if logs exist when a specific job log was requested.
184-
// If we don't do this, get all job logs will fail during a pipeline run.
185-
if _, err := os.Stat(logFilePath); os.IsNotExist(err) {
186-
if !getAllJobLogs {
187-
return nil, err
188-
}
189-
return nil, nil
190-
}
191-
192-
// Read file
193-
content, err := ioutil.ReadFile(logFilePath)
194-
if err != nil {
195-
return nil, err
196-
}
197-
198-
// Create return struct
199-
return &jobLogs{
200-
Log: string(content),
201-
}, nil
134+
return c.JSON(http.StatusOK, jL)
202135
}

plugin/plugin.go

+18-14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os/exec"
99

1010
"github.com/gaia-pipeline/gaia"
11+
"github.com/gaia-pipeline/gaia/scheduler"
1112
"github.com/gaia-pipeline/protobuf"
1213
plugin "github.com/hashicorp/go-plugin"
1314
)
@@ -45,23 +46,29 @@ type Plugin struct {
4546

4647
// NewPlugin creates a new instance of Plugin.
4748
// One Plugin instance represents one connection to a plugin.
48-
//
49-
// It expects the start command to start the plugin and the log path (including file)
50-
// where the output should be logged to.
51-
func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) {
52-
// Allocate
53-
p = &Plugin{}
49+
func (p *Plugin) NewPlugin() scheduler.Plugin {
50+
return &Plugin{}
51+
}
5452

53+
// Connect prepares the log path, starts the plugin, initiates the
54+
// gRPC connection and looks up the plugin.
55+
// It's up to the caller to call plugin.Close to shutdown the plugin
56+
// and close the gRPC connection.
57+
//
58+
// It expects the start command for the plugin and the path where
59+
// the log file should be stored.
60+
func (p *Plugin) Connect(command *exec.Cmd, logPath *string) error {
5561
// Create log file and open it.
5662
// We will close this file in the close method.
5763
if logPath != nil {
64+
var err error
5865
p.logFile, err = os.OpenFile(
5966
*logPath,
6067
os.O_CREATE|os.O_WRONLY,
6168
0666,
6269
)
6370
if err != nil {
64-
return nil, err
71+
return err
6572
}
6673
}
6774

@@ -77,13 +84,6 @@ func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) {
7784
Stderr: p.writer,
7885
})
7986

80-
return p, nil
81-
}
82-
83-
// Connect starts the plugin, initiates the gRPC connection and looks up the plugin.
84-
// It's up to the caller to call plugin.Close to shutdown the plugin
85-
// and close the gRPC connection.
86-
func (p *Plugin) Connect() error {
8787
// Connect via gRPC
8888
gRPCClient, err := p.client.Client()
8989
if err != nil {
@@ -116,6 +116,10 @@ func (p *Plugin) Execute(j *gaia.Job) error {
116116

117117
// Execute the job
118118
_, err := p.pluginConn.ExecuteJob(job)
119+
120+
// Flush logs
121+
p.writer.Flush()
122+
119123
return err
120124
}
121125

0 commit comments

Comments
 (0)