@@ -4,44 +4,41 @@ import (
4
4
"bytes"
5
5
"context"
6
6
"fmt"
7
-
8
7
"github.com/teivah/onecontext"
9
8
"nhooyr.io/websocket"
9
+ "time"
10
10
)
11
11
12
12
type webSocketConnection struct {
13
13
ConnectionBase
14
14
conn * websocket.Conn
15
15
transferMode TransferMode
16
+ watchDogChan chan dogFood
16
17
}
17
18
18
19
func newWebSocketConnection (parentContext context.Context , requestContext context.Context , connectionID string , conn * websocket.Conn ) * webSocketConnection {
19
20
ctx , _ := onecontext .Merge (parentContext , requestContext )
20
21
w := & webSocketConnection {
21
- conn : conn ,
22
+ conn : conn ,
23
+ watchDogChan : make (chan dogFood , 1 ),
22
24
ConnectionBase : ConnectionBase {
23
25
ctx : ctx ,
24
26
connectionID : connectionID ,
25
27
},
26
28
}
29
+ go w .watchDog (ctx )
27
30
return w
28
31
}
29
32
30
33
func (w * webSocketConnection ) Write (p []byte ) (n int , err error ) {
31
34
if err := w .Context ().Err (); err != nil {
32
35
return 0 , fmt .Errorf ("webSocketConnection canceled: %w" , w .ctx .Err ())
33
36
}
34
- ctx := w .ctx
35
- if w .timeout > 0 {
36
- var cancel context.CancelFunc
37
- ctx , cancel = context .WithTimeout (w .ctx , w .Timeout ())
38
- defer cancel () // has no effect because timeoutCtx is either done or not used anymore after websocket returns. But it keeps lint quiet
39
- }
40
37
messageType := websocket .MessageText
41
38
if w .transferMode == BinaryTransferMode {
42
39
messageType = websocket .MessageBinary
43
40
}
44
- err = w .conn .Write (ctx , messageType , p )
41
+ err = w .conn .Write (w . resetWatchDog () , messageType , p )
45
42
if err != nil {
46
43
return 0 , err
47
44
}
@@ -52,19 +49,70 @@ func (w *webSocketConnection) Read(p []byte) (n int, err error) {
52
49
if err := w .Context ().Err (); err != nil {
53
50
return 0 , fmt .Errorf ("webSocketConnection canceled: %w" , w .ctx .Err ())
54
51
}
55
- ctx := w .ctx
56
- if w .timeout > 0 {
57
- var cancel context.CancelFunc
58
- ctx , cancel = context .WithTimeout (w .ctx , w .Timeout ())
59
- defer cancel () // has no effect because timeoutCtx is either done or not used anymore after websocket returns. But it keeps lint quiet
60
- }
61
- _ , data , err := w .conn .Read (ctx )
52
+ _ , data , err := w .conn .Read (w .resetWatchDog ())
62
53
if err != nil {
63
54
return 0 , err
64
55
}
65
56
return bytes .NewReader (data ).Read (p )
66
57
}
67
58
59
+ // resetWatchDog resets the common watchDog for Read and Write.
60
+ // the watchDog will stop waiting for the last set timeout and wait for the new timeout.
61
+ func (w * webSocketConnection ) resetWatchDog () context.Context {
62
+ ctx := w .ctx
63
+ food := dogFood {timeout : w .timeout }
64
+ if w .timeout > 0 {
65
+ ctx , food .bark = context .WithCancel (w .ctx )
66
+ }
67
+ w .watchDogChan <- food
68
+ return ctx
69
+ }
70
+
71
+ // dogFood is used to reset the watchDog
72
+ type dogFood struct {
73
+ // After this, the dog will bark
74
+ timeout time.Duration
75
+ bark context.CancelFunc
76
+ }
77
+
78
+ // watchDog is the common watchDog for Read and Write. It stops the connection (aka closes the Websocket)
79
+ // when the last timeout has elapsed. If resetWatchDog is called before the last timeout has elapsed,
80
+ // the watchDog will restart waiting for the new timeout. If timeout is set to 0, it will not wait at all.
81
+ func (w * webSocketConnection ) watchDog (ctx context.Context ) {
82
+ var timer * time.Timer
83
+ var cancelTimeoutChan chan struct {}
84
+ for {
85
+ select {
86
+ case <- ctx .Done ():
87
+ return
88
+ case food := <- w .watchDogChan :
89
+ if timer != nil {
90
+ if ! timer .Stop () {
91
+ go func () {
92
+ <- timer .C
93
+ }()
94
+ }
95
+ go func () {
96
+ cancelTimeoutChan <- struct {}{}
97
+ }()
98
+ }
99
+ if food .timeout != 0 {
100
+ timer = time .NewTimer (food .timeout )
101
+ cancelTimeoutChan = make (chan struct {}, 1 )
102
+ go func () {
103
+ select {
104
+ case <- cancelTimeoutChan :
105
+ case <- timer .C :
106
+ food .bark ()
107
+ }
108
+ }()
109
+ } else {
110
+ timer = nil
111
+ }
112
+ }
113
+ }
114
+ }
115
+
68
116
func (w * webSocketConnection ) TransferMode () TransferMode {
69
117
return w .transferMode
70
118
}
0 commit comments