Skip to content

Commit 8e1adef

Browse files
Merge pull request #31 from gabriel-samfira/add-ws-reader
Add websocket reader
2 parents 186120d + 97563d4 commit 8e1adef

File tree

534 files changed

+249
-231094
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

534 files changed

+249
-231094
lines changed

go.mod

+10-8
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
11
module github.com/cloudbase/garm-provider-common
22

3-
go 1.20
3+
go 1.22
44

55
require (
6-
github.com/google/uuid v1.3.0
7-
github.com/gorilla/handlers v1.5.1
8-
github.com/mattn/go-isatty v0.0.19
9-
github.com/minio/sio v0.3.1
6+
github.com/google/uuid v1.6.0
7+
github.com/gorilla/handlers v1.5.2
8+
github.com/gorilla/websocket v1.5.4-0.20240702125206-a62d9d2a8413
9+
github.com/mattn/go-isatty v0.0.20
10+
github.com/minio/sio v0.4.0
1011
github.com/pkg/errors v0.9.1
1112
github.com/stretchr/testify v1.8.4
1213
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569
13-
golang.org/x/crypto v0.12.0
14-
golang.org/x/sys v0.11.0
14+
golang.org/x/crypto v0.25.0
15+
golang.org/x/sys v0.22.0
1516
gopkg.in/natefinch/lumberjack.v2 v2.2.1
1617
gopkg.in/yaml.v3 v3.0.1
1718
)
1819

1920
require (
2021
github.com/davecgh/go-spew v1.1.1 // indirect
21-
github.com/felixge/httpsnoop v1.0.1 // indirect
22+
github.com/felixge/httpsnoop v1.0.4 // indirect
2223
github.com/pmezard/go-difflib v1.0.0 // indirect
24+
golang.org/x/net v0.27.0 // indirect
2325
)

go.sum

+18-14
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
22
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
4-
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
5-
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
6-
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
7-
github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4=
8-
github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q=
9-
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
10-
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
11-
github.com/minio/sio v0.3.1 h1:d59r5RTHb1OsQaSl1EaTWurzMMDRLA5fgNmjzD4eVu4=
12-
github.com/minio/sio v0.3.1/go.mod h1:S0ovgVgc+sTlQyhiXA1ppBLv7REM7TYi5yyq2qL/Y6o=
3+
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
4+
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
5+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
6+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
7+
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
8+
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
9+
github.com/gorilla/websocket v1.5.4-0.20240702125206-a62d9d2a8413 h1:0Zn/h+BUQg6QHkybGvjFD7BnIbjjz3oWUObacn//1Go=
10+
github.com/gorilla/websocket v1.5.4-0.20240702125206-a62d9d2a8413/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
11+
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
12+
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
13+
github.com/minio/sio v0.4.0 h1:u4SWVEm5lXSqU42ZWawV0D9I5AZ5YMmo2RXpEQ/kRhc=
14+
github.com/minio/sio v0.4.0/go.mod h1:oBSjJeGbBdRMZZwna07sX9EFzZy+ywu5aofRiV1g79I=
1315
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
1416
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
1517
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -18,11 +20,13 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
1820
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
1921
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569 h1:xzABM9let0HLLqFypcxvLmlvEciCHL7+Lv+4vwZqecI=
2022
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569/go.mod h1:2Ly+NIftZN4de9zRmENdYbvPQeaVIYKWpLFStLFEBgI=
21-
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
22-
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
23+
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
24+
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
25+
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
26+
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
2327
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
24-
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
25-
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
28+
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
29+
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
2630
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2731
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2832
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=

util/websocket/reader.go

