Skip to content

Commit c43caa6

Browse files
authored
Merge pull request #3 from sks/feature/serverMap
[Fix] Using sync.Map for concurrency in server ID storage
2 parents 4d20cea + 13afbb0 commit c43caa6

File tree

2 files changed

+32
-9
lines changed

2 files changed

+32
-9
lines changed

pkg/sse/options.go

+20
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,23 @@ type KeepAliveOption struct {
99
func (o KeepAliveOption) apply(t *sseTransport) {
1010
t.keepAliveInterval = o.Interval
1111
}
12+
13+
type PortOption struct {
14+
Port int
15+
}
16+
17+
func (o PortOption) apply(t *sseTransport) {
18+
t.port = o.Port
19+
}
20+
21+
func WithPort(port int) SSETransportOption {
22+
return PortOption{
23+
Port: port,
24+
}
25+
}
26+
27+
func WithKeepAliveInterval(interval time.Duration) SSETransportOption {
28+
return KeepAliveOption{
29+
Interval: interval,
30+
}
31+
}

pkg/sse/sse_transport.go

+12-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package sse
33
import (
44
"context"
55
"io"
6+
"net"
67
"net/http"
8+
"strconv"
9+
"sync"
710
"time"
811

912
foxyevent "github.com/strowk/foxy-contexts/pkg/foxy_event"
@@ -41,8 +44,8 @@ func NewTransport(options ...SSETransportOption) server.Transport {
4144
type sseTransport struct {
4245
keepAliveInterval time.Duration
4346
e *echo.Echo
44-
45-
sessionManager *session.SessionManager
47+
port int
48+
sessionManager *session.SessionManager
4649
}
4750

4851
func newResponseEvent(res jsonrpc2.JsonRpcResponse) (*Event, error) {
@@ -67,14 +70,14 @@ func (s *sseTransport) Run(
6770

6871
// e.Use(middleware.Logger())
6972

70-
servers := map[uuid.UUID]server.Server{}
73+
servers := sync.Map{}
7174

7275
postEndpoint := "/message"
7376

7477
e.GET("/sse", func(c echo.Context) error {
7578
sessionId := uuid.New()
7679
srv := server.NewServer(capabilities, serverInfo, options...)
77-
servers[sessionId] = srv
80+
servers.Store(sessionId, srv)
7881
w := c.Response()
7982
w.Header().Set("Content-Type", "text/event-stream")
8083
w.Header().Set("Cache-Control", "no-cache")
@@ -103,10 +106,10 @@ func (s *sseTransport) Run(
103106
// Protocol does not seem to have a way to notify client about server initiated shutdown
104107
// so we would just close the connection to allow server to shutdown and client to reconnect
105108
// to, hopefully, a new server instance started by orchestrator
106-
delete(servers, sessionId)
109+
servers.Delete(sessionId)
107110
return nil
108111
case <-c.Request().Context().Done():
109-
delete(servers, sessionId)
112+
servers.Delete(sessionId)
110113
srv.GetLogger().LogEvent(foxyevent.SSEClientDisconnected{ClientIP: c.RealIP()})
111114
return nil
112115
case res := <-srv.GetResponses():
@@ -141,7 +144,7 @@ func (s *sseTransport) Run(
141144
return c.String(http.StatusBadRequest, "sessionId is not a valid UUID")
142145
}
143146

144-
r, ok := servers[parsedSessionId]
147+
r, ok := servers.Load(parsedSessionId)
145148
if !ok {
146149
return c.String(http.StatusNotFound, "session not found")
147150
}
@@ -156,11 +159,11 @@ func (s *sseTransport) Run(
156159
return c.String(http.StatusNotFound, "failed to resolve session")
157160
}
158161

159-
r.Handle(ctx, b)
162+
r.(server.Server).Handle(ctx, b)
160163
return c.JSON(http.StatusAccepted, "Accepted")
161164
})
162165

163-
return e.Start("127.0.0.1:1323")
166+
return e.Start(net.JoinHostPort("127.0.0.1", strconv.Itoa(s.port)))
164167
}
165168

166169
func (s *sseTransport) Shutdown(ctx context.Context) error {

0 commit comments

Comments
 (0)