Skip to content

Commit 2af7ac2

Browse files
authored
refactor(ui): 1 websocket for the whole pipeline (#5893)
1 parent 0ae9e8b commit 2af7ac2

20 files changed

+355
-260
lines changed

cli/cdsctl/workflow_log.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -431,14 +431,19 @@ func workflowLogStreamRun(v cli.Values) error {
431431
})
432432

433433
buf, err := json.Marshal(sdk.CDNStreamFilter{
434-
ItemType: link.ItemType,
435-
APIRef: link.APIRef,
434+
JobRunID: log.jobID,
436435
})
437436
if err != nil {
438437
return cli.WrapError(err, "unable to marshal streamFilter")
439438
}
440439
chanMessageToSend <- buf
441440

441+
type logBlock struct {
442+
Number int64 `json:"number"`
443+
Value string `json:"value"`
444+
ApiRefHash string `json:"api_ref_hash"`
445+
}
446+
442447
for {
443448
select {
444449
case <-ctx.Done():
@@ -449,6 +454,13 @@ func workflowLogStreamRun(v cli.Values) error {
449454
}
450455
fmt.Printf("Error: %s\n", err)
451456
case m := <-chanMsgReceived:
457+
var line logBlock
458+
if err := json.Unmarshal(m, &line); err != nil {
459+
return err
460+
}
461+
if line.ApiRefHash != link.APIRef {
462+
continue
463+
}
452464
fmt.Printf("%s", string(m))
453465
}
454466
}

engine/cdn/cdn_item.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func (s *Service) getItemLogValue(ctx context.Context, t sdk.CDNItemType, apiRef
291291
}
292292

293293
log.Debug(ctx, "getItemLogValue> Getting logs from cache")
294-
return it, int64(linesCount), s.LogCache.NewReader(it.ID, opts.format, opts.from, opts.size, opts.sort), filename, nil
294+
return it, int64(linesCount), s.LogCache.NewReader(*it, opts.format, opts.from, opts.size, opts.sort), filename, nil
295295
}
296296

