From 640ae5a863938b849176caf02c914d3661f3847f Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 8 Apr 2025 09:43:22 +0100 Subject: [PATCH 1/8] initial client implementation --- src/client/streamableHttp.test.ts | 194 +++++++++++++++++ src/client/streamableHttp.ts | 351 ++++++++++++++++++++++++++++++ 2 files changed, 545 insertions(+) create mode 100644 src/client/streamableHttp.test.ts create mode 100644 src/client/streamableHttp.ts diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts new file mode 100644 index 00000000..e4ba7b1d --- /dev/null +++ b/src/client/streamableHttp.test.ts @@ -0,0 +1,194 @@ +import { StreamableHTTPClientTransport } from "./streamableHttp.js"; +import { JSONRPCMessage } from "../types.js"; + + +describe("StreamableHTTPClientTransport", () => { + let transport: StreamableHTTPClientTransport; + + beforeEach(() => { + transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp")); + jest.spyOn(global, "fetch"); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + it("should send JSON-RPC messages via POST", async () => { + const message: JSONRPCMessage = { + jsonrpc: "2.0", + method: "test", + params: {}, + id: "test-id" + }; + + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 202, + headers: new Headers(), + }); + + await transport.send(message); + + expect(global.fetch).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + method: "POST", + headers: expect.any(Headers), + body: JSON.stringify(message) + }) + ); + }); + + it("should send batch messages", async () => { + const messages: JSONRPCMessage[] = [ + { jsonrpc: "2.0", method: "test1", params: {}, id: "id1" }, + { jsonrpc: "2.0", method: "test2", params: {}, id: "id2" } + ]; + + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "content-type": "text/event-stream" }), + body: null + }); + + await transport.send(messages); + + expect(global.fetch).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + method: "POST", + headers: expect.any(Headers), + body: JSON.stringify(messages) + }) + ); + }); + + it("should store session ID received during initialization", async () => { + const message: JSONRPCMessage = { + jsonrpc: "2.0", + method: "initialize", + params: { + clientInfo: { name: "test-client", version: "1.0" }, + protocolVersion: "2025-03-26" + }, + id: "init-id" + }; + + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "mcp-session-id": "test-session-id" }), + }); + + await transport.send(message); + + // Send a second message that should include the session ID + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 202, + headers: new Headers() + }); + + await transport.send({ jsonrpc: "2.0", method: "test", params: {} } as JSONRPCMessage); + + // Check that second request included session ID header + const calls = (global.fetch as jest.Mock).mock.calls; + const lastCall = calls[calls.length - 1]; + expect(lastCall[1].headers).toBeDefined(); + expect(lastCall[1].headers.get("mcp-session-id")).toBe("test-session-id"); + }); + + it("should handle 404 response when session expires", async () => { + const message: JSONRPCMessage = { + jsonrpc: "2.0", + method: "test", + params: {}, + id: "test-id" + }; + + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: false, + status: 404, + statusText: "Not Found", + text: () => Promise.resolve("Session not found"), + headers: new Headers() + }); + + const errorSpy = jest.fn(); + transport.onerror = errorSpy; + + await expect(transport.send(message)).rejects.toThrow("Error POSTing to endpoint (HTTP 404)"); + expect(errorSpy).toHaveBeenCalled(); + }); + + it("should handle session termination via DELETE request", async () => { + // First set the session ID by mocking initialization + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "mcp-session-id": "session-to-terminate" }), + }); + + await transport.send({ + jsonrpc: "2.0", + method: "initialize", + params: { + clientInfo: { name: "test-client", version: "1.0" }, + protocolVersion: "2025-03-26" + }, + id: "init-id" + } as JSONRPCMessage); + + // Mock DELETE request for session termination + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers() + }); + + const closeSpy = jest.fn(); + transport.onclose = closeSpy; + + await transport.close(); + + // Check that DELETE request was sent + const calls = (global.fetch as jest.Mock).mock.calls; + const lastCall = calls[calls.length - 1]; + expect(lastCall[1].method).toBe("DELETE"); + // The headers may be a plain object in tests + expect(lastCall[1].headers["mcp-session-id"]).toBe("session-to-terminate"); + + expect(closeSpy).toHaveBeenCalled(); + }); + + it("should handle non-streaming JSON response", async () => { + const message: JSONRPCMessage = { + jsonrpc: "2.0", + method: "test", + params: {}, + id: "test-id" + }; + + const responseMessage: JSONRPCMessage = { + jsonrpc: "2.0", + result: { success: true }, + id: "test-id" + }; + + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "content-type": "application/json" }), + json: () => Promise.resolve(responseMessage) + }); + + const messageSpy = jest.fn(); + transport.onmessage = messageSpy; + + await transport.send(message); + + expect(messageSpy).toHaveBeenCalledWith(responseMessage); + }); +}); \ No newline at end of file diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts new file mode 100644 index 00000000..ffe1b5b4 --- /dev/null +++ b/src/client/streamableHttp.ts @@ -0,0 +1,351 @@ +import { Transport } from "../shared/transport.js"; +import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; +import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; +import { EventSource, type ErrorEvent, type EventSourceInit } from "eventsource"; + +export class StreamableHTTPError extends Error { + constructor( + public readonly code: number | undefined, + message: string | undefined, + public readonly event: ErrorEvent, + ) { + super(`Streamable HTTP error: ${message}`); + } +} + +/** + * Configuration options for the `StreamableHTTPClientTransport`. + */ +export type StreamableHTTPClientTransportOptions = { + /** + * An OAuth client provider to use for authentication. + * + * When an `authProvider` is specified and the connection is started: + * 1. The connection is attempted with any existing access token from the `authProvider`. + * 2. If the access token has expired, the `authProvider` is used to refresh the token. + * 3. If token refresh fails or no access token exists, and auth is required, `OAuthClientProvider.redirectToAuthorization` is called, and an `UnauthorizedError` will be thrown from `connect`/`start`. + * + * After the user has finished authorizing via their user agent, and is redirected back to the MCP client application, call `StreamableHTTPClientTransport.finishAuth` with the authorization code before retrying the connection. + * + * If an `authProvider` is not provided, and auth is required, an `UnauthorizedError` will be thrown. + * + * `UnauthorizedError` might also be thrown when sending any message over the transport, indicating that the session has expired, and needs to be re-authed and reconnected. + */ + authProvider?: OAuthClientProvider; + + /** + * Customizes the initial SSE request to the server (the request that begins the stream). + */ + eventSourceInit?: EventSourceInit; + + /** + * Customizes recurring POST requests to the server. + */ + requestInit?: RequestInit; +}; + +/** + * Client transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification. + * It will connect to a server using HTTP POST for sending messages and HTTP GET with Server-Sent Events + * for receiving messages. + */ +export class StreamableHTTPClientTransport implements Transport { + private _eventSource?: EventSource; + private _abortController?: AbortController; + private _url: URL; + private _eventSourceInit?: EventSourceInit; + private _requestInit?: RequestInit; + private _authProvider?: OAuthClientProvider; + private _sessionId?: string; + private _lastEventId?: string; + + onclose?: () => void; + onerror?: (error: Error) => void; + onmessage?: (message: JSONRPCMessage) => void; + + constructor( + url: URL, + opts?: StreamableHTTPClientTransportOptions, + ) { + this._url = url; + this._eventSourceInit = opts?.eventSourceInit; + this._requestInit = opts?.requestInit; + this._authProvider = opts?.authProvider; + } + + private async _authThenStart(): Promise { + if (!this._authProvider) { + throw new UnauthorizedError("No auth provider"); + } + + let result: AuthResult; + try { + result = await auth(this._authProvider, { serverUrl: this._url }); + } catch (error) { + this.onerror?.(error as Error); + throw error; + } + + if (result !== "AUTHORIZED") { + throw new UnauthorizedError(); + } + + return await this._startOrAuth(); + } + + private async _commonHeaders(): Promise { + const headers: HeadersInit = {}; + if (this._authProvider) { + const tokens = await this._authProvider.tokens(); + if (tokens) { + headers["Authorization"] = `Bearer ${tokens.access_token}`; + } + } + + if (this._sessionId) { + headers["mcp-session-id"] = this._sessionId; + } + + return headers; + } + + private _startOrAuth(): Promise { + return new Promise((resolve, reject) => { + // Try to open an SSE stream with GET to listen for server messages + // This is optional according to the spec - server may not support it + + const headers: HeadersInit = { + 'Accept': 'text/event-stream' + }; + + // Include Last-Event-ID header for resumable streams + if (this._lastEventId) { + headers['last-event-id'] = this._lastEventId; + } + + fetch(this._url, { + method: 'GET', + headers + }).then(response => { + if (response.status === 405) { + // Server doesn't support GET for SSE, which is allowed by the spec + // We'll rely on SSE responses to POST requests for communication + resolve(); + return; + } + + if (!response.ok) { + if (response.status === 401 && this._authProvider) { + // Need to authenticate + this._authThenStart().then(resolve, reject); + return; + } + + const error = new Error(`Failed to open SSE stream: ${response.status} ${response.statusText}`); + reject(error); + this.onerror?.(error); + return; + } + + // Successful connection, handle the SSE stream + this._handleSseStream(response.body); + resolve(); + }).catch(error => { + reject(error); + this.onerror?.(error); + }); + }); + } + + async start() { + if (this._eventSource) { + throw new Error( + "StreamableHTTPClientTransport already started! If using Client class, note that connect() calls start() automatically.", + ); + } + + return await this._startOrAuth(); + } + + /** + * Call this method after the user has finished authorizing via their user agent and is redirected back to the MCP client application. This will exchange the authorization code for an access token, enabling the next connection attempt to successfully auth. + */ + async finishAuth(authorizationCode: string): Promise { + if (!this._authProvider) { + throw new UnauthorizedError("No auth provider"); + } + + const result = await auth(this._authProvider, { serverUrl: this._url, authorizationCode }); + if (result !== "AUTHORIZED") { + throw new UnauthorizedError("Failed to authorize"); + } + } + + async close(): Promise { + this._abortController?.abort(); + this._eventSource?.close(); + + // If we have a session ID, send a DELETE request to explicitly terminate the session + if (this._sessionId) { + try { + const commonHeaders = await this._commonHeaders(); + const response = await fetch(this._url, { + method: "DELETE", + headers: commonHeaders, + signal: this._abortController?.signal, + }); + + if (!response.ok) { + // Server might respond with 405 if it doesn't support explicit session termination + // We don't throw an error in that case + if (response.status !== 405) { + const text = await response.text().catch(() => null); + throw new Error(`Error terminating session (HTTP ${response.status}): ${text}`); + } + } + } catch (error) { + // We still want to invoke onclose even if the session termination fails + this.onerror?.(error as Error); + } + } + + this.onclose?.(); + } + + async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise { + try { + const commonHeaders = await this._commonHeaders(); + const headers = new Headers({ ...commonHeaders, ...this._requestInit?.headers }); + headers.set("content-type", "application/json"); + headers.set("accept", "application/json, text/event-stream"); + + const init = { + ...this._requestInit, + method: "POST", + headers, + body: JSON.stringify(message), + signal: this._abortController?.signal, + }; + + const response = await fetch(this._url, init); + + // Handle session ID received during initialization + const sessionId = response.headers.get("mcp-session-id"); + if (sessionId) { + this._sessionId = sessionId; + } + + if (!response.ok) { + if (response.status === 401 && this._authProvider) { + const result = await auth(this._authProvider, { serverUrl: this._url }); + if (result !== "AUTHORIZED") { + throw new UnauthorizedError(); + } + + // Purposely _not_ awaited, so we don't call onerror twice + return this.send(message); + } + + const text = await response.text().catch(() => null); + throw new Error( + `Error POSTing to endpoint (HTTP ${response.status}): ${text}`, + ); + } + + // If the response is 202 Accepted, there's no body to process + if (response.status === 202) { + return; + } + + // Determine if the message contains any requests + const containsRequests = Array.isArray(message) + ? message.some(msg => 'method' in msg && 'id' in msg) + : ('method' in message && 'id' in message); + + if (containsRequests) { + // Check if we got text/event-stream in response - if so, handle SSE stream + const contentType = response.headers.get("content-type"); + if (contentType?.includes("text/event-stream")) { + // Handle SSE stream to receive responses + this._handleSseStream(response.body); + } else if (contentType?.includes("application/json")) { + // For non-streaming servers, we might get direct JSON responses + const data = await response.json(); + const responseMessages = Array.isArray(data) + ? data.map(msg => JSONRPCMessageSchema.parse(msg)) + : [JSONRPCMessageSchema.parse(data)]; + + for (const msg of responseMessages) { + this.onmessage?.(msg); + } + } + } + } catch (error) { + this.onerror?.(error as Error); + throw error; + } + } + + private _handleSseStream(stream: ReadableStream | null): void { + if (!stream) { + return; + } + + // Set up stream handling for server-sent events + const reader = stream.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + const processStream = async () => { + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Process SSE messages in the buffer + const events = buffer.split('\n\n'); + buffer = events.pop() || ''; + + for (const event of events) { + const lines = event.split('\n'); + let id: string | undefined; + let eventType: string | undefined; + let data: string | undefined; + + // Parse SSE message according to the format + for (const line of lines) { + if (line.startsWith('id:')) { + id = line.slice(3).trim(); + } else if (line.startsWith('event:')) { + eventType = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + data = line.slice(5).trim(); + } + } + + // Update last event ID if provided by server + if (id) { + this._lastEventId = id; + } + + // Handle message event + if (eventType === 'message' && data) { + try { + const message = JSONRPCMessageSchema.parse(JSON.parse(data)); + this.onmessage?.(message); + } catch (error) { + this.onerror?.(error as Error); + } + } + } + } + } catch (error) { + this.onerror?.(error as Error); + } + }; + + processStream(); + } +} \ No newline at end of file From c3b446d4245d706cb54ebb04322a9468a1342705 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 8 Apr 2025 11:11:09 +0100 Subject: [PATCH 2/8] multiple streams and initial stream with get --- src/client/streamableHttp.test.ts | 154 +++++++++++++++++++++++++++++- src/client/streamableHttp.ts | 145 ++++++++++++++++------------ 2 files changed, 235 insertions(+), 64 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index e4ba7b1d..ef35cacc 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -10,7 +10,8 @@ describe("StreamableHTTPClientTransport", () => { jest.spyOn(global, "fetch"); }); - afterEach(() => { + afterEach(async () => { + await transport.close().catch(() => { }); jest.clearAllMocks(); }); @@ -191,4 +192,155 @@ describe("StreamableHTTPClientTransport", () => { expect(messageSpy).toHaveBeenCalledWith(responseMessage); }); + + it("should attempt initial GET connection and handle 405 gracefully", async () => { + // Mock the server not supporting GET for SSE (returning 405) + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: false, + status: 405, + statusText: "Method Not Allowed" + }); + + await transport.start(); + + // Check that GET was attempted + expect(global.fetch).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + method: "GET", + headers: expect.any(Headers) + }) + ); + + // Verify transport still works after 405 + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 202, + headers: new Headers() + }); + + await transport.send({ jsonrpc: "2.0", method: "test", params: {} } as JSONRPCMessage); + expect(global.fetch).toHaveBeenCalledTimes(2); + }); + + it("should handle successful initial GET connection for SSE", async () => { + // Set up readable stream for SSE events + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + // Send a server notification via SSE + const event = 'event: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n'; + controller.enqueue(encoder.encode(event)); + } + }); + + // Mock successful GET connection + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "content-type": "text/event-stream" }), + body: stream + }); + + const messageSpy = jest.fn(); + transport.onmessage = messageSpy; + + await transport.start(); + + // Give time for the SSE event to be processed + await new Promise(resolve => setTimeout(resolve, 50)); + + expect(messageSpy).toHaveBeenCalledWith( + expect.objectContaining({ + jsonrpc: "2.0", + method: "serverNotification", + params: {} + }) + ); + }); + + it("should handle multiple concurrent SSE streams", async () => { + // Mock two POST requests that return SSE streams + const makeStream = (id: string) => { + const encoder = new TextEncoder(); + return new ReadableStream({ + start(controller) { + const event = `event: message\ndata: {"jsonrpc": "2.0", "result": {"id": "${id}"}, "id": "${id}"}\n\n`; + controller.enqueue(encoder.encode(event)); + } + }); + }; + + (global.fetch as jest.Mock) + .mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "content-type": "text/event-stream" }), + body: makeStream("request1") + }) + .mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "content-type": "text/event-stream" }), + body: makeStream("request2") + }); + + const messageSpy = jest.fn(); + transport.onmessage = messageSpy; + + // Send two concurrent requests + await Promise.all([ + transport.send({ jsonrpc: "2.0", method: "test1", params: {}, id: "request1" }), + transport.send({ jsonrpc: "2.0", method: "test2", params: {}, id: "request2" }) + ]); + + // Give time for SSE processing + await new Promise(resolve => setTimeout(resolve, 50)); + + // Both streams should have delivered their messages + expect(messageSpy).toHaveBeenCalledTimes(2); + expect(messageSpy).toHaveBeenCalledWith( + expect.objectContaining({ result: { id: "request1" }, id: "request1" }) + ); + expect(messageSpy).toHaveBeenCalledWith( + expect.objectContaining({ result: { id: "request2" }, id: "request2" }) + ); + }); + + it("should include last-event-id header when resuming a broken connection", async () => { + // First make a successful connection that provides an event ID + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + const event = 'id: event-123\nevent: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n'; + controller.enqueue(encoder.encode(event)); + controller.close(); + } + }); + + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "content-type": "text/event-stream" }), + body: stream + }); + + await transport.start(); + await new Promise(resolve => setTimeout(resolve, 50)); + + // Now simulate attempting to reconnect + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "content-type": "text/event-stream" }), + body: null + }); + + await transport.start(); + + // Check that Last-Event-ID was included + const calls = (global.fetch as jest.Mock).mock.calls; + const lastCall = calls[calls.length - 1]; + expect(lastCall[1].headers.get("last-event-id")).toBe("event-123"); + }); }); \ No newline at end of file diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index ffe1b5b4..9fa65c85 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -1,7 +1,7 @@ import { Transport } from "../shared/transport.js"; import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; -import { EventSource, type ErrorEvent, type EventSourceInit } from "eventsource"; +import { type ErrorEvent } from "eventsource"; export class StreamableHTTPError extends Error { constructor( @@ -34,12 +34,7 @@ export type StreamableHTTPClientTransportOptions = { authProvider?: OAuthClientProvider; /** - * Customizes the initial SSE request to the server (the request that begins the stream). - */ - eventSourceInit?: EventSourceInit; - - /** - * Customizes recurring POST requests to the server. + * Customizes HTTP requests to the server. */ requestInit?: RequestInit; }; @@ -50,10 +45,9 @@ export type StreamableHTTPClientTransportOptions = { * for receiving messages. */ export class StreamableHTTPClientTransport implements Transport { - private _eventSource?: EventSource; + private _activeStreams: Map> = new Map(); private _abortController?: AbortController; private _url: URL; - private _eventSourceInit?: EventSourceInit; private _requestInit?: RequestInit; private _authProvider?: OAuthClientProvider; private _sessionId?: string; @@ -68,7 +62,6 @@ export class StreamableHTTPClientTransport implements Transport { opts?: StreamableHTTPClientTransportOptions, ) { this._url = url; - this._eventSourceInit = opts?.eventSourceInit; this._requestInit = opts?.requestInit; this._authProvider = opts?.authProvider; } @@ -109,61 +102,59 @@ export class StreamableHTTPClientTransport implements Transport { return headers; } - private _startOrAuth(): Promise { - return new Promise((resolve, reject) => { - // Try to open an SSE stream with GET to listen for server messages + private async _startOrAuth(): Promise { + try { + // Try to open an initial SSE stream with GET to listen for server messages // This is optional according to the spec - server may not support it - - const headers: HeadersInit = { - 'Accept': 'text/event-stream' - }; + const commonHeaders = await this._commonHeaders(); + const headers = new Headers(commonHeaders); + headers.set('Accept', 'text/event-stream'); // Include Last-Event-ID header for resumable streams if (this._lastEventId) { - headers['last-event-id'] = this._lastEventId; + headers.set('last-event-id', this._lastEventId); } - fetch(this._url, { + const response = await fetch(this._url, { method: 'GET', - headers - }).then(response => { - if (response.status === 405) { - // Server doesn't support GET for SSE, which is allowed by the spec - // We'll rely on SSE responses to POST requests for communication - resolve(); - return; - } + headers, + signal: this._abortController?.signal, + }); - if (!response.ok) { - if (response.status === 401 && this._authProvider) { - // Need to authenticate - this._authThenStart().then(resolve, reject); - return; - } + if (response.status === 405) { + // Server doesn't support GET for SSE, which is allowed by the spec + // We'll rely on SSE responses to POST requests for communication + return; + } - const error = new Error(`Failed to open SSE stream: ${response.status} ${response.statusText}`); - reject(error); - this.onerror?.(error); - return; + if (!response.ok) { + if (response.status === 401 && this._authProvider) { + // Need to authenticate + return await this._authThenStart(); } - // Successful connection, handle the SSE stream - this._handleSseStream(response.body); - resolve(); - }).catch(error => { - reject(error); + const error = new Error(`Failed to open SSE stream: ${response.status} ${response.statusText}`); this.onerror?.(error); - }); - }); + throw error; + } + + // Successful connection, handle the SSE stream as a standalone listener + const streamId = `initial-${Date.now()}`; + this._handleSseStream(response.body, streamId); + } catch (error) { + this.onerror?.(error as Error); + throw error; + } } async start() { - if (this._eventSource) { + if (this._activeStreams.size > 0) { throw new Error( "StreamableHTTPClientTransport already started! If using Client class, note that connect() calls start() automatically.", ); } + this._abortController = new AbortController(); return await this._startOrAuth(); } @@ -182,8 +173,18 @@ export class StreamableHTTPClientTransport implements Transport { } async close(): Promise { + // Close all active streams + for (const reader of this._activeStreams.values()) { + try { + reader.cancel(); + } catch (error) { + this.onerror?.(error as Error); + } + } + this._activeStreams.clear(); + + // Abort any pending requests this._abortController?.abort(); - this._eventSource?.close(); // If we have a session ID, send a DELETE request to explicitly terminate the session if (this._sessionId) { @@ -257,17 +258,25 @@ export class StreamableHTTPClientTransport implements Transport { return; } - // Determine if the message contains any requests - const containsRequests = Array.isArray(message) - ? message.some(msg => 'method' in msg && 'id' in msg) - : ('method' in message && 'id' in message); + // Get original message(s) for detecting request IDs + const messages = Array.isArray(message) ? message : [message]; - if (containsRequests) { - // Check if we got text/event-stream in response - if so, handle SSE stream - const contentType = response.headers.get("content-type"); + // Extract IDs from request messages for tracking responses + const requestIds = messages.filter(msg => 'method' in msg && 'id' in msg) + .map(msg => 'id' in msg ? msg.id : undefined) + .filter(id => id !== undefined); + + // If we have request IDs and an SSE response, create a unique stream ID + const hasRequests = requestIds.length > 0; + + // Check the response type + const contentType = response.headers.get("content-type"); + + if (hasRequests) { if (contentType?.includes("text/event-stream")) { - // Handle SSE stream to receive responses - this._handleSseStream(response.body); + // For streaming responses, create a unique stream ID based on request IDs + const streamId = `req-${requestIds.join('-')}-${Date.now()}`; + this._handleSseStream(response.body, streamId); } else if (contentType?.includes("application/json")) { // For non-streaming servers, we might get direct JSON responses const data = await response.json(); @@ -286,13 +295,14 @@ export class StreamableHTTPClientTransport implements Transport { } } - private _handleSseStream(stream: ReadableStream | null): void { + private _handleSseStream(stream: ReadableStream | null, streamId: string): void { if (!stream) { return; } // Set up stream handling for server-sent events const reader = stream.getReader(); + this._activeStreams.set(streamId, reader); const decoder = new TextDecoder(); let buffer = ''; @@ -300,7 +310,11 @@ export class StreamableHTTPClientTransport implements Transport { try { while (true) { const { done, value } = await reader.read(); - if (done) break; + if (done) { + // Stream closed by server + this._activeStreams.delete(streamId); + break; + } buffer += decoder.decode(value, { stream: true }); @@ -326,22 +340,27 @@ export class StreamableHTTPClientTransport implements Transport { } // Update last event ID if provided by server + // As per spec: the ID MUST be globally unique across all streams within that session if (id) { this._lastEventId = id; } // Handle message event - if (eventType === 'message' && data) { - try { - const message = JSONRPCMessageSchema.parse(JSON.parse(data)); - this.onmessage?.(message); - } catch (error) { - this.onerror?.(error as Error); + if (data) { + // Default event type is 'message' per SSE spec if not specified + if (!eventType || eventType === 'message') { + try { + const message = JSONRPCMessageSchema.parse(JSON.parse(data)); + this.onmessage?.(message); + } catch (error) { + this.onerror?.(error as Error); + } } } } } } catch (error) { + this._activeStreams.delete(streamId); this.onerror?.(error as Error); } }; From f1827ced109c78685ca3348c9deee5d74715904a Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 8 Apr 2025 15:04:07 +0100 Subject: [PATCH 3/8] handle not implemented get --- src/client/streamableHttp.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 9fa65c85..4ec89af8 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -121,7 +121,7 @@ export class StreamableHTTPClientTransport implements Transport { signal: this._abortController?.signal, }); - if (response.status === 405) { + if (response.status === 405 || response.status === 404) { // Server doesn't support GET for SSE, which is allowed by the spec // We'll rely on SSE responses to POST requests for communication return; From d4ad9764018f6e03fc25ca4d82f3d46748645295 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 8 Apr 2025 21:39:49 +0100 Subject: [PATCH 4/8] use EventSourceParserStream --- src/client/streamableHttp.test.ts | 40 -------------- src/client/streamableHttp.ts | 92 +++++++------------------------ 2 files changed, 21 insertions(+), 111 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index ef35cacc..8a1b9044 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -124,46 +124,6 @@ describe("StreamableHTTPClientTransport", () => { expect(errorSpy).toHaveBeenCalled(); }); - it("should handle session termination via DELETE request", async () => { - // First set the session ID by mocking initialization - (global.fetch as jest.Mock).mockResolvedValueOnce({ - ok: true, - status: 200, - headers: new Headers({ "mcp-session-id": "session-to-terminate" }), - }); - - await transport.send({ - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-id" - } as JSONRPCMessage); - - // Mock DELETE request for session termination - (global.fetch as jest.Mock).mockResolvedValueOnce({ - ok: true, - status: 200, - headers: new Headers() - }); - - const closeSpy = jest.fn(); - transport.onclose = closeSpy; - - await transport.close(); - - // Check that DELETE request was sent - const calls = (global.fetch as jest.Mock).mock.calls; - const lastCall = calls[calls.length - 1]; - expect(lastCall[1].method).toBe("DELETE"); - // The headers may be a plain object in tests - expect(lastCall[1].headers["mcp-session-id"]).toBe("session-to-terminate"); - - expect(closeSpy).toHaveBeenCalled(); - }); - it("should handle non-streaming JSON response", async () => { const message: JSONRPCMessage = { jsonrpc: "2.0", diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 4ec89af8..0a0d39e7 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -2,7 +2,7 @@ import { Transport } from "../shared/transport.js"; import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; import { type ErrorEvent } from "eventsource"; - +import { EventSourceParserStream } from 'eventsource-parser/stream'; export class StreamableHTTPError extends Error { constructor( public readonly code: number | undefined, @@ -45,7 +45,7 @@ export type StreamableHTTPClientTransportOptions = { * for receiving messages. */ export class StreamableHTTPClientTransport implements Transport { - private _activeStreams: Map> = new Map(); + private _activeStreams: Map> = new Map(); private _abortController?: AbortController; private _url: URL; private _requestInit?: RequestInit; @@ -186,30 +186,6 @@ export class StreamableHTTPClientTransport implements Transport { // Abort any pending requests this._abortController?.abort(); - // If we have a session ID, send a DELETE request to explicitly terminate the session - if (this._sessionId) { - try { - const commonHeaders = await this._commonHeaders(); - const response = await fetch(this._url, { - method: "DELETE", - headers: commonHeaders, - signal: this._abortController?.signal, - }); - - if (!response.ok) { - // Server might respond with 405 if it doesn't support explicit session termination - // We don't throw an error in that case - if (response.status !== 405) { - const text = await response.text().catch(() => null); - throw new Error(`Error terminating session (HTTP ${response.status}): ${text}`); - } - } - } catch (error) { - // We still want to invoke onclose even if the session termination fails - this.onerror?.(error as Error); - } - } - this.onclose?.(); } @@ -300,62 +276,36 @@ export class StreamableHTTPClientTransport implements Transport { return; } - // Set up stream handling for server-sent events - const reader = stream.getReader(); + // Create a pipeline: binary stream -> text decoder -> SSE parser + const eventStream = stream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()); + + const reader = eventStream.getReader(); this._activeStreams.set(streamId, reader); - const decoder = new TextDecoder(); - let buffer = ''; const processStream = async () => { try { while (true) { - const { done, value } = await reader.read(); + const { done, value: event } = await reader.read(); if (done) { - // Stream closed by server this._activeStreams.delete(streamId); break; } - buffer += decoder.decode(value, { stream: true }); - - // Process SSE messages in the buffer - const events = buffer.split('\n\n'); - buffer = events.pop() || ''; - - for (const event of events) { - const lines = event.split('\n'); - let id: string | undefined; - let eventType: string | undefined; - let data: string | undefined; - - // Parse SSE message according to the format - for (const line of lines) { - if (line.startsWith('id:')) { - id = line.slice(3).trim(); - } else if (line.startsWith('event:')) { - eventType = line.slice(6).trim(); - } else if (line.startsWith('data:')) { - data = line.slice(5).trim(); - } - } - - // Update last event ID if provided by server - // As per spec: the ID MUST be globally unique across all streams within that session - if (id) { - this._lastEventId = id; - } + // Update last event ID if provided + if (event.id) { + this._lastEventId = event.id; + } - // Handle message event - if (data) { - // Default event type is 'message' per SSE spec if not specified - if (!eventType || eventType === 'message') { - try { - const message = JSONRPCMessageSchema.parse(JSON.parse(data)); - this.onmessage?.(message); - } catch (error) { - this.onerror?.(error as Error); - } - } + // Handle message events (default event type is undefined per docs) + // or explicit 'message' event type + if (!event.event || event.event === 'message') { + try { + const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); + this.onmessage?.(message); + } catch (error) { + this.onerror?.(error as Error); } } } From 15c10a58833ecce01958733611d4e30a963d07ef Mon Sep 17 00:00:00 2001 From: ihrpr Date: Tue, 8 Apr 2025 22:30:15 +0100 Subject: [PATCH 5/8] fix lint --- src/client/streamableHttp.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 0a0d39e7..38da5bcb 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -2,7 +2,7 @@ import { Transport } from "../shared/transport.js"; import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; import { type ErrorEvent } from "eventsource"; -import { EventSourceParserStream } from 'eventsource-parser/stream'; +import { EventSourceMessage, EventSourceParserStream } from 'eventsource-parser/stream'; export class StreamableHTTPError extends Error { constructor( public readonly code: number | undefined, @@ -45,7 +45,7 @@ export type StreamableHTTPClientTransportOptions = { * for receiving messages. */ export class StreamableHTTPClientTransport implements Transport { - private _activeStreams: Map> = new Map(); + private _activeStreams: Map> = new Map(); private _abortController?: AbortController; private _url: URL; private _requestInit?: RequestInit; From 1d18a57a171e99da2da75107d163d2d2dba628f9 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 9 Apr 2025 09:50:59 +0100 Subject: [PATCH 6/8] extract standalone get into a method which can be called by a client --- src/client/streamableHttp.test.ts | 28 ++++--- src/client/streamableHttp.ts | 122 ++++++++++++++++-------------- 2 files changed, 84 insertions(+), 66 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 8a1b9044..0dc582d4 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -161,7 +161,10 @@ describe("StreamableHTTPClientTransport", () => { statusText: "Method Not Allowed" }); + // We expect the 405 error to be caught and handled gracefully + // This should not throw an error that breaks the transport await transport.start(); + await expect(transport.openSseStream()).rejects.toThrow('Failed to open SSE stream: Method Not Allowed'); // Check that GET was attempted expect(global.fetch).toHaveBeenCalledWith( @@ -206,6 +209,7 @@ describe("StreamableHTTPClientTransport", () => { transport.onmessage = messageSpy; await transport.start(); + await transport.openSseStream(); // Give time for the SSE event to be processed await new Promise(resolve => setTimeout(resolve, 50)); @@ -233,7 +237,7 @@ describe("StreamableHTTPClientTransport", () => { (global.fetch as jest.Mock) .mockResolvedValueOnce({ - ok: true, + ok: true, status: 200, headers: new Headers({ "content-type": "text/event-stream" }), body: makeStream("request1") @@ -255,16 +259,21 @@ describe("StreamableHTTPClientTransport", () => { ]); // Give time for SSE processing - await new Promise(resolve => setTimeout(resolve, 50)); + await new Promise(resolve => setTimeout(resolve, 100)); // Both streams should have delivered their messages expect(messageSpy).toHaveBeenCalledTimes(2); - expect(messageSpy).toHaveBeenCalledWith( - expect.objectContaining({ result: { id: "request1" }, id: "request1" }) - ); - expect(messageSpy).toHaveBeenCalledWith( - expect.objectContaining({ result: { id: "request2" }, id: "request2" }) - ); + + // Verify received messages without assuming specific order + expect(messageSpy.mock.calls.some(call => { + const msg = call[0]; + return msg.id === "request1" && msg.result?.id === "request1"; + })).toBe(true); + + expect(messageSpy.mock.calls.some(call => { + const msg = call[0]; + return msg.id === "request2" && msg.result?.id === "request2"; + })).toBe(true); }); it("should include last-event-id header when resuming a broken connection", async () => { @@ -286,6 +295,7 @@ describe("StreamableHTTPClientTransport", () => { }); await transport.start(); + await transport.openSseStream(); await new Promise(resolve => setTimeout(resolve, 50)); // Now simulate attempting to reconnect @@ -296,7 +306,7 @@ describe("StreamableHTTPClientTransport", () => { body: null }); - await transport.start(); + await transport.openSseStream(); // Check that Last-Event-ID was included const calls = (global.fetch as jest.Mock).mock.calls; diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 38da5bcb..82cd61ec 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -1,13 +1,11 @@ import { Transport } from "../shared/transport.js"; import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; -import { type ErrorEvent } from "eventsource"; import { EventSourceMessage, EventSourceParserStream } from 'eventsource-parser/stream'; export class StreamableHTTPError extends Error { constructor( public readonly code: number | undefined, message: string | undefined, - public readonly event: ErrorEvent, ) { super(`Streamable HTTP error: ${message}`); } @@ -83,7 +81,7 @@ export class StreamableHTTPClientTransport implements Transport { throw new UnauthorizedError(); } - return await this._startOrAuth(); + return await this._startOrAuthStandaloneSSE(); } private async _commonHeaders(): Promise { @@ -102,7 +100,7 @@ export class StreamableHTTPClientTransport implements Transport { return headers; } - private async _startOrAuth(): Promise { + private async _startOrAuthStandaloneSSE(): Promise { try { // Try to open an initial SSE stream with GET to listen for server messages // This is optional according to the spec - server may not support it @@ -121,25 +119,22 @@ export class StreamableHTTPClientTransport implements Transport { signal: this._abortController?.signal, }); - if (response.status === 405 || response.status === 404) { - // Server doesn't support GET for SSE, which is allowed by the spec - // We'll rely on SSE responses to POST requests for communication - return; - } - if (!response.ok) { if (response.status === 401 && this._authProvider) { // Need to authenticate return await this._authThenStart(); } - const error = new Error(`Failed to open SSE stream: ${response.status} ${response.statusText}`); + const error = new StreamableHTTPError( + response.status, + `Failed to open SSE stream: ${response.statusText}`, + ); this.onerror?.(error); throw error; } // Successful connection, handle the SSE stream as a standalone listener - const streamId = `initial-${Date.now()}`; + const streamId = `standalone-sse-${Date.now()}`; this._handleSseStream(response.body, streamId); } catch (error) { this.onerror?.(error as Error); @@ -147,6 +142,53 @@ export class StreamableHTTPClientTransport implements Transport { } } + private _handleSseStream(stream: ReadableStream | null, streamId: string): void { + if (!stream) { + return; + } + + // Create a pipeline: binary stream -> text decoder -> SSE parser + const eventStream = stream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()); + + const reader = eventStream.getReader(); + this._activeStreams.set(streamId, reader); + + const processStream = async () => { + try { + while (true) { + const { done, value: event } = await reader.read(); + if (done) { + this._activeStreams.delete(streamId); + break; + } + + // Update last event ID if provided + if (event.id) { + this._lastEventId = event.id; + } + + // Handle message events (default event type is undefined per docs) + // or explicit 'message' event type + if (!event.event || event.event === 'message') { + try { + const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); + this.onmessage?.(message); + } catch (error) { + this.onerror?.(error as Error); + } + } + } + } catch (error) { + this._activeStreams.delete(streamId); + this.onerror?.(error as Error); + } + }; + + processStream(); + } + async start() { if (this._activeStreams.size > 0) { throw new Error( @@ -155,7 +197,6 @@ export class StreamableHTTPClientTransport implements Transport { } this._abortController = new AbortController(); - return await this._startOrAuth(); } /** @@ -271,50 +312,17 @@ export class StreamableHTTPClientTransport implements Transport { } } - private _handleSseStream(stream: ReadableStream | null, streamId: string): void { - if (!stream) { - return; + /** + * Opens SSE stream to receive messages from the server. + * + * This allows the server to push messages to the client without requiring the client + * to first send a request via HTTP POST. Some servers may not support this feature. + * If authentication is required but fails, this method will throw an UnauthorizedError. + */ + async openSseStream(): Promise { + if (!this._abortController) { + this._abortController = new AbortController(); } - - // Create a pipeline: binary stream -> text decoder -> SSE parser - const eventStream = stream - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()); - - const reader = eventStream.getReader(); - this._activeStreams.set(streamId, reader); - - const processStream = async () => { - try { - while (true) { - const { done, value: event } = await reader.read(); - if (done) { - this._activeStreams.delete(streamId); - break; - } - - // Update last event ID if provided - if (event.id) { - this._lastEventId = event.id; - } - - // Handle message events (default event type is undefined per docs) - // or explicit 'message' event type - if (!event.event || event.event === 'message') { - try { - const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); - this.onmessage?.(message); - } catch (error) { - this.onerror?.(error as Error); - } - } - } - } catch (error) { - this._activeStreams.delete(streamId); - this.onerror?.(error as Error); - } - }; - - processStream(); + await this._startOrAuthStandaloneSSE(); } } \ No newline at end of file From 18674007e55000b9a6fffd86729bd4034249cdb4 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 9 Apr 2025 11:13:28 +0100 Subject: [PATCH 7/8] remove the _activeStreams map --- src/client/streamableHttp.ts | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 82cd61ec..1420f7be 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -43,7 +43,6 @@ export type StreamableHTTPClientTransportOptions = { * for receiving messages. */ export class StreamableHTTPClientTransport implements Transport { - private _activeStreams: Map> = new Map(); private _abortController?: AbortController; private _url: URL; private _requestInit?: RequestInit; @@ -134,33 +133,28 @@ export class StreamableHTTPClientTransport implements Transport { } // Successful connection, handle the SSE stream as a standalone listener - const streamId = `standalone-sse-${Date.now()}`; - this._handleSseStream(response.body, streamId); + this._handleSseStream(response.body); } catch (error) { this.onerror?.(error as Error); throw error; } } - private _handleSseStream(stream: ReadableStream | null, streamId: string): void { + private _handleSseStream(stream: ReadableStream | null): void { if (!stream) { return; } - // Create a pipeline: binary stream -> text decoder -> SSE parser const eventStream = stream .pipeThrough(new TextDecoderStream()) .pipeThrough(new EventSourceParserStream()); const reader = eventStream.getReader(); - this._activeStreams.set(streamId, reader); - const processStream = async () => { try { while (true) { const { done, value: event } = await reader.read(); if (done) { - this._activeStreams.delete(streamId); break; } @@ -181,7 +175,6 @@ export class StreamableHTTPClientTransport implements Transport { } } } catch (error) { - this._activeStreams.delete(streamId); this.onerror?.(error as Error); } }; @@ -190,7 +183,7 @@ export class StreamableHTTPClientTransport implements Transport { } async start() { - if (this._activeStreams.size > 0) { + if (this._abortController) { throw new Error( "StreamableHTTPClientTransport already started! If using Client class, note that connect() calls start() automatically.", ); @@ -214,16 +207,6 @@ export class StreamableHTTPClientTransport implements Transport { } async close(): Promise { - // Close all active streams - for (const reader of this._activeStreams.values()) { - try { - reader.cancel(); - } catch (error) { - this.onerror?.(error as Error); - } - } - this._activeStreams.clear(); - // Abort any pending requests this._abortController?.abort(); @@ -292,8 +275,7 @@ export class StreamableHTTPClientTransport implements Transport { if (hasRequests) { if (contentType?.includes("text/event-stream")) { // For streaming responses, create a unique stream ID based on request IDs - const streamId = `req-${requestIds.join('-')}-${Date.now()}`; - this._handleSseStream(response.body, streamId); + this._handleSseStream(response.body); } else if (contentType?.includes("application/json")) { // For non-streaming servers, we might get direct JSON responses const data = await response.json(); @@ -321,7 +303,9 @@ export class StreamableHTTPClientTransport implements Transport { */ async openSseStream(): Promise { if (!this._abortController) { - this._abortController = new AbortController(); + throw new Error( + "StreamableHTTPClientTransport not started! Call connect() before openSseStream().", + ); } await this._startOrAuthStandaloneSSE(); } From 26929fa9b7862f04a6a0980782fa6adb35b5561e Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 9 Apr 2025 11:17:34 +0100 Subject: [PATCH 8/8] fix lint --- src/client/streamableHttp.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 1420f7be..0c667e35 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -1,7 +1,7 @@ import { Transport } from "../shared/transport.js"; import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; -import { EventSourceMessage, EventSourceParserStream } from 'eventsource-parser/stream'; +import { EventSourceParserStream } from 'eventsource-parser/stream'; export class StreamableHTTPError extends Error { constructor( public readonly code: number | undefined,