+184
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package websocket
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log/slog"
8+
"net/http"
9+
"net/url"
10+
"sync"
11+
"time"
12+
13+
"github.com/gorilla/websocket"
14+
)
15+
16+
const (
17+
// Time allowed to write a message to the peer.
18+
writeWait = 10 * time.Second
19+
20+
// Time allowed to read the next pong message from the peer.
21+
pongWait = 60 * time.Second
22+
23+
// Send pings to peer with this period. Must be less than pongWait.
24+
pingPeriod = (pongWait * 9) / 10
25+
26+
// Maximum message size allowed from peer.
27+
maxMessageSize = 16384 // 16 KB
28+
)
29+
30+
// MessageHandler is a function that processes a message received from a websocket connection.
31+
type MessageHandler func(msgType int, msg []byte) error
32+
33+
type APIErrorResponse struct {
34+
Error string `json:"error"`
35+
Details string `json:"details"`
36+
}
37+
38+
// NewReader creates a new websocket reader. The reader will pass on any message it receives to the
39+
// handler function. The handler function should return an error if it fails to process the message.
40+
func NewReader(ctx context.Context, baseURL, pth, token string, handler MessageHandler) (*Reader, error) {
41+
parsedURL, err := url.Parse(baseURL)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
wsScheme := "ws"
47+
if parsedURL.Scheme == "https" {
48+
wsScheme = "wss"
49+
}
50+
u := url.URL{Scheme: wsScheme, Host: parsedURL.Host, Path: pth}
51+
header := http.Header{}
52+
header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
53+
54+
return &Reader{
55+
ctx: ctx,
56+
url: u,
57+
header: header,
58+
handler: handler,
59+
done: make(chan struct{}),
60+
}, nil
61+
}
62+
63+
type Reader struct {
64+
ctx context.Context
65+
url url.URL
66+
header http.Header
67+
68+
done chan struct{}
69+
running bool
70+
71+
handler MessageHandler
72+
73+
conn *websocket.Conn
74+
mux sync.Mutex
75+
writeMux sync.Mutex
76+
}
77+
78+
func (w *Reader) Stop() {
79+
w.mux.Lock()
80+
defer w.mux.Unlock()
81+
if !w.running {
82+
return
83+
}
84+
w.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
85+
w.conn.Close()
86+
close(w.done)
87+
w.running = false
88+
}
89+
90+
func (w *Reader) Done() <-chan struct{} {
91+
return w.done
92+
}
93+
94+
func (w *Reader) WriteMessage(messageType int, data []byte) error {
95+
// The websocket package does not support concurrent writes and panics if it
96+
// detects that one has occurred, so we need to lock the writeMux to prevent
97+
// concurrent writes to the same connection.
98+
w.writeMux.Lock()
99+
defer w.writeMux.Unlock()
100+
if !w.running {
101+
return fmt.Errorf("websocket is not running")
102+
}
103+
if err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
104+
return err
105+
}
106+
return w.conn.WriteMessage(messageType, data)
107+
}
108+
109+
func (w *Reader) Start() error {
110+
w.mux.Lock()
111+
defer w.mux.Unlock()
112+
if w.running {
113+
return nil
114+
}
115+
116+
c, response, err := websocket.DefaultDialer.Dial(w.url.String(), w.header)
117+
if err != nil {
118+
var resp APIErrorResponse
119+
var msg string
120+
var status string
121+
if response != nil {
122+
if response.Body != nil {
123+
if err := json.NewDecoder(response.Body).Decode(&resp); err == nil {
124+
msg = resp.Details
125+
}
126+
}
127+
status = response.Status
128+
}
129+
return fmt.Errorf("failed to stream logs: %q %s (%s)", err, msg, status)
130+
}
131+
w.conn = c
132+
w.running = true
133+
go w.loop()
134+
go w.handlerReader()
135+
return nil
136+
}
137+
138+
func (w *Reader) handlerReader() {
139+
defer w.Stop()
140+
w.writeMux.Lock()
141+
w.conn.SetReadLimit(maxMessageSize)
142+
w.conn.SetReadDeadline(time.Now().Add(pongWait))
143+
w.conn.SetPongHandler(func(string) error { w.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
144+
w.writeMux.Unlock()
145+
for {
146+
msgType, message, err := w.conn.ReadMessage()
147+
if err != nil {
148+
if IsErrorOfInterest(err) {
149+
// TODO(gabriel-samfira): we should allow for an error channel that can be used to signal
150+
// the caller that the connection has been closed.
151+
slog.With(slog.Any("error", err)).Error("reading log message")
152+
}
153+
return
154+
}
155+
if w.handler != nil {
156+
if err := w.handler(msgType, message); err != nil {
157+
slog.With(slog.Any("error", err)).Error("handling log message")
158+
}
159+
}
160+
}
161+
}
162+
163+
func (w *Reader) loop() {
164+
defer w.Stop()
165+
ticker := time.NewTicker(pingPeriod)
166+
defer ticker.Stop()
167+
for {
168+
select {
169+
case <-w.ctx.Done():
170+
return
171+
case <-w.Done():
172+
return
173+
case <-ticker.C:
174+
w.writeMux.Lock()
175+
w.conn.SetWriteDeadline(time.Now().Add(writeWait))
176+
err := w.conn.WriteMessage(websocket.PingMessage, nil)
177+
if err != nil {
178+
w.writeMux.Unlock()
179+
return
180+
}
181+
w.writeMux.Unlock()
182+
}
183+
}
184+
}

util/websocket/util.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package websocket
2+
3+
import (
4+
"errors"
5+
"net"
6+
7+
"github.com/gorilla/websocket"
8+
)
9+
10+
func IsErrorOfInterest(err error) bool {
11+
if err == nil {
12+
return false
13+
}
14+
15+
if errors.Is(err, websocket.ErrCloseSent) {
16+
return false
17+
}
18+
19+
if errors.Is(err, websocket.ErrBadHandshake) {
20+
return false
21+
}
22+
23+
if errors.Is(err, net.ErrClosed) {
24+
return false
25+
}
26+
27+
asCloseErr, ok := err.(*websocket.CloseError)
28+
if ok {
29+
switch asCloseErr.Code {
30+
case websocket.CloseNormalClosure, websocket.CloseGoingAway,
31+
websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure:
32+
return false
33+
}
34+
}
35+
36+
return true
37+
}

vendor/github.com/davecgh/go-spew/LICENSE

-15
This file was deleted.

0 commit comments

Comments
 (0)