297297
func (s *Service) pushItemLogIntoCache(ctx context.Context, it sdk.CDNItem, unitName string) error {

engine/cdn/cdn_log_store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (s *Service) storeLogs(ctx context.Context, itemType sdk.CDNItemType, signa
8484

8585
// Send an event in WS broker to refresh streams on current item
8686
s.GoRoutines.Exec(ctx, "storeLogsPublishWSEvent", func(ctx context.Context) {
87-
s.publishWSEvent(*it)
87+
s.publishWSEvent(*iu)
8888
})
8989

9090
// If we have all lines or buffer is full and we received the last line

engine/cdn/item/dao.go

+13
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,19 @@ func LoadWorkerCacheItemsByProjectAndCacheTag(ctx context.Context, m *gorpmapper
182182
return getItems(ctx, m, db, query)
183183
}
184184

185+
// LoadByJobRunID load an item by his job id and type
186+
func LoadByJobRunID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, jobRunId int64, itemTypes []string, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItem, error) {
187+
query := gorpmapper.NewQuery(`
188+
SELECT *
189+
FROM item
190+
WHERE api_ref->>'node_run_job_id' = $1
191+
AND type = ANY($2)
192+
AND to_delete = false
193+
ORDER BY created DESC
194+
`).Args(strconv.FormatInt(jobRunId, 10), pq.StringArray(itemTypes))
195+
return getItems(ctx, m, db, query, opts...)
196+
}
197+
185198
// LoadByAPIRefHashAndType load an item by his job id, step order and type
186199
func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, hash string, itemType sdk.CDNItemType, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItem, error) {
187200
query := gorpmapper.NewQuery(`

engine/cdn/item_logs_handler.go

+68-34
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,20 @@ import (
1212
"github.com/gorilla/mux"
1313
"github.com/rockbears/log"
1414

15-
"github.com/ovh/cds/engine/cdn/item"
1615
"github.com/ovh/cds/engine/cdn/redis"
1716
"github.com/ovh/cds/engine/cdn/storage"
1817
"github.com/ovh/cds/engine/service"
1918
"github.com/ovh/cds/engine/websocket"
2019
"github.com/ovh/cds/sdk"
2120
)
2221

22+
type WSLine struct {
23+
Number int64 `json:"number"`
24+
Value string `json:"value"`
25+
Since int64 `json:"since,omitempty"`
26+
ApiRefHash string `json:"api_ref_hash"`
27+
}
28+
2329
func (s *Service) getItemLogsStreamHandler() service.Handler {
2430
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
2531
c, err := websocket.Upgrader.Upgrade(w, r, nil)
@@ -29,8 +35,8 @@ func (s *Service) getItemLogsStreamHandler() service.Handler {
2935
}
3036
defer c.Close() //nolint
3137

32-
jwt := ctx.Value(service.ContextJWT).(*jwt.Token)
33-
claims := jwt.Claims.(*sdk.AuthSessionJWTClaims)
38+
jwtToken := ctx.Value(service.ContextJWT).(*jwt.Token)
39+
claims := jwtToken.Claims.(*sdk.AuthSessionJWTClaims)
3440
sessionID := claims.StandardClaims.Id
3541

3642
wsClient := websocket.NewClient(c)
@@ -39,7 +45,23 @@ func (s *Service) getItemLogsStreamHandler() service.Handler {
3945
defer s.WSServer.RemoveClient(wsClient.UUID())
4046

4147
wsClient.OnMessage(func(m []byte) {
42-
if err := wsClientData.UpdateFilter(m); err != nil {
48+
var filter sdk.CDNStreamFilter
49+
if err := sdk.JSONUnmarshal(m, &filter); err != nil {
50+
ctx = sdk.ContextWithStacktrace(ctx, err)
51+
log.Warn(ctx, err.Error())
52+
return
53+
}
54+
55+
// Load last running step
56+
var iuID string
57+
if filter.JobRunID > 0 {
58+
iu, _ := storage.LoadLastItemUnitByJobUnitType(ctx, s.Mapper, s.mustDBWithCtx(ctx), s.Units.LogsBuffer().ID(), filter.JobRunID, sdk.CDNTypeItemStepLog)
59+
if iu != nil {
60+
iuID = iu.ID
61+
}
62+
}
63+
64+
if err := wsClientData.UpdateFilter(filter, iuID); err != nil {
4365
ctx = sdk.ContextWithStacktrace(ctx, err)
4466
log.Warn(ctx, err.Error())
4567
return
@@ -92,38 +114,45 @@ func (s *Service) sendLogsToWSClient(ctx context.Context, wsClient websocket.Cli
92114
return nil
93115
}
94116

95-
if wsClientData.itemUnit == nil {
96-
it, err := item.LoadByAPIRefHashAndType(ctx, s.Mapper, s.mustDBWithCtx(ctx), wsClientData.itemFilter.APIRef, wsClientData.itemFilter.ItemType)
97-
if err != nil {
98-
// Catch not found error as the item can be created after the client stream subscription
99-
if sdk.ErrorIs(err, sdk.ErrNotFound) {
100-
log.Debug(ctx, "sendLogsToWSClient> can't found item with type %s and ref %s for client %s: %+v", wsClientData.itemFilter.ItemType, wsClientData.itemFilter.APIRef, wsClient.UUID(), err)
101-
return nil
117+
if wsClientData.itemUnitsData == nil {
118+
wsClientData.itemUnitsData = make(map[string]ItemUnitClientData)
119+
}
120+
121+
for k := range wsClientData.itemUnitsData {
122+
if wsClientData.itemUnitsData[k].itemUnit == nil {
123+
iu, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), k)
124+
if err != nil {
125+
return err
102126
}
103-
return nil
104-
}
105127

106-
if err := s.itemAccessCheck(ctx, *it); err != nil {
107-
var projectKey, workflow string
108-
logRef, has := it.GetCDNLogApiRef()
109-
if has {
110-
projectKey = logRef.ProjectKey
111-
workflow = logRef.WorkflowName
128+
if err := s.itemAccessCheck(ctx, *iu.Item); err != nil {
129+
var projectKey, workflow string
130+
logRef, has := iu.Item.GetCDNLogApiRef()
131+
if has {
132+
projectKey = logRef.ProjectKey
133+
workflow = logRef.WorkflowName
134+
}
135+
return sdk.WrapError(err, "client %s can't access logs for workflow %s/%s", wsClient.UUID(), projectKey, workflow)
136+
}
137+
wsClientData.itemUnitsData[k] = ItemUnitClientData{
138+
itemUnit: iu,
139+
scoreNextLineToSend: wsClientData.itemUnitsData[k].scoreNextLineToSend,
112140
}
113-
return sdk.WrapError(err, "client %s can't access logs for workflow %s/%s", wsClient.UUID(), projectKey, workflow)
114141
}
115142

116-
iu, err := storage.LoadItemUnitByUnit(ctx, s.Mapper, s.mustDBWithCtx(ctx), s.Units.LogsBuffer().ID(), it.ID)
117-
if err != nil {
143+
if err := s.sendStepLog(ctx, wsClient, wsClientData, k); err != nil {
118144
return err
119145
}
120146

121-
wsClientData.itemUnit = iu
122147
}
148+
return nil
149+
}
123150

124-
log.Debug(ctx, "getItemLogsStreamHandler> send log to client %s from %d", wsClient.UUID(), wsClientData.scoreNextLineToSend)
151+
func (s *Service) sendStepLog(ctx context.Context, wsClient websocket.Client, wsClientData *websocketClientData, mapIndex string) error {
152+
data := wsClientData.itemUnitsData[mapIndex]
125153

126-
rc, err := s.Units.LogsBuffer().NewAdvancedReader(ctx, *wsClientData.itemUnit, sdk.CDNReaderFormatJSON, wsClientData.scoreNextLineToSend, 100, 0)
154+
log.Debug(ctx, "getItemLogsStreamHandler> send log to client %s from %d", wsClient.UUID(), data.scoreNextLineToSend)
155+
rc, err := s.Units.LogsBuffer().NewAdvancedReader(ctx, *data.itemUnit, sdk.CDNReaderFormatJSON, data.scoreNextLineToSend, 100, 0)
127156
if err != nil {
128157
return err
129158
}
@@ -138,26 +167,31 @@ func (s *Service) sendLogsToWSClient(ctx context.Context, wsClient websocket.Cli
138167
}
139168

140169
log.Debug(ctx, "getItemLogsStreamHandler> iterate over %d lines to send for client %s", len(lines), wsClient.UUID())
141-
oldNextLineToSend := wsClientData.scoreNextLineToSend
170+
oldNextLineToSend := data.scoreNextLineToSend
142171
for i := range lines {
143-
if wsClientData.scoreNextLineToSend > 0 && wsClientData.scoreNextLineToSend != lines[i].Number {
172+
if data.scoreNextLineToSend > 0 && data.scoreNextLineToSend != lines[i].Number {
144173
break
145174
}
146-
if err := wsClient.Send(lines[i]); err != nil {
175+
176+
if err := wsClient.Send(WSLine{
177+
Number: lines[i].Number,
178+
Value: lines[i].Value,
179+
Since: lines[i].Since,
180+
ApiRefHash: data.itemUnit.Item.APIRefHash,
181+
}); err != nil {
147182
return err
148183
}
149-
if wsClientData.scoreNextLineToSend < 0 {
150-
wsClientData.scoreNextLineToSend = lines[i].Number + 1
184+
if data.scoreNextLineToSend < 0 {
185+
data.scoreNextLineToSend = lines[i].Number + 1
151186
} else {
152-
wsClientData.scoreNextLineToSend++
187+
data.scoreNextLineToSend++
153188
}
154189
}
155-
190+
wsClientData.itemUnitsData[mapIndex] = data
156191
// If all the lines were sent, we can trigger another update, if only one line was send do not trigger an update wait for next event from broker
157-
if len(lines) > 1 && (oldNextLineToSend > 0 || int(wsClientData.scoreNextLineToSend-oldNextLineToSend) == len(lines)) {
192+
if len(lines) > 1 && (oldNextLineToSend > 0 || int(data.scoreNextLineToSend-oldNextLineToSend) == len(lines)) {
158193
wsClientData.TriggerUpdate()
159194
}
160-
161195
return nil
162196
}
163197

engine/cdn/item_logs_handler_test.go

+15-30
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,15 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
292292
require.NoError(t, s.initWebsocket())
293293
ts := httptest.NewServer(s.Router.Mux)
294294

295+
_, err := db.Exec("DELETE FROM storage_unit_item")
296+
require.NoError(t, err)
297+
_, err = db.Exec("DELETE FROM ITEM")
298+
require.NoError(t, err)
299+
295300
s.Client = cdsclient.New(cdsclient.Config{Host: "http://lolcat.api", InsecureSkipVerifyTLS: false})
296301
gock.InterceptClient(s.Client.(cdsclient.Raw).HTTPClient())
297302
t.Cleanup(gock.Off)
298-
gock.New("http://lolcat.api").Get("/project/" + projectKey + "/workflows/1/type/step-log/access").Reply(http.StatusOK).JSON(nil)
303+
gock.New("http://lolcat.api").Get("/project/" + projectKey + "/workflows/1/type/step-log/access").Times(1).Reply(http.StatusOK).JSON(nil)
299304

300305
ctx, cancel := context.WithCancel(context.TODO())
301306
t.Cleanup(cancel)
@@ -309,7 +314,7 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
309314
NodeRunID: 1,
310315
NodeRunName: "MyPipeline",
311316
JobName: "MyJob",
312-
JobID: 1,
317+
JobID: 123456789,
313318
Worker: &cdn.SignatureWorker{
314319
StepName: "script1",
315320
StepOrder: 1,
@@ -332,22 +337,6 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
332337
jwtTokenRaw, err := signer.SignJWT(jwtToken)
333338
require.NoError(t, err)
334339

335-
apiRef := sdk.CDNLogAPIRef{
336-
ProjectKey: signature.ProjectKey,
337-
WorkflowName: signature.WorkflowName,
338-
WorkflowID: signature.WorkflowID,
339-
RunID: signature.RunID,
340-
NodeRunName: signature.NodeRunName,
341-
NodeRunID: signature.NodeRunID,
342-
NodeRunJobName: signature.JobName,
343-
NodeRunJobID: signature.JobID,
344-
StepName: signature.Worker.StepName,
345-
StepOrder: signature.Worker.StepOrder,
346-
}
347-
apiRefHashU, err := hashstructure.Hash(apiRef, nil)
348-
require.NoError(t, err)
349-
apiRefHash := strconv.FormatUint(apiRefHashU, 10)
350-
351340
var messageCounter int64
352341
sendMessage := func() {
353342
hm := handledMessage{
@@ -385,9 +374,7 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
385374
chanErrorReceived <- client.RequestWebsocket(ctx, sdk.NewGoRoutines(ctx), uri, chanMsgToSend, chanMsgReceived, chanErrorReceived)
386375
}()
387376
buf, err := json.Marshal(sdk.CDNStreamFilter{
388-
ItemType: sdk.CDNTypeItemStepLog,
389-
APIRef: apiRefHash,
390-
Offset: 0,
377+
JobRunID: signature.JobID,
391378
})
392379
require.NoError(t, err)
393380
chanMsgToSend <- buf
@@ -443,15 +430,13 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
443430
chanErrorReceived <- client.RequestWebsocket(ctx, sdk.NewGoRoutines(ctx), uri, chanMsgToSend, chanMsgReceived, chanErrorReceived)
444431
}()
445432
buf, err = json.Marshal(sdk.CDNStreamFilter{
446-
ItemType: sdk.CDNTypeItemStepLog,
447-
APIRef: apiRefHash,
448-
Offset: 15,
433+
JobRunID: signature.JobID,
449434
})
450435
require.NoError(t, err)
451436
chanMsgToSend <- buf
452437

453438
lines = make([]redis.Line, 0)
454-
for ctx.Err() == nil && len(lines) < 5 {
439+
for ctx.Err() == nil && len(lines) < 10 {
455440
select {
456441
case <-ctx.Done():
457442
break
@@ -465,9 +450,9 @@ func TestGetItemLogsStreamHandler(t *testing.T) {
465450
}
466451
}
467452

468-
require.Len(t, lines, 5)
469-
require.Equal(t, "message 15\n", lines[0].Value)
470-
require.Equal(t, int64(15), lines[0].Number)
471-
require.Equal(t, "message 19\n", lines[4].Value)
472-
require.Equal(t, int64(19), lines[4].Number)
453+
require.Len(t, lines, 10)
454+
require.Equal(t, "message 10\n", lines[0].Value)
455+
require.Equal(t, int64(10), lines[0].Number)
456+
require.Equal(t, "message 19\n", lines[9].Value)
457+
require.Equal(t, int64(19), lines[9].Number)
473458
}

engine/cdn/lru/redis.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,16 @@ func (r *Redis) NewWriter(itemID string) io.WriteCloser {
132132
}
133133

134134
// NewReader instanciates a new reader
135-
func (r *Redis) NewReader(itemID string, format sdk.CDNReaderFormat, from int64, size uint, sort int64) io.ReadCloser {
135+
func (r *Redis) NewReader(item sdk.CDNItem, format sdk.CDNReaderFormat, from int64, size uint, sort int64) io.ReadCloser {
136136
return &redis.Reader{
137-
Store: r.store,
138-
ItemID: itemID,
139-
PrefixKey: redisLruItemCacheKey,
140-
Size: size,
141-
From: from,
142-
Format: format,
143-
Sort: sort,
137+
Store: r.store,
138+
ItemID: item.ID,
139+
ApiRefHash: item.APIRefHash,
140+
PrefixKey: redisLruItemCacheKey,
141+
Size: size,
142+
From: from,
143+
Format: format,
144+
Sort: sort,
144145
}
145146
}
146147

engine/cdn/lru/redis_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,13 @@ func TestRedisLRU(t *testing.T) {
6666
require.Equal(t, int64(45), size)
6767

6868
// Get first item
69-
reader1 := r.NewReader(item1.ID, sdk.CDNReaderFormatText, 0, 2, 0)
69+
reader1 := r.NewReader(item1, sdk.CDNReaderFormatText, 0, 2, 0)
7070
buf1 := new(strings.Builder)
7171
_, err = io.Copy(buf1, reader1)
7272
require.NoError(t, reader1.Close())
7373
require.NoError(t, err)
7474
require.Equal(t, "this is the first line\nthis is the second line\n", buf1.String())
75-
reader2 := r.NewReader(item1.ID, sdk.CDNReaderFormatText, 0, 0, 0)
75+
reader2 := r.NewReader(item1, sdk.CDNReaderFormatText, 0, 0, 0)
7676
buf2 := new(strings.Builder)
7777
_, err = io.Copy(buf2, reader2)
7878
require.NoError(t, reader2.Close())
@@ -83,15 +83,15 @@ func TestRedisLRU(t *testing.T) {
8383
writer2 := r.NewWriter(item2.ID)
8484
_, err = io.Copy(writer2, strings.NewReader("this is the first line\nthis is the second line"))
8585
require.NoError(t, err)
86-
reader3 := r.NewReader(item2.ID, sdk.CDNReaderFormatText, 0, 0, 0)
86+
reader3 := r.NewReader(item2, sdk.CDNReaderFormatText, 0, 0, 0)
8787
buf3 := new(strings.Builder)
8888
_, err = io.Copy(buf3, reader3)
8989
require.NoError(t, reader3.Close())
9090
require.NoError(t, err)
9191
require.Equal(t, "this is the first line\n", buf3.String())
9292
// close the writer should add the last line
9393
require.NoError(t, writer2.Close())
94-
reader4 := r.NewReader(item2.ID, sdk.CDNReaderFormatText, 0, 0, 0)
94+
reader4 := r.NewReader(item2, sdk.CDNReaderFormatText, 0, 0, 0)
9595
buf4 := new(strings.Builder)
9696
_, err = io.Copy(buf4, reader4)
9797
require.NoError(t, reader4.Close())

0 commit comments

Comments
 (0)