Skip to content
This repository was archived by the owner on Sep 29, 2024. It is now read-only.

Commit 9002ea7

Browse files
authored
Attempt at a client (#611)
* Attempt at client * adding readme and example
1 parent 59c0983 commit 9002ea7

File tree

5 files changed

+427
-0
lines changed

5 files changed

+427
-0
lines changed

_examples/README.md

+27
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,33 @@ func main() {
5858
}
5959
```
6060

61+
## How to use the client
62+
63+
This is still very beta code and should not be relied upon.
64+
65+
```go
66+
package main
67+
68+
import (
69+
...
70+
socketio "github.com/googollee/go-socket.io"
71+
)
72+
73+
func main() {
74+
uri := "http://127.0.0.1:8000"
75+
76+
client, _ := socketio.NewClient(uri, nil)
77+
78+
// Handle an incoming event
79+
client.OnEvent("reply", func(s socketio.Conn, msg string) {
80+
log.Println("Receive Message /reply: ", "reply", msg)
81+
})
82+
83+
client.Connect()
84+
client.Emit("notice", "hello")
85+
client.Close()
86+
}
87+
```
6188

6289
## How to use Redis broadcast adapter
6390
```go

_examples/client/main.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"time"
6+
7+
socketio "github.com/googollee/go-socket.io"
8+
)
9+
10+
func main() {
11+
// Simple client to talk to default-http example
12+
uri := "http://127.0.0.1:8000"
13+
14+
client, err := socketio.NewClient(uri, nil)
15+
if err != nil {
16+
panic(err)
17+
}
18+
19+
// Handle an incoming event
20+
client.OnEvent("reply", func(s socketio.Conn, msg string) {
21+
log.Println("Receive Message /reply: ", "reply", msg)
22+
})
23+
24+
err = client.Connect()
25+
if err != nil {
26+
panic(err)
27+
}
28+
29+
client.Emit("notice", "hello")
30+
31+
time.Sleep(1 * time.Second)
32+
err = client.Close()
33+
if err != nil {
34+
panic(err)
35+
}
36+
}

_examples/default-http/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func main() {
4040
})
4141

4242
server.OnEvent("/chat", "msg", func(s socketio.Conn, msg string) string {
43+
log.Println("chat:", msg)
4344
s.SetContext(msg)
4445
return "recv " + msg
4546
})

client.go

