Skip to content

Commit 7fc8290

Browse files
committed
Implemented md5 checksum check if set pipeline jobs functionality did not work
1 parent 3313495 commit 7fc8290

File tree

4 files changed

+105
-11
lines changed

4 files changed

+105
-11
lines changed

gaia.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ type User struct {
2929

3030
// Pipeline represents a single pipeline
3131
type Pipeline struct {
32-
Name string `json:"name"`
33-
Repo GitRepo `json:"repo"`
34-
Type PipelineType `json:"type"`
35-
ExecPath string `json:"execpath"`
36-
Jobs []Job `json:"jobs"`
37-
Created time.Time `json:"created"`
32+
Name string `json:"name"`
33+
Repo GitRepo `json:"repo"`
34+
Type PipelineType `json:"type"`
35+
ExecPath string `json:"execpath"`
36+
Md5Checksum []byte `json:"md5checksum"`
37+
Jobs []Job `json:"jobs"`
38+
Created time.Time `json:"created"`
3839
}
3940

4041
// GitRepo represents a single git repository

pipeline/pipeline.go

+34
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,40 @@ func (ap *ActivePipelines) Append(p gaia.Pipeline) {
104104
ap.Pipelines = append(ap.Pipelines, p)
105105
}
106106

107+
// Get looks up the pipeline with the given name.
108+
func (ap *ActivePipelines) Get(n string) *gaia.Pipeline {
109+
for pipeline := range ap.Iter() {
110+
if pipeline.Name == n {
111+
return &pipeline
112+
}
113+
}
114+
return nil
115+
}
116+
117+
// Replace takes the given pipeline and replaces it in the ActivePipelines
118+
// slice. Return true when success otherwise false.
119+
func (ap *ActivePipelines) Replace(p gaia.Pipeline) bool {
120+
ap.Lock()
121+
defer ap.Unlock()
122+
123+
// Search for the id
124+
var i = -1
125+
for id, pipeline := range ap.Pipelines {
126+
if pipeline.Name == p.Name {
127+
i = id
128+
}
129+
}
130+
131+
// We got it?
132+
if i != -1 {
133+
return false
134+
}
135+
136+
// Yes
137+
ap.Pipelines[i] = p
138+
return true
139+
}
140+
107141
// Iter iterates over the pipelines in the concurrent slice.
108142
func (ap *ActivePipelines) Iter() <-chan gaia.Pipeline {
109143
c := make(chan gaia.Pipeline)

pipeline/scheduler.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88
// setPipelineJobs uses the plugin system to get all
99
// jobs from the given pipeline.
1010
// This function is blocking and might take some time.
11-
func setPipelineJobs(p *gaia.Pipeline) {
11+
func setPipelineJobs(p *gaia.Pipeline) error {
1212
// Create the start command for the pipeline
1313
c := createPipelineCmd(p)
1414
if c == nil {
1515
gaia.Cfg.Logger.Debug("cannot set pipeline jobs", "error", errMissingType.Error(), "pipeline", p)
16-
return
16+
return errMissingType
1717
}
1818

1919
// Create new plugin instance
@@ -22,15 +22,17 @@ func setPipelineJobs(p *gaia.Pipeline) {
2222
// Connect to plugin(pipeline)
2323
if err := pC.Connect(); err != nil {
2424
gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", p)
25-
return
25+
return err
2626
}
2727
defer pC.Close()
2828

2929
// Get jobs
3030
jobs, err := pC.GetJobs()
3131
if err != nil {
3232
gaia.Cfg.Logger.Debug("cannot get jobs from pipeline", "error", err.Error(), "pipeline", p)
33-
return
33+
return err
3434
}
3535
p.Jobs = jobs
36+
37+
return nil
3638
}

pipeline/ticker.go

+58-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package pipeline
22

33
import (
4+
"bytes"
5+
"crypto/md5"
6+
"io"
47
"io/ioutil"
58
"os"
69
"strings"
@@ -61,6 +64,27 @@ func checkActivePipelines() {
6164
// already contains it.
6265
pName := getRealPipelineName(n, pType)
6366
if GlobalActivePipelines.Contains(pName) {
67+
// If Md5Checksum is set, we should check if pipeline has been changed.
68+
p := GlobalActivePipelines.Get(pName)
69+
if p != nil && p.Md5Checksum != nil {
70+
// Get MD5 Checksum
71+
checksum, err := getMd5Checksum(gaia.Cfg.PipelinePath + string(os.PathSeparator) + file.Name())
72+
if err != nil {
73+
gaia.Cfg.Logger.Debug("cannot calculate md5 checksum for pipeline", "error", err.Error(), "pipeline", p)
74+
continue
75+
}
76+
77+
// Pipeline has been changed?
78+
if bytes.Compare(p.Md5Checksum, checksum) != 0 {
79+
// Let us try again to start the plugin and receive all implemented jobs
80+
setPipelineJobsTicker(p)
81+
82+
// Replace pipeline
83+
GlobalActivePipelines.Replace(*p)
84+
}
85+
}
86+
87+
// Its already in the list
6488
continue
6589
}
6690

@@ -73,7 +97,7 @@ func checkActivePipelines() {
7397
}
7498

7599
// Let us try to start the plugin and receive all implemented jobs
76-
setPipelineJobs(&p)
100+
setPipelineJobsTicker(&p)
77101

78102
// Append new pipeline
79103
GlobalActivePipelines.Append(p)
@@ -105,3 +129,36 @@ func getPipelineType(n string) (gaia.PipelineType, error) {
105129
func getRealPipelineName(n string, pType gaia.PipelineType) string {
106130
return strings.TrimSuffix(n, typeDelimiter+pType.String())
107131
}
132+
133+
func setPipelineJobsTicker(p *gaia.Pipeline) {
134+
err := setPipelineJobs(p)
135+
if err != nil {
136+
// We were not able to get jobs from the pipeline.
137+
// We set the Md5Checksum for later to try it again.
138+
p.Md5Checksum, err = getMd5Checksum(p.ExecPath)
139+
if err != nil {
140+
gaia.Cfg.Logger.Debug("cannot calculate md5 checksum for pipeline", "error", err.Error(), "pipeline", p)
141+
}
142+
} else {
143+
// Reset md5 checksum in case we already set it
144+
p.Md5Checksum = nil
145+
}
146+
}
147+
148+
func getMd5Checksum(file string) ([]byte, error) {
149+
// Open file
150+
f, err := os.Open(file)
151+
if err != nil {
152+
return nil, err
153+
}
154+
defer f.Close()
155+
156+
// Create md5 obj and insert bytes
157+
h := md5.New()
158+
if _, err := io.Copy(h, f); err != nil {
159+
return nil, err
160+
}
161+
162+
// return md5 checksum
163+
return h.Sum(nil), nil
164+
}

0 commit comments

Comments
 (0)