Skip to content

Commit 4b71176

Browse files
authored
fix(Srv/stream): add ID field to PingRequest (#353)
* fix(Srv/stream): add ID field to PingRequest * fix: make session requestID unique and incremental
1 parent 7f2ea88 commit 4b71176

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

server/streamable_http.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,9 @@ func WithLogger(logger util.Logger) StreamableHTTPOption {
115115
// - Batching of requests/notifications/responses in arrays.
116116
// - Stream Resumability
117117
type StreamableHTTPServer struct {
118-
server *MCPServer
119-
sessionTools *sessionToolsStore
118+
server *MCPServer
119+
sessionTools *sessionToolsStore
120+
sessionRequestIDs sync.Map // sessionId --> last requestID(*atomic.Int64)
120121

121122
httpServer *http.Server
122123
mu sync.RWMutex
@@ -407,15 +408,16 @@ func (s *StreamableHTTPServer) handleGet(w http.ResponseWriter, r *http.Request)
407408
go func() {
408409
ticker := time.NewTicker(s.listenHeartbeatInterval)
409410
defer ticker.Stop()
410-
message := mcp.JSONRPCRequest{
411-
JSONRPC: "2.0",
412-
Request: mcp.Request{
413-
Method: "ping",
414-
},
415-
}
416411
for {
417412
select {
418413
case <-ticker.C:
414+
message := mcp.JSONRPCRequest{
415+
JSONRPC: "2.0",
416+
ID: mcp.NewRequestId(s.nextRequestID(sessionID)),
417+
Request: mcp.Request{
418+
Method: "ping",
419+
},
420+
}
419421
select {
420422
case writeChan <- message:
421423
case <-done:
@@ -465,6 +467,9 @@ func (s *StreamableHTTPServer) handleDelete(w http.ResponseWriter, r *http.Reque
465467
// remove the session relateddata from the sessionToolsStore
466468
s.sessionTools.set(sessionID, nil)
467469

470+
// remove current session's requstID information
471+
s.sessionRequestIDs.Delete(sessionID)
472+
468473
w.WriteHeader(http.StatusOK)
469474
}
470475

@@ -496,6 +501,13 @@ func (s *StreamableHTTPServer) writeJSONRPCError(
496501
}
497502
}
498503

504+
// nextRequestID gets the next incrementing requestID for the current session
505+
func (s *StreamableHTTPServer) nextRequestID(sessionID string) int64 {
506+
actual, _ := s.sessionRequestIDs.LoadOrStore(sessionID, new(atomic.Int64))
507+
counter := actual.(*atomic.Int64)
508+
return counter.Add(1)
509+
}
510+
499511
// --- session ---
500512

501513
type sessionToolsStore struct {

0 commit comments

Comments
 (0)