Skip to content

Commit 1a69860

Browse files
Add websocket reader
This change adds a simple WebSocket reader that can be used with the GARM logging and events endpoints. Signed-off-by: Gabriel Adrian Samfira <[email protected]>
1 parent 186120d commit 1a69860

File tree

4 files changed

+231
-6
lines changed

4 files changed

+231
-6
lines changed

go.mod

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ go 1.20
55
require (
66
github.com/google/uuid v1.3.0
77
github.com/gorilla/handlers v1.5.1
8+
github.com/gorilla/websocket v1.5.4-0.20240702125206-a62d9d2a8413
89
github.com/mattn/go-isatty v0.0.19
910
github.com/minio/sio v0.3.1
1011
github.com/pkg/errors v0.9.1
1112
github.com/stretchr/testify v1.8.4
1213
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569
13-
golang.org/x/crypto v0.12.0
14-
golang.org/x/sys v0.11.0
14+
golang.org/x/crypto v0.24.0
15+
golang.org/x/sys v0.21.0
1516
gopkg.in/natefinch/lumberjack.v2 v2.2.1
1617
gopkg.in/yaml.v3 v3.0.1
1718
)
@@ -20,4 +21,5 @@ require (
2021
github.com/davecgh/go-spew v1.1.1 // indirect
2122
github.com/felixge/httpsnoop v1.0.1 // indirect
2223
github.com/pmezard/go-difflib v1.0.0 // indirect
24+
golang.org/x/net v0.26.0 // indirect
2325
)

go.sum

+8-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
66
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
77
github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4=
88
github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q=
9+
github.com/gorilla/websocket v1.5.4-0.20240702125206-a62d9d2a8413 h1:0Zn/h+BUQg6QHkybGvjFD7BnIbjjz3oWUObacn//1Go=
10+
github.com/gorilla/websocket v1.5.4-0.20240702125206-a62d9d2a8413/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
911
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
1012
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
1113
github.com/minio/sio v0.3.1 h1:d59r5RTHb1OsQaSl1EaTWurzMMDRLA5fgNmjzD4eVu4=
@@ -18,11 +20,13 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
1820
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
1921
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569 h1:xzABM9let0HLLqFypcxvLmlvEciCHL7+Lv+4vwZqecI=
2022
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569/go.mod h1:2Ly+NIftZN4de9zRmENdYbvPQeaVIYKWpLFStLFEBgI=
21-
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
22-
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
23+
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
24+
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
25+
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
26+
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
2327
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
24-
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
25-
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
28+
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
29+
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
2630
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2731
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2832
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=

