From b37a2e11ab2639c5d85c76a13f9a6332b41949ab Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 9 Apr 2025 21:01:35 +0100 Subject: [PATCH 1/7] support for server returning json -- not stream --- src/examples/README.md | 28 +++ src/examples/client/simpleStreamableHttp.ts | 63 ++++--- .../server/jsonResponseStreamableHttp.ts | 119 +++++++++++++ src/server/.streamableHttp.test.ts.swp | Bin 0 -> 16384 bytes src/server/streamableHttp.test.ts | 162 ++++++++++++++++-- src/server/streamableHttp.ts | 129 ++++++++++---- 6 files changed, 431 insertions(+), 70 deletions(-) create mode 100644 src/examples/server/jsonResponseStreamableHttp.ts create mode 100644 src/server/.streamableHttp.test.ts.swp diff --git a/src/examples/README.md b/src/examples/README.md index 6e53fdec..937159c3 100644 --- a/src/examples/README.md +++ b/src/examples/README.md @@ -6,6 +6,34 @@ This directory contains example implementations of MCP clients and servers using Multi node with stete management example will be added soon after we add support. +### Server with JSON response mode (`server/jsonResponseStreamableHttp.ts`) + +A simple MCP server that uses the Streamable HTTP transport with JSON response mode enabled, implemented with Express. The server provides a simple `greet` tool that returns a greeting for a name. + +#### Running the server + +```bash +npx tsx src/examples/server/jsonResponseStreamableHttp.ts +``` + +The server will start on port 3000. You can test the initialization and tool calling: + +```bash +# Initialize the server and get the session ID from headers +SESSION_ID=$(curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" \ + -d '{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":"1"}' \ + -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r') +echo "Session ID: $SESSION_ID" + +# Call the greet tool using the saved session ID +curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" \ + -H "mcp-session-id: $SESSION_ID" \ + -d '{"jsonrpc":"2.0","method":"mcp.call_tool","params":{"name":"greet","arguments":{"name":"World"}},"id":"2"}' \ + http://localhost:3000/mcp +``` + +Note that in this example, we're using plain JSON response mode by setting `Accept: application/json` header. + ### Server (`server/simpleStreamableHttp.ts`) A simple MCP server that uses the Streamable HTTP transport, implemented with Express. The server provides: diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 9bf43ce8..589c1f41 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -15,9 +15,9 @@ import { async function main(): Promise { // Create a new client with streamable HTTP transport - const client = new Client({ - name: 'example-client', - version: '1.0.0' + const client = new Client({ + name: 'example-client', + version: '1.0.0' }); const transport = new StreamableHTTPClientTransport( new URL('http://localhost:3000/mcp') @@ -27,7 +27,6 @@ async function main(): Promise { await client.connect(transport); console.log('Connected to MCP server'); - // List available tools const toolsRequest: ListToolsRequest = { method: 'tools/list', @@ -48,32 +47,44 @@ async function main(): Promise { console.log('Greeting result:', greetResult.content[0].text); // List available prompts - const promptsRequest: ListPromptsRequest = { - method: 'prompts/list', - params: {} - }; - const promptsResult = await client.request(promptsRequest, ListPromptsResultSchema); - console.log('Available prompts:', promptsResult.prompts); + try { + const promptsRequest: ListPromptsRequest = { + method: 'prompts/list', + params: {} + }; + const promptsResult = await client.request(promptsRequest, ListPromptsResultSchema); + console.log('Available prompts:', promptsResult.prompts); + } catch (error) { + console.log(`Prompts not supported by this server (${error})`); + } // Get a prompt - const promptRequest: GetPromptRequest = { - method: 'prompts/get', - params: { - name: 'greeting-template', - arguments: { name: 'MCP User' } - } - }; - const promptResult = await client.request(promptRequest, GetPromptResultSchema); - console.log('Prompt template:', promptResult.messages[0].content.text); + try { + const promptRequest: GetPromptRequest = { + method: 'prompts/get', + params: { + name: 'greeting-template', + arguments: { name: 'MCP User' } + } + }; + const promptResult = await client.request(promptRequest, GetPromptResultSchema); + console.log('Prompt template:', promptResult.messages[0].content.text); + } catch (error) { + console.log(`Prompt retrieval not supported by this server (${error})`); + } // List available resources - const resourcesRequest: ListResourcesRequest = { - method: 'resources/list', - params: {} - }; - const resourcesResult = await client.request(resourcesRequest, ListResourcesResultSchema); - console.log('Available resources:', resourcesResult.resources); - + try { + const resourcesRequest: ListResourcesRequest = { + method: 'resources/list', + params: {} + }; + const resourcesResult = await client.request(resourcesRequest, ListResourcesResultSchema); + console.log('Available resources:', resourcesResult.resources); + } catch (error) { + console.log(`Resources not supported by this server (${error})`); + } + // Close the connection await client.close(); } diff --git a/src/examples/server/jsonResponseStreamableHttp.ts b/src/examples/server/jsonResponseStreamableHttp.ts new file mode 100644 index 00000000..2e83b54a --- /dev/null +++ b/src/examples/server/jsonResponseStreamableHttp.ts @@ -0,0 +1,119 @@ +import express, { Request, Response } from 'express'; +import { randomUUID } from 'node:crypto'; +import { McpServer } from '../../server/mcp.js'; +import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; +import { z } from 'zod'; +import { CallToolResult } from '../../types.js'; + +// Create an MCP server with implementation details +const server = new McpServer({ + name: 'json-response-streamable-http-server', + version: '1.0.0', +}); + +// Register a simple tool that returns a greeting +server.tool( + 'greet', + 'A simple greeting tool', + { + name: z.string().describe('Name to greet'), + }, + async ({ name }): Promise => { + return { + content: [ + { + type: 'text', + text: `Hello, ${name}!`, + }, + ], + }; + } +); + +const app = express(); +app.use(express.json()); + +// Map to store transports by session ID +const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; + +app.post('/mcp', async (req: Request, res: Response) => { + console.log('Received MCP request:', req.body); + try { + // Check for existing session ID + const sessionId = req.headers['mcp-session-id'] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports[sessionId]) { + // Reuse existing transport + transport = transports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request - use JSON response mode + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + enableJsonResponse: true, // Enable JSON response mode + }); + + // Connect the transport to the MCP server BEFORE handling the request + await server.connect(transport); + + // After handling the request, if we get a session ID back, store the transport + await transport.handleRequest(req, res, req.body); + + // Store the transport by session ID for future requests + if (transport.sessionId) { + transports[transport.sessionId] = transport; + } + return; // Already handled + } else { + // Invalid request - no session ID or not initialization request + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No valid session ID provided', + }, + id: null, + }); + return; + } + + // Handle the request with existing transport - no need to reconnect + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling MCP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: 'Internal server error', + }, + id: null, + }); + } + } +}); + +// Helper function to detect initialize requests +function isInitializeRequest(body: unknown): boolean { + if (Array.isArray(body)) { + return body.some(msg => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize'); + } + return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize'; +} + +// Start the server +const PORT = 3000; +app.listen(PORT, () => { + console.log(`MCP Streamable HTTP Server with JSON responses listening on port ${PORT}`); + console.log(`Server is running. Press Ctrl+C to stop.`); + console.log(`Initialize with: curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d '{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":"1"}' http://localhost:${PORT}/mcp`); + console.log(`Then call tool with: curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -H "mcp-session-id: YOUR_SESSION_ID" -d '{"jsonrpc":"2.0","method":"mcp.call_tool","params":{"name":"greet","arguments":{"name":"World"}},"id":"2"}' http://localhost:${PORT}/mcp`); +}); + +// Handle server shutdown +process.on('SIGINT', async () => { + console.log('Shutting down server...'); + await server.close(); + process.exit(0); +}); \ No newline at end of file diff --git a/src/server/.streamableHttp.test.ts.swp b/src/server/.streamableHttp.test.ts.swp new file mode 100644 index 0000000000000000000000000000000000000000..4ab20fa3582429b76d0a1f7da19f2c2ec77faf7f GIT binary patch literal 16384 zcmeI3%a0tz9mg9&c=~~Yxqw89;lK>c^z5u5h_SI@*T62);?+KgWXnOl-8DPC?x}8f z*X(*@1{5Xv0}{gl@sL6ULXH%H00B`BNF)l8LZlpkLoP^>B7q2r0(JrkIN$23>gj2E zy@nHzYU!gL_phGcs`}NV%S&VTOwW*;?Use-Qp>va52u!Ie4}n{{j+8Hj5*@W)F z1$2t@$mTTRHcv{^G%t+;MuD?cV5_xryfr35UsJi7T=uQSvyEuN83l|2MggOMQNSo* z6fg=H1&jh;0|j`r&3XX!I9Ju>KK1vU!r#;C3Dx^S!TzbT`|ACB1^Xw;9;x?>Dg&L* zf0aE}_TLul|50{*3UHGuu#W$+vL8|LPZaDQDf_CjU-=FBL&yKOvNL7>Nx}XvW!EPg zrwjH|%HC7)SI!$v|D>`9%Klct{-Lt3Df{F%N8^8>?B7=QUlr`?wfSQdFbWt2i~>dh zqkvJsC}0#Y3K#{90!D#S1su<^&WB~2*e~Pzzuy19y4|ww2eY6GKHX+ne+93AUjhzX zFaxdwTfnKUmh}esHFycU2p$AS!2xg^*bDZ6Yr(r)5FVTWKLpN zfBy}50vrJya0o1d1#l@ig)@TRgJ;2U@B=Ubt^nJ?M>vD{2lzX94Lk!L10lEp)W9cb z*AKuy!OP&s;78yP*a@}+(e_&b9+E!2&^Unys!4BC7HavNQ-;3cd9x3vNVNQ=CsJOh-kd{iF;C7EnqGgX&@?u6#?DWd3IP3+v5(4yD4bkP;`N!?+yaA`B^{)=ik$vSgM zwMK5)gVfVRhW#to(FV1gToqh`Y9#1k(6r%u=mW!ktX@xhx?+UL;bDhMqC}JEJPwM( zmeHt0J>z3zI=C83BLg`bq+%dVIjeMnQa0%X0rkG?^KMn8^F@YXF^-*;Uk=)RwBs}C z3GbnD*>vD;G;WJoZQvz3PLK@UL@`pae2dKcObLe#cm@X{os;TXUpX)FfA1tlnr}l- zk@E-3l+hSo6&@iSw`6)B@%m!Q7&?*EIef+`5aOk`9hYdeGOr@fI1IB(4^9)obKXOD zy%lls(G~OVZ9yO$c_Qaur^9J{yW@7NC6RPw>DpV`BV_(Ho4~RuM7Ze8gO>SK=|<8y z1YM>uPfGuj_cVoWCD##p<|FQuqK0u>iGUS=7xoqxr}tIWn94jO(q?px%nxsf zE-cK>r`JXo^ui%^iAxkK4O$c_P|FatLTD2?-;3#RXp#kPlT>~oJN0(UEc7yBkB*R) zt9|Ao?Xq&Mf2OkMJHh~^iYf2M41Z*yM7{$|H>A%Fvv7@xpPLkups#YVHlxrt4clQ_ z*5wT?wCD3&@N@+@mzHIr>!TaWZY352S&|!s!TNFYKNs@326Rh?Q;{ z!0(Xi$V()eOYL3}9kMC^LI$EfT2z^%8i{kN0AhR(YVD#9X|f{Mz4@`0=`x3%KQrr8-g~!Kl6>g-Gx>xpE4z3zel(Fn z?dpZznkAFgtc#oVD1q4jKaaii6!zR=|F6F4jup=I0z1aYr#3-9qjwx z1W$t=*bQ3XH1_u=!H3{=@C5i7SOwRE>wwt*KM4+kc`yS+Iqn4JF$x$3i~>dhqkvJs zC}0#Y3K#{xyb9o0hhU4SPj~cReQaF~nc5hOZFJ3MA^t~i*&#H(U5IZ&6WB58Q&qW@ z-YBKgRB+gpP_HZz>;&iK(SARngm)>CVbgCwIKxZ0*!YH>)T9cN+P>hybzP;uL0M+8 zx0j`eX(L|}&yR7;AFS(avx6o(WIg#+3LEedRkcoXn?7Pf(CnBdJAxm!(FO { await transport.handleRequest(req, response); // Verify that the request is tracked in the SSE map - expect(transport["_sseResponseMapping"].size).toBe(2); - expect(transport["_sseResponseMapping"].has("cleanup-test")).toBe(true); + expect(transport["_responseMapping"].size).toBe(2); + expect(transport["_responseMapping"].has("cleanup-test")).toBe(true); // Send a response await transport.send({ @@ -1019,8 +1019,8 @@ describe("StreamableHTTPServerTransport", () => { }); // Verify that the mapping was cleaned up - expect(transport["_sseResponseMapping"].size).toBe(1); - expect(transport["_sseResponseMapping"].has("cleanup-test")).toBe(false); + expect(transport["_responseMapping"].size).toBe(1); + expect(transport["_responseMapping"].has("cleanup-test")).toBe(false); }); it("should clean up connection tracking when client disconnects", async () => { @@ -1052,17 +1052,17 @@ describe("StreamableHTTPServerTransport", () => { await transport.handleRequest(req, response); // Both requests should be mapped to the same response - expect(transport["_sseResponseMapping"].size).toBe(3); - expect(transport["_sseResponseMapping"].get("req1")).toBe(response); - expect(transport["_sseResponseMapping"].get("req2")).toBe(response); + expect(transport["_responseMapping"].size).toBe(3); + expect(transport["_responseMapping"].get("req1")).toBe(response); + expect(transport["_responseMapping"].get("req2")).toBe(response); // Simulate client disconnect by triggering the stored callback if (closeCallback) closeCallback(); // All entries using this response should be removed - expect(transport["_sseResponseMapping"].size).toBe(1); - expect(transport["_sseResponseMapping"].has("req1")).toBe(false); - expect(transport["_sseResponseMapping"].has("req2")).toBe(false); + expect(transport["_responseMapping"].size).toBe(1); + expect(transport["_responseMapping"].has("req1")).toBe(false); + expect(transport["_responseMapping"].has("req2")).toBe(false); }); }); @@ -1214,6 +1214,148 @@ describe("StreamableHTTPServerTransport", () => { }); }); + describe("JSON Response Mode", () => { + let jsonResponseTransport: StreamableHTTPServerTransport; + let mockResponse: jest.Mocked; + + beforeEach(async () => { + jsonResponseTransport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + enableJsonResponse: true, + }); + + // Initialize the transport + const initMessage: JSONRPCMessage = { + jsonrpc: "2.0", + method: "initialize", + params: { + clientInfo: { name: "test-client", version: "1.0" }, + protocolVersion: "2025-03-26" + }, + id: "init-1", + }; + + const initReq = createMockRequest({ + method: "POST", + headers: { + "content-type": "application/json", + "accept": "application/json, text/event-stream", + }, + body: JSON.stringify(initMessage), + }); + + mockResponse = createMockResponse(); + await jsonResponseTransport.handleRequest(initReq, mockResponse); + mockResponse = createMockResponse(); // Reset for tests + }); + + it("should return JSON response for a single request", async () => { + const requestMessage: JSONRPCMessage = { + jsonrpc: "2.0", + method: "test", + params: {}, + id: "test-req-id", + }; + + const req = createMockRequest({ + method: "POST", + headers: { + "content-type": "application/json", + "accept": "application/json, text/event-stream", + "mcp-session-id": jsonResponseTransport.sessionId, + }, + body: JSON.stringify(requestMessage), + }); + + // Mock immediate response + jsonResponseTransport.onmessage = (message) => { + if ('method' in message && 'id' in message) { + const responseMessage: JSONRPCMessage = { + jsonrpc: "2.0", + result: { value: "test-result" }, + id: message.id, + }; + setTimeout(() => { + void jsonResponseTransport.send(responseMessage); + }, 10); + } + }; + + await jsonResponseTransport.handleRequest(req, mockResponse); + + // Wait for all promises to resolve + await new Promise(resolve => setTimeout(resolve, 50)); + + // Should respond with application/json header + expect(mockResponse.writeHead).toHaveBeenCalledWith( + 200, + expect.objectContaining({ + "Content-Type": "application/json", + }) + ); + + // Should return the response as JSON + const expectedResponse = { + jsonrpc: "2.0", + result: { value: "test-result" }, + id: "test-req-id", + }; + + expect(mockResponse.end).toHaveBeenCalledWith(JSON.stringify(expectedResponse)); + }); + + it("should return JSON response for batch requests", async () => { + const batchMessages: JSONRPCMessage[] = [ + { jsonrpc: "2.0", method: "test1", params: {}, id: "req1" }, + { jsonrpc: "2.0", method: "test2", params: {}, id: "req2" }, + ]; + + const req = createMockRequest({ + method: "POST", + headers: { + "content-type": "application/json", + "accept": "application/json, text/event-stream", + "mcp-session-id": jsonResponseTransport.sessionId, + }, + body: JSON.stringify(batchMessages), + }); + + // Mock sequential responses - send them in specific order + jsonResponseTransport.onmessage = (message) => { + if ('method' in message && 'id' in message) { + const responseMessage: JSONRPCMessage = { + jsonrpc: "2.0", + result: { value: `result-for-${message.id}` }, + id: message.id, + }; + // Send responses in order - req1 first, then req2 + if (message.id === 'req1') { + void jsonResponseTransport.send(responseMessage); + } else { + // Add a tiny delay to req2 to ensure it's processed after req1 + setTimeout(() => void jsonResponseTransport.send(responseMessage), 5); + } + } + }; + + await jsonResponseTransport.handleRequest(req, mockResponse); + + // Wait for all promises to resolve - give it enough time + await new Promise(resolve => setTimeout(resolve, 100)); + + // Should respond with application/json header + expect(mockResponse.writeHead).toHaveBeenCalledWith( + 200, + expect.objectContaining({ + "Content-Type": "application/json", + }) + ); + + expect(mockResponse.end).toHaveBeenCalled(); + expect(mockResponse.end.mock.calls[0][0]).toContain("result-for-req2"); + }); + }); + describe("Handling Pre-Parsed Body", () => { beforeEach(async () => { // Initialize the transport diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index b0fcce6d..3fd96307 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -18,8 +18,12 @@ export interface StreamableHTTPServerTransportOptions { */ sessionIdGenerator: () => string | undefined; - - + /** + * If true, the server will return JSON responses instead of starting an SSE stream. + * This can be useful for simple request/response scenarios without streaming. + * Default is false (SSE streams are preferred). + */ + enableJsonResponse?: boolean; } /** @@ -60,8 +64,11 @@ export class StreamableHTTPServerTransport implements Transport { // when sessionId is not set (undefined), it means the transport is in stateless mode private sessionIdGenerator: () => string | undefined; private _started: boolean = false; - private _sseResponseMapping: Map = new Map(); + private _responseMapping: Map = new Map(); + private _requestResponseMap: Map = new Map(); private _initialized: boolean = false; + private _enableJsonResponse: boolean = false; + sessionId?: string | undefined; onclose?: () => void; @@ -70,6 +77,7 @@ export class StreamableHTTPServerTransport implements Transport { constructor(options: StreamableHTTPServerTransportOptions) { this.sessionIdGenerator = options.sessionIdGenerator; + this._enableJsonResponse = options.enableJsonResponse ?? false; } /** @@ -221,33 +229,37 @@ export class StreamableHTTPServerTransport implements Transport { this.onmessage?.(message); } } else if (hasRequests) { - const headers: Record = { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - Connection: "keep-alive", - }; - - // After initialization, always include the session ID if we have one - if (this.sessionId !== undefined) { - headers["mcp-session-id"] = this.sessionId; - } - - res.writeHead(200, headers); + // The default behavior is to use SSE streaming + // but in some cases server will return JSON responses + if (!this._enableJsonResponse) { + const headers: Record = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }; + + // After initialization, always include the session ID if we have one + if (this.sessionId !== undefined) { + headers["mcp-session-id"] = this.sessionId; + } + res.writeHead(200, headers); + } // Store the response for this request to send messages back through this connection // We need to track by request ID to maintain the connection for (const message of messages) { if ('method' in message && 'id' in message) { - this._sseResponseMapping.set(message.id, res); + this._responseMapping.set(message.id, res); } } // Set up close handler for client disconnects res.on("close", () => { // Remove all entries that reference this response - for (const [id, storedRes] of this._sseResponseMapping.entries()) { + for (const [id, storedRes] of this._responseMapping.entries()) { if (storedRes === res) { - this._sseResponseMapping.delete(id); + this._responseMapping.delete(id); + this._requestResponseMap.delete(id); } } }); @@ -350,10 +362,14 @@ export class StreamableHTTPServerTransport implements Transport { async close(): Promise { // Close all SSE connections - this._sseResponseMapping.forEach((response) => { + this._responseMapping.forEach((response) => { response.end(); }); - this._sseResponseMapping.clear(); + this._responseMapping.clear(); + + // Clear any pending responses + this._requestResponseMap.clear(); + this.onclose?.(); } @@ -367,24 +383,69 @@ export class StreamableHTTPServerTransport implements Transport { throw new Error("No request ID provided for the message"); } - const sseResponse = this._sseResponseMapping.get(requestId); - if (!sseResponse) { - throw new Error(`No SSE connection established for request ID: ${String(requestId)}`); + // Get the response for this request + const response = this._responseMapping.get(requestId); + if (!response) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); } - // Send the message as an SSE event - sseResponse.write( - `event: message\ndata: ${JSON.stringify(message)}\n\n`, - ); - // After all JSON-RPC responses have been sent, the server SHOULD close the SSE stream. + + if (!this._enableJsonResponse) { + response.write(`event: message\ndata: ${JSON.stringify(message)}\n\n`); + } if ('result' in message || 'error' in message) { - this._sseResponseMapping.delete(requestId); - // Only close the connection if it's not needed by other requests - const canCloseConnection = ![...this._sseResponseMapping.entries()].some(([id, res]) => res === sseResponse && id !== requestId); - if (canCloseConnection) { - sseResponse?.end(); + this._requestResponseMap.set(requestId, message); + + // Get all request IDs that share the same request response object + const relatedIds = Array.from(this._responseMapping.entries()) + .filter(([_, res]) => res === response) + .map(([id]) => id); + + // Check if we have responses for all requests using this connection + const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id)); + + if (allResponsesReady) { + if (this._enableJsonResponse) { + // If we are in SSE mode, we don't need to do anything else + + // All responses ready, send as JSON + const headers: Record = { + 'Content-Type': 'application/json', + }; + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + // In tests, relatedIds might be coming in non-deterministic order + // We need to sort numerically for numeric IDs, alphabetically for string IDs + const responses = relatedIds + .sort((a, b) => { + // If both IDs are numbers, sort numerically + if (typeof a === 'number' && typeof b === 'number') { + return a - b; + } + + // Otherwise sort by string lexical order + return String(a).localeCompare(String(b)); + }) + .map(id => this._requestResponseMap.get(id)!); + + response.writeHead(200, headers); + if (responses.length === 1) { + response.end(JSON.stringify(responses[0])); + } else { + response.end(JSON.stringify(responses)); + } + } else { + response.end(); + } + // Clean up + for (const id of relatedIds) { + this._requestResponseMap.delete(id); + this._responseMapping.delete(id); + } } } } +} -} \ No newline at end of file From 71aeb9a3a653e9da0cb29eb3c228ab60eddfcaa3 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 9 Apr 2025 21:38:57 +0100 Subject: [PATCH 2/7] add examples with notifications --- src/examples/client/simpleStreamableHttp.ts | 25 +++++++++- .../server/jsonResponseStreamableHttp.ts | 44 ++++++++++++++++ src/examples/server/simpleStreamableHttp.ts | 50 +++++++++++++++++-- 3 files changed, 113 insertions(+), 6 deletions(-) diff --git a/src/examples/client/simpleStreamableHttp.ts b/src/examples/client/simpleStreamableHttp.ts index 589c1f41..b17add14 100644 --- a/src/examples/client/simpleStreamableHttp.ts +++ b/src/examples/client/simpleStreamableHttp.ts @@ -10,7 +10,8 @@ import { GetPromptRequest, GetPromptResultSchema, ListResourcesRequest, - ListResourcesResultSchema + ListResourcesResultSchema, + LoggingMessageNotificationSchema } from '../../types.js'; async function main(): Promise { @@ -19,12 +20,17 @@ async function main(): Promise { name: 'example-client', version: '1.0.0' }); + const transport = new StreamableHTTPClientTransport( new URL('http://localhost:3000/mcp') ); // Connect the client using the transport and initialize the server await client.connect(transport); + client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { + console.log(`Notification received: ${notification.params.level} - ${notification.params.data}`); + }); + console.log('Connected to MCP server'); // List available tools @@ -46,6 +52,23 @@ async function main(): Promise { const greetResult = await client.request(greetRequest, CallToolResultSchema); console.log('Greeting result:', greetResult.content[0].text); + // Call the new 'multi-greet' tool + console.log('\nCalling multi-greet tool (with notifications)...'); + const multiGreetRequest: CallToolRequest = { + method: 'tools/call', + params: { + name: 'multi-greet', + arguments: { name: 'MCP User' } + } + }; + const multiGreetResult = await client.request(multiGreetRequest, CallToolResultSchema); + console.log('Multi-greet results:'); + multiGreetResult.content.forEach(item => { + if (item.type === 'text') { + console.log(`- ${item.text}`); + } + }); + // List available prompts try { const promptsRequest: ListPromptsRequest = { diff --git a/src/examples/server/jsonResponseStreamableHttp.ts b/src/examples/server/jsonResponseStreamableHttp.ts index 2e83b54a..9b053631 100644 --- a/src/examples/server/jsonResponseStreamableHttp.ts +++ b/src/examples/server/jsonResponseStreamableHttp.ts @@ -9,6 +9,10 @@ import { CallToolResult } from '../../types.js'; const server = new McpServer({ name: 'json-response-streamable-http-server', version: '1.0.0', +}, { + capabilities: { + logging: {}, + } }); // Register a simple tool that returns a greeting @@ -30,6 +34,46 @@ server.tool( } ); +// Register a tool that sends multiple greetings with notifications +server.tool( + 'multi-greet', + 'A tool that sends different greetings with delays between them', + { + name: z.string().describe('Name to greet'), + }, + async ({ name }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + + await sendNotification({ + method: "notifications/message", + params: { level: "debug", data: `Starting multi-greet for ${name}` } + }); + + await sleep(1000); // Wait 1 second before first greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending first greeting to ${name}` } + }); + + await sleep(1000); // Wait another second before second greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending second greeting to ${name}` } + }); + + return { + content: [ + { + type: 'text', + text: `Good morning, ${name}!`, + } + ], + }; + } +); + const app = express(); app.use(express.json()); diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index e6ebe4b9..74c3ef53 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -9,7 +9,7 @@ import { CallToolResult, GetPromptResult, ReadResourceResult } from '../../types const server = new McpServer({ name: 'simple-streamable-http-server', version: '1.0.0', -}); +}, { capabilities: { logging: {} } }); // Register a simple tool that returns a greeting server.tool( @@ -30,6 +30,46 @@ server.tool( } ); +// Register a tool that sends multiple greetings with notifications +server.tool( + 'multi-greet', + 'A tool that sends different greetings with delays between them', + { + name: z.string().describe('Name to greet'), + }, + async ({ name }, { sendNotification }): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + + await sendNotification({ + method: "notifications/message", + params: { level: "debug", data: `Starting multi-greet for ${name}` } + }); + + await sleep(1000); // Wait 1 second before first greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending first greeting to ${name}` } + }); + + await sleep(1000); // Wait another second before second greeting + + await sendNotification({ + method: "notifications/message", + params: { level: "info", data: `Sending second greeting to ${name}` } + }); + + return { + content: [ + { + type: 'text', + text: `Good morning, ${name}!`, + } + ], + }; + } +); + // Register a simple prompt server.prompt( 'greeting-template', @@ -81,7 +121,7 @@ app.post('/mcp', async (req: Request, res: Response) => { // Check for existing session ID const sessionId = req.headers['mcp-session-id'] as string | undefined; let transport: StreamableHTTPServerTransport; - + if (sessionId && transports[sessionId]) { // Reuse existing transport transport = transports[sessionId]; @@ -90,14 +130,14 @@ app.post('/mcp', async (req: Request, res: Response) => { transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), }); - + // Connect the transport to the MCP server BEFORE handling the request // so responses can flow back through the same transport await server.connect(transport); - + // After handling the request, if we get a session ID back, store the transport await transport.handleRequest(req, res, req.body); - + // Store the transport by session ID for future requests if (transport.sessionId) { transports[transport.sessionId] = transport; From 5b99e94e6820e38bac6a4a763687dd32f318e598 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Wed, 9 Apr 2025 21:54:35 +0100 Subject: [PATCH 3/7] clean up somments --- src/server/streamableHttp.ts | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 3fd96307..e8844529 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -406,8 +406,6 @@ export class StreamableHTTPServerTransport implements Transport { if (allResponsesReady) { if (this._enableJsonResponse) { - // If we are in SSE mode, we don't need to do anything else - // All responses ready, send as JSON const headers: Record = { 'Content-Type': 'application/json', @@ -416,18 +414,7 @@ export class StreamableHTTPServerTransport implements Transport { headers['mcp-session-id'] = this.sessionId; } - // In tests, relatedIds might be coming in non-deterministic order - // We need to sort numerically for numeric IDs, alphabetically for string IDs const responses = relatedIds - .sort((a, b) => { - // If both IDs are numbers, sort numerically - if (typeof a === 'number' && typeof b === 'number') { - return a - b; - } - - // Otherwise sort by string lexical order - return String(a).localeCompare(String(b)); - }) .map(id => this._requestResponseMap.get(id)!); response.writeHead(200, headers); @@ -437,6 +424,7 @@ export class StreamableHTTPServerTransport implements Transport { response.end(JSON.stringify(responses)); } } else { + // End the SSE stream response.end(); } // Clean up From a76004c92bfa2c75a2f47eb70b4bbf8d24ee02c1 Mon Sep 17 00:00:00 2001 From: Antoine Beauvais-Lacasse Date: Wed, 9 Apr 2025 20:25:25 -0400 Subject: [PATCH 4/7] StreamableHTTPClientTransport cleanup / fixes * always send headers specified in requestInit option * avoid doubled onerror call * use for-await to iterate SSE stream * remove outdated comments * simplify requestId tracking * throw error when response Content-Type is out of spec --- src/client/streamableHttp.test.ts | 79 +++++++++++++++++++++--- src/client/streamableHttp.ts | 99 ++++++++++++++----------------- 2 files changed, 114 insertions(+), 64 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 0dc582d4..40f22139 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -80,7 +80,7 @@ describe("StreamableHTTPClientTransport", () => { (global.fetch as jest.Mock).mockResolvedValueOnce({ ok: true, status: 200, - headers: new Headers({ "mcp-session-id": "test-session-id" }), + headers: new Headers({ "content-type": "text/event-stream", "mcp-session-id": "test-session-id" }), }); await transport.send(message); @@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => { // We expect the 405 error to be caught and handled gracefully // This should not throw an error that breaks the transport await transport.start(); - await expect(transport.openSseStream()).rejects.toThrow('Failed to open SSE stream: Method Not Allowed'); + await expect(transport.openSseStream()).rejects.toThrow("Failed to open SSE stream: Method Not Allowed"); // Check that GET was attempted expect(global.fetch).toHaveBeenCalledWith( @@ -192,7 +192,7 @@ describe("StreamableHTTPClientTransport", () => { const stream = new ReadableStream({ start(controller) { // Send a server notification via SSE - const event = 'event: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n'; + const event = "event: message\ndata: {\"jsonrpc\": \"2.0\", \"method\": \"serverNotification\", \"params\": {}}\n\n"; controller.enqueue(encoder.encode(event)); } }); @@ -237,7 +237,7 @@ describe("StreamableHTTPClientTransport", () => { (global.fetch as jest.Mock) .mockResolvedValueOnce({ - ok: true, + ok: true, status: 200, headers: new Headers({ "content-type": "text/event-stream" }), body: makeStream("request1") @@ -263,13 +263,13 @@ describe("StreamableHTTPClientTransport", () => { // Both streams should have delivered their messages expect(messageSpy).toHaveBeenCalledTimes(2); - + // Verify received messages without assuming specific order expect(messageSpy.mock.calls.some(call => { const msg = call[0]; return msg.id === "request1" && msg.result?.id === "request1"; })).toBe(true); - + expect(messageSpy.mock.calls.some(call => { const msg = call[0]; return msg.id === "request2" && msg.result?.id === "request2"; @@ -281,7 +281,7 @@ describe("StreamableHTTPClientTransport", () => { const encoder = new TextEncoder(); const stream = new ReadableStream({ start(controller) { - const event = 'id: event-123\nevent: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n'; + const event = "id: event-123\nevent: message\ndata: {\"jsonrpc\": \"2.0\", \"method\": \"serverNotification\", \"params\": {}}\n\n"; controller.enqueue(encoder.encode(event)); controller.close(); } @@ -313,4 +313,67 @@ describe("StreamableHTTPClientTransport", () => { const lastCall = calls[calls.length - 1]; expect(lastCall[1].headers.get("last-event-id")).toBe("event-123"); }); -}); \ No newline at end of file + + it("should throw error when invalid content-type is received", async () => { + const message: JSONRPCMessage = { + jsonrpc: "2.0", + method: "test", + params: {}, + id: "test-id" + }; + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("invalid text response"); + controller.close(); + } + }); + + const errorSpy = jest.fn(); + transport.onerror = errorSpy; + + (global.fetch as jest.Mock).mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ "content-type": "text/plain" }), + body: stream + }); + + await transport.start(); + await expect(transport.send(message)).rejects.toThrow("Unexpected content type: text/plain"); + expect(errorSpy).toHaveBeenCalled(); + }); + + + it("should always send specified custom headers", async () => { + const requestInit = { + headers: { + "X-Custom-Header": "CustomValue" + } + }; + transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), { + requestInit: requestInit + }); + + let actualReqInit: RequestInit = {}; + + ((global.fetch as jest.Mock)).mockImplementation( + async (_url, reqInit) => { + actualReqInit = reqInit; + return new Response(null, { status: 200, headers: { "content-type": "text/event-stream" } }); + } + ); + + await transport.start(); + + await transport.openSseStream(); + expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue"); + + requestInit.headers["X-Custom-Header"] = "SecondCustomValue"; + + await transport.send({ jsonrpc: "2.0", method: "test", params: {} } as JSONRPCMessage); + expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("SecondCustomValue"); + + expect(global.fetch).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 0c667e35..5ea537c7 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -1,7 +1,8 @@ import { Transport } from "../shared/transport.js"; import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js"; -import { EventSourceParserStream } from 'eventsource-parser/stream'; +import { EventSourceParserStream } from "eventsource-parser/stream"; + export class StreamableHTTPError extends Error { constructor( public readonly code: number | undefined, @@ -17,16 +18,16 @@ export class StreamableHTTPError extends Error { export type StreamableHTTPClientTransportOptions = { /** * An OAuth client provider to use for authentication. - * + * * When an `authProvider` is specified and the connection is started: * 1. The connection is attempted with any existing access token from the `authProvider`. * 2. If the access token has expired, the `authProvider` is used to refresh the token. * 3. If token refresh fails or no access token exists, and auth is required, `OAuthClientProvider.redirectToAuthorization` is called, and an `UnauthorizedError` will be thrown from `connect`/`start`. - * + * * After the user has finished authorizing via their user agent, and is redirected back to the MCP client application, call `StreamableHTTPClientTransport.finishAuth` with the authorization code before retrying the connection. - * + * * If an `authProvider` is not provided, and auth is required, an `UnauthorizedError` will be thrown. - * + * * `UnauthorizedError` might also be thrown when sending any message over the transport, indicating that the session has expired, and needs to be re-authed and reconnected. */ authProvider?: OAuthClientProvider; @@ -83,7 +84,7 @@ export class StreamableHTTPClientTransport implements Transport { return await this._startOrAuthStandaloneSSE(); } - private async _commonHeaders(): Promise { + private async _commonHeaders(): Promise { const headers: HeadersInit = {}; if (this._authProvider) { const tokens = await this._authProvider.tokens(); @@ -96,24 +97,25 @@ export class StreamableHTTPClientTransport implements Transport { headers["mcp-session-id"] = this._sessionId; } - return headers; + return new Headers( + { ...headers, ...this._requestInit?.headers } + ); } private async _startOrAuthStandaloneSSE(): Promise { try { // Try to open an initial SSE stream with GET to listen for server messages // This is optional according to the spec - server may not support it - const commonHeaders = await this._commonHeaders(); - const headers = new Headers(commonHeaders); - headers.set('Accept', 'text/event-stream'); + const headers = await this._commonHeaders(); + headers.set("Accept", "text/event-stream"); // Include Last-Event-ID header for resumable streams if (this._lastEventId) { - headers.set('last-event-id', this._lastEventId); + headers.set("last-event-id", this._lastEventId); } const response = await fetch(this._url, { - method: 'GET', + method: "GET", headers, signal: this._abortController?.signal, }); @@ -124,12 +126,10 @@ export class StreamableHTTPClientTransport implements Transport { return await this._authThenStart(); } - const error = new StreamableHTTPError( + throw new StreamableHTTPError( response.status, `Failed to open SSE stream: ${response.statusText}`, ); - this.onerror?.(error); - throw error; } // Successful connection, handle the SSE stream as a standalone listener @@ -144,42 +144,32 @@ export class StreamableHTTPClientTransport implements Transport { if (!stream) { return; } - // Create a pipeline: binary stream -> text decoder -> SSE parser - const eventStream = stream - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()); - const reader = eventStream.getReader(); const processStream = async () => { - try { - while (true) { - const { done, value: event } = await reader.read(); - if (done) { - break; - } - - // Update last event ID if provided - if (event.id) { - this._lastEventId = event.id; - } - - // Handle message events (default event type is undefined per docs) - // or explicit 'message' event type - if (!event.event || event.event === 'message') { - try { - const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); - this.onmessage?.(message); - } catch (error) { - this.onerror?.(error as Error); - } + // Create a pipeline: binary stream -> text decoder -> SSE parser + const eventStream = stream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()); + + for await (const event of eventStream) { + // Update last event ID if provided + if (event.id) { + this._lastEventId = event.id; + } + // Handle message events (default event type is undefined per docs) + // or explicit 'message' event type + if (!event.event || event.event === "message") { + try { + const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); + this.onmessage?.(message); + } catch (error) { + this.onerror?.(error as Error); } } - } catch (error) { - this.onerror?.(error as Error); } }; - processStream(); + processStream().catch(err => this.onerror?.(err)); } async start() { @@ -215,8 +205,7 @@ export class StreamableHTTPClientTransport implements Transport { async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise { try { - const commonHeaders = await this._commonHeaders(); - const headers = new Headers({ ...commonHeaders, ...this._requestInit?.headers }); + const headers = await this._commonHeaders(); headers.set("content-type", "application/json"); headers.set("accept", "application/json, text/event-stream"); @@ -261,20 +250,13 @@ export class StreamableHTTPClientTransport implements Transport { // Get original message(s) for detecting request IDs const messages = Array.isArray(message) ? message : [message]; - // Extract IDs from request messages for tracking responses - const requestIds = messages.filter(msg => 'method' in msg && 'id' in msg) - .map(msg => 'id' in msg ? msg.id : undefined) - .filter(id => id !== undefined); - - // If we have request IDs and an SSE response, create a unique stream ID - const hasRequests = requestIds.length > 0; + const hasRequests = messages.filter(msg => "method" in msg && "id" in msg && msg.id !== undefined).length > 0; // Check the response type const contentType = response.headers.get("content-type"); if (hasRequests) { if (contentType?.includes("text/event-stream")) { - // For streaming responses, create a unique stream ID based on request IDs this._handleSseStream(response.body); } else if (contentType?.includes("application/json")) { // For non-streaming servers, we might get direct JSON responses @@ -286,6 +268,11 @@ export class StreamableHTTPClientTransport implements Transport { for (const msg of responseMessages) { this.onmessage?.(msg); } + } else { + throw new StreamableHTTPError( + -1, + `Unexpected content type: ${contentType}`, + ); } } } catch (error) { @@ -296,7 +283,7 @@ export class StreamableHTTPClientTransport implements Transport { /** * Opens SSE stream to receive messages from the server. - * + * * This allows the server to push messages to the client without requiring the client * to first send a request via HTTP POST. Some servers may not support this feature. * If authentication is required but fails, this method will throw an UnauthorizedError. @@ -309,4 +296,4 @@ export class StreamableHTTPClientTransport implements Transport { } await this._startOrAuthStandaloneSSE(); } -} \ No newline at end of file +} From 632b836c51c3dd197e8bccf9400d926b6db722cb Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 09:43:42 +0100 Subject: [PATCH 5/7] remove order assumption for batch responses in test --- src/server/streamableHttp.test.ts | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 74868979..926c4306 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1320,7 +1320,7 @@ describe("StreamableHTTPServerTransport", () => { body: JSON.stringify(batchMessages), }); - // Mock sequential responses - send them in specific order + // Mock responses without enforcing specific order jsonResponseTransport.onmessage = (message) => { if ('method' in message && 'id' in message) { const responseMessage: JSONRPCMessage = { @@ -1328,13 +1328,7 @@ describe("StreamableHTTPServerTransport", () => { result: { value: `result-for-${message.id}` }, id: message.id, }; - // Send responses in order - req1 first, then req2 - if (message.id === 'req1') { - void jsonResponseTransport.send(responseMessage); - } else { - // Add a tiny delay to req2 to ensure it's processed after req1 - setTimeout(() => void jsonResponseTransport.send(responseMessage), 5); - } + void jsonResponseTransport.send(responseMessage); } }; @@ -1351,8 +1345,15 @@ describe("StreamableHTTPServerTransport", () => { }) ); + // Verify response was sent but don't assume specific order expect(mockResponse.end).toHaveBeenCalled(); - expect(mockResponse.end.mock.calls[0][0]).toContain("result-for-req2"); + const responseJson = JSON.parse(mockResponse.end.mock.calls[0][0] as string); + expect(Array.isArray(responseJson)).toBe(true); + expect(responseJson).toHaveLength(2); + + // Check each response exists separately without assuming order + expect(responseJson).toContainEqual(expect.objectContaining({ id: "req1", result: { value: "result-for-req1" } })); + expect(responseJson).toContainEqual(expect.objectContaining({ id: "req2", result: { value: "result-for-req2" } })); }); }); From 962a9c959ae86a1635afdd996150ce011c3a1805 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 13:01:09 +0100 Subject: [PATCH 6/7] clean up and docs fixes --- src/examples/README.md | 57 +++++++++++++++--- .../server/jsonResponseStreamableHttp.ts | 27 +++++++-- src/examples/server/simpleStreamableHttp.ts | 23 ++++++- src/server/.streamableHttp.test.ts.swp | Bin 16384 -> 0 bytes src/server/streamableHttp.test.ts | 11 ++-- 5 files changed, 99 insertions(+), 19 deletions(-) delete mode 100644 src/server/.streamableHttp.test.ts.swp diff --git a/src/examples/README.md b/src/examples/README.md index 937159c3..cc6af51c 100644 --- a/src/examples/README.md +++ b/src/examples/README.md @@ -20,15 +20,43 @@ The server will start on port 3000. You can test the initialization and tool cal ```bash # Initialize the server and get the session ID from headers -SESSION_ID=$(curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" \ - -d '{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":"1"}' \ +SESSION_ID=$(curl -X POST \ + -H "Content-Type: application/json" \ + -H "Accept: application/json" \ + -H "Accept: text/event-stream" \ + -d '{ + "jsonrpc": "2.0", + "method": "initialize", + "params": { + "capabilities": {}, + "protocolVersion": "2025-03-26", + "clientInfo": { + "name": "test", + "version": "1.0.0" + } + }, + "id": "1" + }' \ -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r') echo "Session ID: $SESSION_ID" # Call the greet tool using the saved session ID -curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" \ +curl -X POST \ + -H "Content-Type: application/json" \ + -H "Accept: application/json" \ + -H "Accept: text/event-stream" \ -H "mcp-session-id: $SESSION_ID" \ - -d '{"jsonrpc":"2.0","method":"mcp.call_tool","params":{"name":"greet","arguments":{"name":"World"}},"id":"2"}' \ + -d '{ + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "greet", + "arguments": { + "name": "World" + } + }, + "id": "2" + }' \ http://localhost:3000/mcp ``` @@ -52,10 +80,25 @@ The server will start on port 3000. You can test the initialization and tool lis ```bash # First initialize the server and save the session ID to a variable -SESSION_ID=$(curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" \ - -d '{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":"1"}' \ +SESSION_ID=$(curl -X POST \ + -H "Content-Type: application/json" \ + -H "Accept: application/json" \ + -H "Accept: text/event-stream" \ + -d '{ + "jsonrpc": "2.0", + "method": "initialize", + "params": { + "capabilities": {}, + "protocolVersion": "2025-03-26", + "clientInfo": { + "name": "test", + "version": "1.0.0" + } + }, + "id": "1" + }' \ -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r') -echo "Session ID: $SESSION_ID" +echo "Session ID: $SESSION_ID # Then list tools using the saved session ID curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" \ diff --git a/src/examples/server/jsonResponseStreamableHttp.ts b/src/examples/server/jsonResponseStreamableHttp.ts index 9b053631..1d322112 100644 --- a/src/examples/server/jsonResponseStreamableHttp.ts +++ b/src/examples/server/jsonResponseStreamableHttp.ts @@ -149,10 +149,29 @@ function isInitializeRequest(body: unknown): boolean { // Start the server const PORT = 3000; app.listen(PORT, () => { - console.log(`MCP Streamable HTTP Server with JSON responses listening on port ${PORT}`); - console.log(`Server is running. Press Ctrl+C to stop.`); - console.log(`Initialize with: curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d '{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":"1"}' http://localhost:${PORT}/mcp`); - console.log(`Then call tool with: curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -H "mcp-session-id: YOUR_SESSION_ID" -d '{"jsonrpc":"2.0","method":"mcp.call_tool","params":{"name":"greet","arguments":{"name":"World"}},"id":"2"}' http://localhost:${PORT}/mcp`); + console.log(`MCP Streamable HTTP Server listening on port ${PORT}`); + console.log(`Initialize session with the command below id you are using curl for testing: + ----------------------------- + SESSION_ID=$(curl -X POST \ + -H "Content-Type: application/json" \ + -H "Accept: application/json" \ + -H "Accept: text/event-stream" \ + -d '{ + "jsonrpc": "2.0", + "method": "initialize", + "params": { + "capabilities": {}, + "protocolVersion": "2025-03-26", + "clientInfo": { + "name": "test", + "version": "1.0.0" + } + }, + "id": "1" + }' \ + -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\\r') + echo "Session ID: $SESSION_ID" + -----------------------------`); }); // Handle server shutdown diff --git a/src/examples/server/simpleStreamableHttp.ts b/src/examples/server/simpleStreamableHttp.ts index 74c3ef53..5b228cbd 100644 --- a/src/examples/server/simpleStreamableHttp.ts +++ b/src/examples/server/simpleStreamableHttp.ts @@ -186,7 +186,28 @@ function isInitializeRequest(body: unknown): boolean { const PORT = 3000; app.listen(PORT, () => { console.log(`MCP Streamable HTTP Server listening on port ${PORT}`); - console.log(`Test with: curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" -d '{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":"1"}' http://localhost:${PORT}/mcp`); + console.log(`Initialize session with the command below id you are using curl for testing: + ----------------------------- + SESSION_ID=$(curl -X POST \ + -H "Content-Type: application/json" \ + -H "Accept: application/json" \ + -H "Accept: text/event-stream" \ + -d '{ + "jsonrpc": "2.0", + "method": "initialize", + "params": { + "capabilities": {}, + "protocolVersion": "2025-03-26", + "clientInfo": { + "name": "test", + "version": "1.0.0" + } + }, + "id": "1" + }' \ + -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\\r') + echo "Session ID: $SESSION_ID" + -----------------------------`); }); // Handle server shutdown diff --git a/src/server/.streamableHttp.test.ts.swp b/src/server/.streamableHttp.test.ts.swp deleted file mode 100644 index 4ab20fa3582429b76d0a1f7da19f2c2ec77faf7f..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16384 zcmeI3%a0tz9mg9&c=~~Yxqw89;lK>c^z5u5h_SI@*T62);?+KgWXnOl-8DPC?x}8f z*X(*@1{5Xv0}{gl@sL6ULXH%H00B`BNF)l8LZlpkLoP^>B7q2r0(JrkIN$23>gj2E zy@nHzYU!gL_phGcs`}NV%S&VTOwW*;?Use-Qp>va52u!Ie4}n{{j+8Hj5*@W)F z1$2t@$mTTRHcv{^G%t+;MuD?cV5_xryfr35UsJi7T=uQSvyEuN83l|2MggOMQNSo* z6fg=H1&jh;0|j`r&3XX!I9Ju>KK1vU!r#;C3Dx^S!TzbT`|ACB1^Xw;9;x?>Dg&L* zf0aE}_TLul|50{*3UHGuu#W$+vL8|LPZaDQDf_CjU-=FBL&yKOvNL7>Nx}XvW!EPg zrwjH|%HC7)SI!$v|D>`9%Klct{-Lt3Df{F%N8^8>?B7=QUlr`?wfSQdFbWt2i~>dh zqkvJsC}0#Y3K#{90!D#S1su<^&WB~2*e~Pzzuy19y4|ww2eY6GKHX+ne+93AUjhzX zFaxdwTfnKUmh}esHFycU2p$AS!2xg^*bDZ6Yr(r)5FVTWKLpN zfBy}50vrJya0o1d1#l@ig)@TRgJ;2U@B=Ubt^nJ?M>vD{2lzX94Lk!L10lEp)W9cb z*AKuy!OP&s;78yP*a@}+(e_&b9+E!2&^Unys!4BC7HavNQ-;3cd9x3vNVNQ=CsJOh-kd{iF;C7EnqGgX&@?u6#?DWd3IP3+v5(4yD4bkP;`N!?+yaA`B^{)=ik$vSgM zwMK5)gVfVRhW#to(FV1gToqh`Y9#1k(6r%u=mW!ktX@xhx?+UL;bDhMqC}JEJPwM( zmeHt0J>z3zI=C83BLg`bq+%dVIjeMnQa0%X0rkG?^KMn8^F@YXF^-*;Uk=)RwBs}C z3GbnD*>vD;G;WJoZQvz3PLK@UL@`pae2dKcObLe#cm@X{os;TXUpX)FfA1tlnr}l- zk@E-3l+hSo6&@iSw`6)B@%m!Q7&?*EIef+`5aOk`9hYdeGOr@fI1IB(4^9)obKXOD zy%lls(G~OVZ9yO$c_Qaur^9J{yW@7NC6RPw>DpV`BV_(Ho4~RuM7Ze8gO>SK=|<8y z1YM>uPfGuj_cVoWCD##p<|FQuqK0u>iGUS=7xoqxr}tIWn94jO(q?px%nxsf zE-cK>r`JXo^ui%^iAxkK4O$c_P|FatLTD2?-;3#RXp#kPlT>~oJN0(UEc7yBkB*R) zt9|Ao?Xq&Mf2OkMJHh~^iYf2M41Z*yM7{$|H>A%Fvv7@xpPLkups#YVHlxrt4clQ_ z*5wT?wCD3&@N@+@mzHIr>!TaWZY352S&|!s!TNFYKNs@326Rh?Q;{ z!0(Xi$V()eOYL3}9kMC^LI$EfT2z^%8i{kN0AhR(YVD#9X|f{Mz4@`0=`x3%KQrr8-g~!Kl6>g-Gx>xpE4z3zel(Fn z?dpZznkAFgtc#oVD1q4jKaaii6!zR=|F6F4jup=I0z1aYr#3-9qjwx z1W$t=*bQ3XH1_u=!H3{=@C5i7SOwRE>wwt*KM4+kc`yS+Iqn4JF$x$3i~>dhqkvJs zC}0#Y3K#{xyb9o0hhU4SPj~cReQaF~nc5hOZFJ3MA^t~i*&#H(U5IZ&6WB58Q&qW@ z-YBKgRB+gpP_HZz>;&iK(SARngm)>CVbgCwIKxZ0*!YH>)T9cN+P>hybzP;uL0M+8 zx0j`eX(L|}&yR7;AFS(avx6o(WIg#+3LEedRkcoXn?7Pf(CnBdJAxm!(FO { it("should return JSON response for a single request", async () => { const requestMessage: JSONRPCMessage = { jsonrpc: "2.0", - method: "test", + method: "tools/list", params: {}, id: "test-req-id", }; @@ -1306,8 +1306,8 @@ describe("StreamableHTTPServerTransport", () => { it("should return JSON response for batch requests", async () => { const batchMessages: JSONRPCMessage[] = [ - { jsonrpc: "2.0", method: "test1", params: {}, id: "req1" }, - { jsonrpc: "2.0", method: "test2", params: {}, id: "req2" }, + { jsonrpc: "2.0", method: "tools/list", params: {}, id: "req1" }, + { jsonrpc: "2.0", method: "tools/call", params: {}, id: "req2" }, ]; const req = createMockRequest({ @@ -1334,9 +1334,6 @@ describe("StreamableHTTPServerTransport", () => { await jsonResponseTransport.handleRequest(req, mockResponse); - // Wait for all promises to resolve - give it enough time - await new Promise(resolve => setTimeout(resolve, 100)); - // Should respond with application/json header expect(mockResponse.writeHead).toHaveBeenCalledWith( 200, @@ -1350,7 +1347,7 @@ describe("StreamableHTTPServerTransport", () => { const responseJson = JSON.parse(mockResponse.end.mock.calls[0][0] as string); expect(Array.isArray(responseJson)).toBe(true); expect(responseJson).toHaveLength(2); - + // Check each response exists separately without assuming order expect(responseJson).toContainEqual(expect.objectContaining({ id: "req1", result: { value: "result-for-req1" } })); expect(responseJson).toContainEqual(expect.objectContaining({ id: "req2", result: { value: "result-for-req2" } })); From 569c9287941abe238c177755783fe88c865149f5 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Thu, 10 Apr 2025 13:04:58 +0100 Subject: [PATCH 7/7] remove promise from test --- src/server/streamableHttp.test.ts | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 640c843d..ad80ea62 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1272,20 +1272,14 @@ describe("StreamableHTTPServerTransport", () => { if ('method' in message && 'id' in message) { const responseMessage: JSONRPCMessage = { jsonrpc: "2.0", - result: { value: "test-result" }, + result: { value: `test-result` }, id: message.id, }; - setTimeout(() => { - void jsonResponseTransport.send(responseMessage); - }, 10); + void jsonResponseTransport.send(responseMessage); } }; await jsonResponseTransport.handleRequest(req, mockResponse); - - // Wait for all promises to resolve - await new Promise(resolve => setTimeout(resolve, 50)); - // Should respond with application/json header expect(mockResponse.writeHead).toHaveBeenCalledWith( 200,