+295
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
package socketio
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"net/url"
7+
"path"
8+
"strings"
9+
10+
"github.com/googollee/go-socket.io/engineio"
11+
"github.com/googollee/go-socket.io/engineio/transport"
12+
"github.com/googollee/go-socket.io/engineio/transport/polling"
13+
"github.com/googollee/go-socket.io/logger"
14+
"github.com/googollee/go-socket.io/parser"
15+
)
16+
17+
// Server is a go-socket.io server.
18+
type Client struct {
19+
conn *conn
20+
namespace string
21+
handlers *namespaceHandlers
22+
url string
23+
opts *engineio.Options
24+
}
25+
26+
// NewServer returns a server.
27+
func NewClient(uri string, opts *engineio.Options) (*Client, error) {
28+
// uri like http://asd.com:8080/namesapce
29+
30+
url, err := url.Parse(uri)
31+
if err != nil {
32+
return nil, err
33+
}
34+
namespace := url.Path
35+
36+
// Not allowing other than default
37+
url.Path = path.Join("/socket.io", namespace)
38+
url.Path = url.EscapedPath()
39+
if strings.HasSuffix(url.Path, "socket.io") {
40+
url.Path += "/"
41+
}
42+
43+
client := &Client{
44+
conn: nil,
45+
namespace: namespace,
46+
url: url.String(),
47+
handlers: newNamespaceHandlers(),
48+
opts: opts,
49+
}
50+
51+
fmt.Println(client)
52+
53+
return client, nil
54+
}
55+
56+
func (s *Client) Connect() error {
57+
dialer := engineio.Dialer{
58+
Transports: []transport.Transport{polling.Default},
59+
}
60+
enginioCon, err := dialer.Dial(s.url, nil)
61+
if err != nil {
62+
return err
63+
}
64+
65+
// Set the engine connection
66+
c := newConn(enginioCon, s.handlers)
67+
68+
s.conn = c
69+
70+
if err := c.connectClient(); err != nil {
71+
_ = c.Close()
72+
if root, ok := s.handlers.Get(rootNamespace); ok && root.onError != nil {
73+
root.onError(nil, err)
74+
}
75+
76+
return err
77+
}
78+
79+
go s.clientError(c)
80+
go s.clientWrite(c)
81+
go s.clientRead(c)
82+
return nil
83+
}
84+
85+
// Close closes server.
86+
func (s *Client) Close() error {
87+
return s.conn.Close()
88+
}
89+
90+
func (s *Client) Emit(event string, args ...interface{}) {
91+
nsp := s.namespace
92+
if nsp == aliasRootNamespace {
93+
nsp = rootNamespace
94+
}
95+
96+
ns, ok := s.conn.namespaces.Get(nsp)
97+
if !ok {
98+
logger.Info("Connection Namespace not initialized")
99+
return
100+
}
101+
ns.Emit(event, args...)
102+
}
103+
104+
// OnConnect set a handler function f to handle open event for namespace.
105+
func (s *Client) OnConnect(f func(Conn) error) {
106+
h := s.getNamespace(s.namespace)
107+
if h == nil {
108+
h = s.createNamespace(s.namespace)
109+
}
110+
111+
h.OnConnect(f)
112+
}
113+
114+
// OnDisconnect set a handler function f to handle disconnect event for namespace.
115+
func (s *Client) OnDisconnect(f func(Conn, string)) {
116+
h := s.getNamespace(s.namespace)
117+
if h == nil {
118+
h = s.createNamespace(s.namespace)
119+
}
120+
121+
h.OnDisconnect(f)
122+
}
123+
124+
// OnError set a handler function f to handle error for namespace.
125+
func (s *Client) OnError(f func(Conn, error)) {
126+
h := s.getNamespace(s.namespace)
127+
if h == nil {
128+
h = s.createNamespace(s.namespace)
129+
}
130+
131+
h.OnError(f)
132+
}
133+
134+
// OnEvent set a handler function f to handle event for namespace.
135+
func (s *Client) OnEvent(event string, f interface{}) {
136+
h := s.getNamespace(s.namespace)
137+
if h == nil {
138+
h = s.createNamespace(s.namespace)
139+
}
140+
141+
h.OnEvent(event, f)
142+
}
143+
144+
/////////////////////////
145+
// Private Functions
146+
/////////////////////////
147+
148+
func (s *Client) clientError(c *conn) {
149+
defer func() {
150+
if err := c.Close(); err != nil {
151+
logger.Error("close connect:", err)
152+
}
153+
154+
}()
155+
156+
for {
157+
select {
158+
case <-c.quitChan:
159+
return
160+
case err := <-c.errorChan:
161+
logger.Error("clientError", err)
162+
163+
var errMsg *errorMessage
164+
if !errors.As(err, &errMsg) {
165+
continue
166+
}
167+
168+
if handler := c.namespace(errMsg.namespace); handler != nil {
169+
if handler.onError != nil {
170+
nsConn, ok := c.namespaces.Get(errMsg.namespace)
171+
if !ok {
172+
continue
173+
}
174+
handler.onError(nsConn, errMsg.err)
175+
}
176+
}
177+
}
178+
}
179+
}
180+
181+
func (s *Client) clientWrite(c *conn) {
182+
defer func() {
183+
if err := c.Close(); err != nil {
184+
logger.Error("close connect:", err)
185+
}
186+
187+
}()
188+
189+
for {
190+
select {
191+
case <-c.quitChan:
192+
logger.Info("clientWrite Writer loop has stopped")
193+
return
194+
case pkg := <-c.writeChan:
195+
if err := c.encoder.Encode(pkg.Header, pkg.Data); err != nil {
196+
c.onError(pkg.Header.Namespace, err)
197+
}
198+
}
199+
}
200+
}
201+
202+
func (s *Client) clientRead(c *conn) {
203+
defer func() {
204+
if err := c.Close(); err != nil {
205+
logger.Error("close connect:", err)
206+
}
207+
}()
208+
209+
var event string
210+
211+
for {
212+
var header parser.Header
213+
214+
if err := c.decoder.DecodeHeader(&header, &event); err != nil {
215+
c.onError(rootNamespace, err)
216+
logger.Error("clientRead Error in Decoder", err)
217+
return
218+
}
219+
220+
if header.Namespace == aliasRootNamespace {
221+
header.Namespace = rootNamespace
222+
}
223+
224+
var err error
225+
switch header.Type {
226+
case parser.Ack:
227+
err = ackPacketHandler(c, header)
228+
case parser.Connect:
229+
err = clientConnectPacketHandler(c, header)
230+
case parser.Disconnect:
231+
err = clientDisconnectPacketHandler(c, header)
232+
case parser.Event:
233+
err = eventPacketHandler(c, event, header)
234+
}
235+
236+
if err != nil {
237+
logger.Error("client read:", err)
238+
return
239+
}
240+
}
241+
}
242+
243+
func (s *Client) createNamespace(nsp string) *namespaceHandler {
244+
if nsp == aliasRootNamespace {
245+
nsp = rootNamespace
246+
}
247+
248+
handler := newNamespaceHandler(nsp, nil)
249+
s.handlers.Set(nsp, handler)
250+
251+
return handler
252+
}
253+
254+
func (s *Client) getNamespace(nsp string) *namespaceHandler {
255+
if nsp == aliasRootNamespace {
256+
nsp = rootNamespace
257+
}
258+
259+
ret, ok := s.handlers.Get(nsp)
260+
if !ok {
261+
return nil
262+
}
263+
264+
return ret
265+
}
266+
267+
////
268+
// Handlers
269+
////
270+
271+
func (c *conn) connectClient() error {
272+
rootHandler, ok := c.handlers.Get(rootNamespace)
273+
if !ok {
274+
return errUnavailableRootHandler
275+
}
276+
277+
root := newNamespaceConn(c, aliasRootNamespace, rootHandler.broadcast)
278+
c.namespaces.Set(rootNamespace, root)
279+
280+
root.Join(root.Conn.ID())
281+
282+
c.namespaces.Range(func(ns string, nc *namespaceConn) {
283+
nc.SetContext(c.Conn.Context())
284+
})
285+
286+
header := parser.Header{
287+
Type: parser.Connect,
288+
}
289+
290+
if err := c.encoder.Encode(header); err != nil {
291+
return err
292+
}
293+
294+
return nil
295+
}

0 commit comments

Comments
 (0)