util/websocket/reader.go

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package websocket
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log/slog"
8+
"net/http"
9+
"net/url"
10+
"sync"
11+
"time"
12+
13+
"github.com/gorilla/websocket"
14+
)
15+
16+
const (
17+
// Time allowed to write a message to the peer.
18+
writeWait = 10 * time.Second
19+
20+
// Time allowed to read the next pong message from the peer.
21+
pongWait = 60 * time.Second
22+
23+
// Send pings to peer with this period. Must be less than pongWait.
24+
pingPeriod = (pongWait * 9) / 10
25+
26+
// Maximum message size allowed from peer.
27+
maxMessageSize = 16384 // 16 KB
28+
)
29+
30+
// MessageHandler is a function that processes a message received from a websocket connection.
31+
type MessageHandler func(msgType int, msg []byte) error
32+
33+
type APIErrorResponse struct {
34+
Error string `json:"error"`
35+
Details string `json:"details"`
36+
}
37+
38+
// NewReader creates a new websocket reader. The reader will pass on any message it receives to the
39+
// handler function. The handler function should return an error if it fails to process the message.
40+
func NewReader(ctx context.Context, baseURL, pth, token string, handler MessageHandler) (*Reader, error) {
41+
parsedURL, err := url.Parse(baseURL)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
wsScheme := "ws"
47+
if parsedURL.Scheme == "https" {
48+
wsScheme = "wss"
49+
}
50+
u := url.URL{Scheme: wsScheme, Host: parsedURL.Host, Path: pth}
51+
header := http.Header{}
52+
header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
53+
54+
return &Reader{
55+
ctx: ctx,
56+
url: u,
57+
header: header,
58+
handler: handler,
59+
done: make(chan struct{}),
60+
}, nil
61+
}
62+
63+
type Reader struct {
64+
ctx context.Context
65+
url url.URL
66+
header http.Header
67+
68+
done chan struct{}
69+
running bool
70+
71+
handler MessageHandler
72+
73+
conn *websocket.Conn
74+
mux sync.Mutex
75+
writeMux sync.Mutex
76+
}
77+
78+
func (w *Reader) Stop() {
79+
w.mux.Lock()
80+
defer w.mux.Unlock()
81+
if !w.running {
82+
return
83+
}
84+
w.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
85+
w.conn.Close()
86+
close(w.done)
87+
w.running = false
88+
}
89+
90+
func (w *Reader) Done() <-chan struct{} {
91+
return w.done
92+
}
93+
94+
func (w *Reader) WriteMessage(messageType int, data []byte) error {
95+
// The websocket package does not support concurrent writes and panics if it
96+
// detects that one has occurred, so we need to lock the writeMux to prevent
97+
// concurrent writes to the same connection.
98+
w.writeMux.Lock()
99+
defer w.writeMux.Unlock()
100+
if !w.running {
101+
return fmt.Errorf("websocket is not running")
102+
}
103+
if err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
104+
return err
105+
}
106+
return w.conn.WriteMessage(messageType, data)
107+
}
108+
109+
func (w *Reader) Start() error {
110+
w.mux.Lock()
111+
defer w.mux.Unlock()
112+
if w.running {
113+
return nil
114+
}
115+
116+
c, response, err := websocket.DefaultDialer.Dial(w.url.String(), w.header)
117+
if err != nil {
118+
var resp APIErrorResponse
119+
var msg string
120+
var status string
121+
if response != nil {
122+
if response.Body != nil {
123+
if err := json.NewDecoder(response.Body).Decode(&resp); err == nil {
124+
msg = resp.Details
125+
}
126+
}
127+
status = response.Status
128+
}
129+
return fmt.Errorf("failed to stream logs: %q %s (%s)", err, msg, status)
130+
}
131+
w.conn = c
132+
w.running = true
133+
go w.loop()
134+
go w.handlerReader()
135+
return nil
136+
}
137+
138+
func (w *Reader) handlerReader() {
139+
defer w.Stop()
140+
w.writeMux.Lock()
141+
w.conn.SetReadLimit(maxMessageSize)
142+
w.conn.SetReadDeadline(time.Now().Add(pongWait))
143+
w.conn.SetPongHandler(func(string) error { w.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
144+
w.writeMux.Unlock()
145+
for {
146+
msgType, message, err := w.conn.ReadMessage()
147+
if err != nil {
148+
if IsErrorOfInterest(err) {
149+
slog.With(slog.Any("error", err)).Error("reading log message")
150+
}
151+
return
152+
}
153+
if w.handler != nil {
154+
if err := w.handler(msgType, message); err != nil {
155+
slog.With(slog.Any("error", err)).Error("handling log message")
156+
}
157+
}
158+
}
159+
}
160+
161+
func (w *Reader) loop() {
162+
defer w.Stop()
163+
ticker := time.NewTicker(pingPeriod)
164+
defer ticker.Stop()
165+
for {
166+
select {
167+
case <-w.ctx.Done():
168+
return
169+
case <-w.Done():
170+
return
171+
case <-ticker.C:
172+
w.writeMux.Lock()
173+
w.conn.SetWriteDeadline(time.Now().Add(writeWait))
174+
err := w.conn.WriteMessage(websocket.PingMessage, nil)
175+
if err != nil {
176+
w.writeMux.Unlock()
177+
return
178+
}
179+
w.writeMux.Unlock()
180+
}
181+
}
182+
}

util/websocket/util.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package websocket
2+
3+
import (
4+
"errors"
5+
"net"
6+
7+
"github.com/gorilla/websocket"
8+
)
9+
10+
func IsErrorOfInterest(err error) bool {
11+
if err == nil {
12+
return false
13+
}
14+
15+
if errors.Is(err, websocket.ErrCloseSent) {
16+
return false
17+
}
18+
19+
if errors.Is(err, websocket.ErrBadHandshake) {
20+
return false
21+
}
22+
23+
if errors.Is(err, net.ErrClosed) {
24+
return false
25+
}
26+
27+
asCloseErr, ok := err.(*websocket.CloseError)
28+
if ok {
29+
switch asCloseErr.Code {
30+
case websocket.CloseNormalClosure, websocket.CloseGoingAway,
31+
websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure:
32+
return false
33+
}
34+
}
35+
36+
return true
37+
}

0 commit comments

Comments
 (0)