From 03de5b6853ec2f214848931f02764710b9da59e8 Mon Sep 17 00:00:00 2001 From: ihrpr Date: Fri, 11 Apr 2025 18:27:12 +0100 Subject: [PATCH 1/2] Use server and transport classes instead of mocks in streamable http transport --- src/server/streamableHttp.test.ts | 2229 +++++++++++------------------ src/server/streamableHttp.ts | 2 + 2 files changed, 816 insertions(+), 1415 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 85fcae2f..dda69331 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1,1590 +1,989 @@ -import { IncomingMessage, ServerResponse } from "node:http"; -import { StreamableHTTPServerTransport } from "./streamableHttp.js"; -import { JSONRPCMessage } from "../types.js"; -import { Readable } from "node:stream"; +import { createServer, type Server, IncomingMessage, ServerResponse } from "node:http"; +import { AddressInfo } from "node:net"; import { randomUUID } from "node:crypto"; -// Mock IncomingMessage -function createMockRequest(options: { - method: string; - headers: Record; - body?: string; -}): IncomingMessage { - const readable = new Readable(); - readable._read = () => { }; - if (options.body) { - readable.push(options.body); - readable.push(null); - } - - return Object.assign(readable, { - method: options.method, - headers: options.headers, - }) as IncomingMessage; -} - -// Mock ServerResponse -function createMockResponse(): jest.Mocked { - const response = { - writeHead: jest.fn().mockReturnThis(), - write: jest.fn().mockReturnThis(), - end: jest.fn().mockReturnThis(), - on: jest.fn().mockReturnThis(), - emit: jest.fn().mockReturnThis(), - getHeader: jest.fn(), - setHeader: jest.fn(), - flushHeaders: jest.fn(), - } as unknown as jest.Mocked; - return response; +import { StreamableHTTPServerTransport } from "./streamableHttp.js"; +import { McpServer } from "./mcp.js"; +import { CallToolResult, JSONRPCMessage } from "../types.js"; +import { z } from "zod"; + +/** + * Test server configuration for StreamableHTTPServerTransport tests + */ +interface TestServerConfig { + sessionIdGenerator?: () => string | undefined; + enableJsonResponse?: boolean; + customRequestHandler?: (req: IncomingMessage, res: ServerResponse, parsedBody?: unknown) => Promise; } -describe("StreamableHTTPServerTransport", () => { - let transport: StreamableHTTPServerTransport; - let mockResponse: jest.Mocked; - let mockRequest: string; - - beforeEach(() => { - transport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - }); - mockResponse = createMockResponse(); - mockRequest = JSON.stringify({ - jsonrpc: "2.0", - method: "test", - params: {}, - id: 1, - }); +/** + * Helper to create and start test HTTP server with MCP setup + */ +async function createTestServer(config: TestServerConfig = {}): Promise<{ + server: Server; + transport: StreamableHTTPServerTransport; + mcpServer: McpServer; + baseUrl: URL; +}> { + const mcpServer = new McpServer( + { name: "test-server", version: "1.0.0" }, + { capabilities: {} } + ); + + mcpServer.tool( + "greet", + "A simple greeting tool", + { name: z.string().describe("Name to greet") }, + async ({ name }): Promise => { + return { content: [{ type: "text", text: `Hello, ${name}!` }] }; + } + ); + + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: config.sessionIdGenerator ?? (() => randomUUID()), + enableJsonResponse: config.enableJsonResponse ?? false }); - afterEach(() => { - jest.clearAllMocks(); + await mcpServer.connect(transport); + + const server = createServer(async (req, res) => { + try { + if (config.customRequestHandler) { + await config.customRequestHandler(req, res); + } else { + await transport.handleRequest(req, res); + } + } catch (error) { + console.error("Error handling request:", error); + if (!res.headersSent) res.writeHead(500).end(); + } }); - describe("Session Management", () => { - it("should generate session ID during initialization", async () => { - const initializeMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initializeMessage), - }); - - expect(transport.sessionId).toBeUndefined(); - expect(transport["_initialized"]).toBe(false); - - await transport.handleRequest(req, mockResponse); - - expect(transport.sessionId).toBeDefined(); - expect(transport["_initialized"]).toBe(true); - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.objectContaining({ - "mcp-session-id": transport.sessionId, - }) - ); + const baseUrl = await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + const addr = server.address() as AddressInfo; + resolve(new URL(`http://127.0.0.1:${addr.port}`)); }); + }); - it("should reject second initialization request", async () => { - // First initialize - const initMessage1: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const req1 = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage1), - }); + return { server, transport, mcpServer, baseUrl }; +} - await transport.handleRequest(req1, mockResponse); - expect(transport["_initialized"]).toBe(true); +/** + * Helper to stop test server + */ +async function stopTestServer({ server, transport }: { server: Server; transport: StreamableHTTPServerTransport }): Promise { + await transport.close(); + await new Promise((resolve) => server.close(() => resolve())); +} - // Reset mock for second request - mockResponse.writeHead.mockClear(); - mockResponse.end.mockClear(); +/** + * Common test messages + */ +const TEST_MESSAGES = { + initialize: { + jsonrpc: "2.0", + method: "initialize", + params: { + clientInfo: { name: "test-client", version: "1.0" }, + protocolVersion: "2025-03-26", + }, + id: "init-1", + } as JSONRPCMessage, + + toolsList: { + jsonrpc: "2.0", + method: "tools/list", + params: {}, + id: "tools-1", + } as JSONRPCMessage +}; + +/** + * Helper to extract text from SSE response + * Note: Can only be called once per response stream. For multiple reads, + * get the reader manually and read multiple times. + */ +async function readSSEEvent(response: Response): Promise { + const reader = response.body?.getReader(); + const { value } = await reader!.read(); + return new TextDecoder().decode(value); +} - // Try second initialize - const initMessage2: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-2", - }; +/** + * Helper to send JSON-RPC request + */ +async function sendPostRequest(baseUrl: URL, message: JSONRPCMessage | JSONRPCMessage[], sessionId?: string): Promise { + const headers: Record = { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + }; + + if (sessionId) { + headers["mcp-session-id"] = sessionId; + } - const req2 = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage2), - }); + return fetch(baseUrl, { + method: "POST", + headers, + body: JSON.stringify(message), + }); +} - await transport.handleRequest(req2, mockResponse); +function expectErrorResponse(data: unknown, expectedCode: number, expectedMessagePattern: RegExp): void { + expect(data).toMatchObject({ + jsonrpc: "2.0", + error: expect.objectContaining({ + code: expectedCode, + message: expect.stringMatching(expectedMessagePattern), + }), + }); +} - expect(mockResponse.writeHead).toHaveBeenCalledWith(400); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Invalid Request: Server already initialized"')); - }); +describe("StreamableHTTPServerTransport", () => { + let server: Server; + let transport: StreamableHTTPServerTransport; + let baseUrl: URL; + let sessionId: string; + + beforeEach(async () => { + const result = await createTestServer(); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + }); - it("should reject batch initialize request", async () => { - const batchInitialize: JSONRPCMessage[] = [ - { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }, - { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client-2", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-2", - } - ]; + afterEach(async () => { + await stopTestServer({ server, transport }); + }); - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(batchInitialize), - }); + async function initializeServer(): Promise { + const response = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); - await transport.handleRequest(req, mockResponse); + expect(response.status).toBe(200); + const newSessionId = response.headers.get("mcp-session-id"); + expect(newSessionId).toBeDefined(); + return newSessionId as string; + } - expect(mockResponse.writeHead).toHaveBeenCalledWith(400); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Invalid Request: Only one initialization request is allowed"')); - }); + it("should initialize server and generate session ID", async () => { + const response = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); - it("should reject invalid session ID", async () => { - // First initialize the transport - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); - - await transport.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); - - // Now try with an invalid session ID - const req = createMockRequest({ - method: "POST", - headers: { - "mcp-session-id": "invalid-session-id", - "accept": "application/json, text/event-stream", - "content-type": "application/json", - }, - body: mockRequest, - }); + expect(response.status).toBe(200); + expect(response.headers.get("content-type")).toBe("text/event-stream"); + expect(response.headers.get("mcp-session-id")).toBeDefined(); + }); - await transport.handleRequest(req, mockResponse); + it("should reject second initialization request", async () => { + // First initialize + const sessionId = await initializeServer(); + expect(sessionId).toBeDefined(); - expect(mockResponse.writeHead).toHaveBeenCalledWith(404); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"jsonrpc":"2.0"')); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Session not found"')); - }); + // Try second initialize + const secondInitMessage: JSONRPCMessage = { + jsonrpc: "2.0", + method: "initialize", + params: { + clientInfo: { name: "test-client-2", version: "1.0" }, + protocolVersion: "2025-03-26", + }, + id: "init-2", + }; + const response = await sendPostRequest(baseUrl, secondInitMessage); + + expect(response.status).toBe(400); + const errorData = await response.json(); + expectErrorResponse(errorData, -32600, /Server already initialized/); + }); - it("should reject non-initialization requests without session ID with 400 Bad Request", async () => { - // First initialize the transport - const initMessage: JSONRPCMessage = { + it("should reject batch initialize request", async () => { + const batchInitMessages: JSONRPCMessage[] = [ + TEST_MESSAGES.initialize, + { jsonrpc: "2.0", method: "initialize", params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", + clientInfo: { name: "test-client-2", version: "1.0" }, + protocolVersion: "2025-03-26", }, - body: JSON.stringify(initMessage), - }); - - await transport.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); - - // Now try without session ID - const req = createMockRequest({ - method: "POST", - headers: { - "accept": "application/json, text/event-stream", - "content-type": "application/json", - // No mcp-session-id header - }, - body: mockRequest - }); + id: "init-2", + } + ]; - await transport.handleRequest(req, mockResponse); + const response = await sendPostRequest(baseUrl, batchInitMessages); - expect(mockResponse.writeHead).toHaveBeenCalledWith(400); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"jsonrpc":"2.0"')); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Bad Request: Mcp-Session-Id header is required"')); - }); + expect(response.status).toBe(400); + const errorData = await response.json(); + expectErrorResponse(errorData, -32600, /Only one initialization request is allowed/); + }); - it("should reject requests to uninitialized server", async () => { - // Create a new transport that hasn't been initialized - const uninitializedTransport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - }); - - const req = createMockRequest({ - method: "POST", - headers: { - "accept": "application/json, text/event-stream", - "content-type": "application/json", - "mcp-session-id": "any-session-id", - }, - body: mockRequest - }); + it("should pandle post requests via sse response correctly", async () => { + sessionId = await initializeServer(); - await uninitializedTransport.handleRequest(req, mockResponse); + const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList, sessionId); - expect(mockResponse.writeHead).toHaveBeenCalledWith(400); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Bad Request: Server not initialized"')); - }); + expect(response.status).toBe(200); - it("should reject session ID as array", async () => { - // First initialize the transport - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); - - await transport.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); - - // Now try with an array session ID - const req = createMockRequest({ - method: "POST", - headers: { - "mcp-session-id": ["session1", "session2"], - "accept": "application/json, text/event-stream", - "content-type": "application/json", - }, - body: mockRequest, - }); + // Read the SSE stream for the response + const text = await readSSEEvent(response); - await transport.handleRequest(req, mockResponse); + // Parse the SSE event + const eventLines = text.split("\n"); + const dataLine = eventLines.find(line => line.startsWith("data:")); + expect(dataLine).toBeDefined(); - expect(mockResponse.writeHead).toHaveBeenCalledWith(400); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Bad Request: Mcp-Session-Id header must be a single value"')); + const eventData = JSON.parse(dataLine!.substring(5)); + expect(eventData).toMatchObject({ + jsonrpc: "2.0", + result: expect.objectContaining({ + tools: expect.arrayContaining([ + expect.objectContaining({ + name: "greet", + description: "A simple greeting tool", + }), + ]), + }), + id: "tools-1", }); }); - describe("Mode without state management", () => { - let transportWithoutState: StreamableHTTPServerTransport; - let mockResponse: jest.Mocked; - beforeEach(async () => { - transportWithoutState = new StreamableHTTPServerTransport({ sessionIdGenerator: () => undefined }); - mockResponse = createMockResponse(); - - // Initialize the transport for each test - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); + it("should call a tool and return the result", async () => { + sessionId = await initializeServer(); - await transportWithoutState.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); + const toolCallMessage: JSONRPCMessage = { + jsonrpc: "2.0", + method: "tools/call", + params: { + name: "greet", + arguments: { + name: "Test User", + }, + }, + id: "call-1", + }; + + const response = await sendPostRequest(baseUrl, toolCallMessage, sessionId); + expect(response.status).toBe(200); + + const text = await readSSEEvent(response); + const eventLines = text.split("\n"); + const dataLine = eventLines.find(line => line.startsWith("data:")); + expect(dataLine).toBeDefined(); + + const eventData = JSON.parse(dataLine!.substring(5)); + expect(eventData).toMatchObject({ + jsonrpc: "2.0", + result: { + content: [ + { + type: "text", + text: "Hello, Test User!", + }, + ], + }, + id: "call-1", }); + }); - it("should not include session ID in response headers when in mode without state management", async () => { - // Use a non-initialization request - const message: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test", - params: {}, - id: 1, - }; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(message), - }); - - await transportWithoutState.handleRequest(req, mockResponse); - - expect(mockResponse.writeHead).toHaveBeenCalled(); - // Extract the headers from writeHead call - const headers = mockResponse.writeHead.mock.calls[0][1]; - expect(headers).not.toHaveProperty("mcp-session-id"); - }); + it("should reject requests without a valid session ID", async () => { + const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList); - it("should not validate session ID in mode without state management", async () => { - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": "invalid-session-id", // This would cause a 404 in mode with state management - }, - body: JSON.stringify({ - jsonrpc: "2.0", - method: "test", - params: {}, - id: 1 - }), - }); - - await transportWithoutState.handleRequest(req, mockResponse); - - // Should still get 200 OK, not 404 Not Found - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.not.objectContaining({ - "mcp-session-id": expect.anything(), - }) - ); - }); + expect(response.status).toBe(400); + const errorData = await response.json(); + expectErrorResponse(errorData, -32000, /Bad Request/); + expect(errorData.id).toBeNull(); + }); - it("should handle POST requests without session validation in mode without state management", async () => { - const message: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test", - params: {}, - id: 1, - }; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": "non-existent-session-id", // This would be rejected in mode with state management - }, - body: JSON.stringify(message), - }); + it("should reject invalid session ID", async () => { + // First initialize to be in valid state + await initializeServer(); - const onMessageMock = jest.fn(); - transportWithoutState.onmessage = onMessageMock; + // Now try with invalid session ID + const response = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList, "invalid-session-id"); - await transportWithoutState.handleRequest(req, mockResponse); + expect(response.status).toBe(404); + const errorData = await response.json(); + expectErrorResponse(errorData, -32001, /Session not found/); + }); - // Message should be processed despite invalid session ID - expect(onMessageMock).toHaveBeenCalledWith(message); - }); + it("should establish standalone SSE stream and receive server-initiated messages", async () => { + // First initialize to get a session ID + sessionId = await initializeServer(); - it("should work with a mix of requests with and without session IDs in mode without state management", async () => { - // First request without session ID - const req1 = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - accept: "application/json, text/event-stream", - }, - body: JSON.stringify({ - jsonrpc: "2.0", - method: "test", - params: {}, - id: "test-id" - }) - }); - - await transportWithoutState.handleRequest(req1, mockResponse); - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.objectContaining({ - "Content-Type": "text/event-stream", - }) - ); - - // Reset mock for second request - mockResponse.writeHead.mockClear(); - - // Second request with a session ID (which would be invalid in mode with state management) - const req2 = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - accept: "application/json, text/event-stream", - "mcp-session-id": "some-random-session-id", - }, - body: JSON.stringify({ - jsonrpc: "2.0", - method: "test2", - params: {}, - id: "test-id-2" - }) - }); - - await transportWithoutState.handleRequest(req2, mockResponse); - - // Should still succeed - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.objectContaining({ - "Content-Type": "text/event-stream", - }) - ); + // Open a standalone SSE stream + const sseResponse = await fetch(baseUrl, { + method: "GET", + headers: { + Accept: "text/event-stream", + "mcp-session-id": sessionId, + }, }); - it("should handle initialization in mode without state management", async () => { - const transportWithoutState = new StreamableHTTPServerTransport({ sessionIdGenerator: () => undefined }); + expect(sseResponse.status).toBe(200); + expect(sseResponse.headers.get("content-type")).toBe("text/event-stream"); - // Initialize message - const initializeMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - expect(transportWithoutState.sessionId).toBeUndefined(); - expect(transportWithoutState["_initialized"]).toBe(false); + // Send a notification (server-initiated message) that should appear on SSE stream + const notification: JSONRPCMessage = { + jsonrpc: "2.0", + method: "notifications/message", + params: { level: "info", data: "Test notification" }, + }; - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initializeMessage), - }); + // Send the notification via transport + await transport.send(notification); - const newResponse = createMockResponse(); - await transportWithoutState.handleRequest(req, newResponse); + // Read from the stream and verify we got the notification + const text = await readSSEEvent(sseResponse); - // After initialization, the sessionId should still be undefined - expect(transportWithoutState.sessionId).toBeUndefined(); - expect(transportWithoutState["_initialized"]).toBe(true); + const eventLines = text.split("\n"); + const dataLine = eventLines.find(line => line.startsWith("data:")); + expect(dataLine).toBeDefined(); - // Headers should NOT include session ID in mode without state management - const headers = newResponse.writeHead.mock.calls[0][1]; - expect(headers).not.toHaveProperty("mcp-session-id"); + const eventData = JSON.parse(dataLine!.substring(5)); + expect(eventData).toMatchObject({ + jsonrpc: "2.0", + method: "notifications/message", + params: { level: "info", data: "Test notification" }, }); }); - describe("Request Handling", () => { - // Initialize the transport before tests that need initialization - beforeEach(async () => { - // For tests that need initialization, initialize here - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); + it("should not close GET SSE stream after sending multiple server notifications", async () => { + sessionId = await initializeServer(); - await transport.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); + // Open a standalone SSE stream + const sseResponse = await fetch(baseUrl, { + method: "GET", + headers: { + Accept: "text/event-stream", + "mcp-session-id": sessionId, + }, }); - it("should accept GET requests for SSE with proper Accept header", async () => { - const req = createMockRequest({ - method: "GET", - headers: { - "accept": "text/event-stream", - "mcp-session-id": transport.sessionId, - }, - }); + expect(sseResponse.status).toBe(200); + const reader = sseResponse.body?.getReader(); - await transport.handleRequest(req, mockResponse); + // Send multiple notifications + const notification1: JSONRPCMessage = { + jsonrpc: "2.0", + method: "notifications/message", + params: { level: "info", data: "First notification" } + }; - expect(mockResponse.writeHead).toHaveBeenCalledWith(200, expect.objectContaining({ - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache, no-transform", - "Connection": "keep-alive", - "mcp-session-id": transport.sessionId, - })); - }); + // Just send one and verify it comes through - then the stream should stay open + await transport.send(notification1); - it("should reject GET requests without Accept: text/event-stream header", async () => { - const req = createMockRequest({ - method: "GET", - headers: { - "accept": "application/json", - "mcp-session-id": transport.sessionId, - }, - }); + const { value, done } = await reader!.read(); + const text = new TextDecoder().decode(value); + expect(text).toContain("First notification"); + expect(done).toBe(false); // Stream should still be open + }); - await transport.handleRequest(req, mockResponse); + it("should reject second SSE stream for the same session", async () => { + sessionId = await initializeServer(); - expect(mockResponse.writeHead).toHaveBeenCalledWith(406); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Not Acceptable: Client must accept text/event-stream"')); + // Open first SSE stream + const firstStream = await fetch(baseUrl, { + method: "GET", + headers: { + Accept: "text/event-stream", + "mcp-session-id": sessionId, + }, }); - it("should send server-initiated requests to GET SSE stream", async () => { - // Open a standalone SSE stream with GET - const req = createMockRequest({ - method: "GET", - headers: { - "accept": "text/event-stream", - "mcp-session-id": transport.sessionId, - }, - }); - - const sseResponse = createMockResponse(); - await transport.handleRequest(req, sseResponse); - - // Send a notification without a related request ID - const notification: JSONRPCMessage = { - jsonrpc: "2.0", - method: "notifications/resources/updated", - params: { uri: "someuri" } - }; + expect(firstStream.status).toBe(200); - await transport.send(notification); - - // Verify notification was sent on SSE stream - expect(sseResponse.write).toHaveBeenCalledWith( - expect.stringContaining(`event: message\ndata: ${JSON.stringify(notification)}\n\n`) - ); + // Try to open a second SSE stream with the same session ID + const secondStream = await fetch(baseUrl, { + method: "GET", + headers: { + Accept: "text/event-stream", + "mcp-session-id": sessionId, + }, }); - it("should not close GET SSE stream after sending server requests or notifications", async () => { - // Open a standalone SSE stream - const req = createMockRequest({ - method: "GET", - headers: { - "accept": "text/event-stream", - "mcp-session-id": transport.sessionId, - }, - }); - - const sseResponse = createMockResponse(); - await transport.handleRequest(req, sseResponse); - - // Send multiple notifications - const notification1: JSONRPCMessage = { jsonrpc: "2.0", method: "event1", params: {} }; - const notification2: JSONRPCMessage = { jsonrpc: "2.0", method: "event2", params: {} }; + // Should be rejected + expect(secondStream.status).toBe(409); // Conflict + const errorData = await secondStream.json(); + expectErrorResponse(errorData, -32000, /Only one SSE stream is allowed per session/); + }); - await transport.send(notification1); - await transport.send(notification2); + it("should reject GET requests without Accept: text/event-stream header", async () => { + sessionId = await initializeServer(); - // Stream should remain open - expect(sseResponse.end).not.toHaveBeenCalled(); + // Try GET without proper Accept header + const response = await fetch(baseUrl, { + method: "GET", + headers: { + Accept: "application/json", + "mcp-session-id": sessionId, + }, }); - it("should reject second GET SSE stream for the same session", async () => { - // Open first SSE stream - should succeed - const req1 = createMockRequest({ - method: "GET", - headers: { - "accept": "text/event-stream", - "mcp-session-id": transport.sessionId, - }, - }); - - const sseResponse1 = createMockResponse(); - await transport.handleRequest(req1, sseResponse1); - - // Try to open a second SSE stream - should be rejected - const req2 = createMockRequest({ - method: "GET", - headers: { - "accept": "text/event-stream", - "mcp-session-id": transport.sessionId, - }, - }); - - const sseResponse2 = createMockResponse(); - await transport.handleRequest(req2, sseResponse2); + expect(response.status).toBe(406); + const errorData = await response.json(); + expectErrorResponse(errorData, -32000, /Client must accept text\/event-stream/); + }); - // First stream should be good - expect(sseResponse1.writeHead).toHaveBeenCalledWith(200, expect.anything()); + it("should reject POST requests without proper Accept header", async () => { + sessionId = await initializeServer(); - // Second stream should get 409 Conflict - expect(sseResponse2.writeHead).toHaveBeenCalledWith(409); - expect(sseResponse2.end).toHaveBeenCalledWith(expect.stringContaining('"message":"Conflict: Only one SSE stream is allowed per session"')); + // Try POST without Accept: text/event-stream + const response = await fetch(baseUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json", // Missing text/event-stream + "mcp-session-id": sessionId, + }, + body: JSON.stringify(TEST_MESSAGES.toolsList), }); - it("should reject POST requests without proper Accept header", async () => { - const message: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test", - params: {}, - id: 1, - }; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "mcp-session-id": transport.sessionId, - }, - body: JSON.stringify(message), - }); + expect(response.status).toBe(406); + const errorData = await response.json(); + expectErrorResponse(errorData, -32000, /Client must accept both application\/json and text\/event-stream/); + }); - await transport.handleRequest(req, mockResponse); + it("should reject unsupported Content-Type", async () => { + sessionId = await initializeServer(); - expect(mockResponse.writeHead).toHaveBeenCalledWith(406); + // Try POST with text/plain Content-Type + const response = await fetch(baseUrl, { + method: "POST", + headers: { + "Content-Type": "text/plain", + Accept: "application/json, text/event-stream", + "mcp-session-id": sessionId, + }, + body: "This is plain text", }); - it("should properly handle JSON-RPC request messages in POST requests", async () => { - const message: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test", - params: {}, - id: 1, - }; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId, - }, - body: JSON.stringify(message), - }); + expect(response.status).toBe(415); + const errorData = await response.json(); + expectErrorResponse(errorData, -32000, /Content-Type must be application\/json/); + }); - const onMessageMock = jest.fn(); - transport.onmessage = onMessageMock; + it("should handle JSON-RPC batch notification messages with 202 response", async () => { + sessionId = await initializeServer(); - await transport.handleRequest(req, mockResponse); + // Send batch of notifications (no IDs) + const batchNotifications: JSONRPCMessage[] = [ + { jsonrpc: "2.0", method: "someNotification1", params: {} }, + { jsonrpc: "2.0", method: "someNotification2", params: {} }, + ]; + const response = await sendPostRequest(baseUrl, batchNotifications, sessionId); - expect(onMessageMock).toHaveBeenCalledWith(message); - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.objectContaining({ - "Content-Type": "text/event-stream", - }) - ); - }); + expect(response.status).toBe(202); + }); - it("should properly handle JSON-RPC notification or response messages in POST requests", async () => { - const notification: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test", - params: {}, - }; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId, - }, - body: JSON.stringify(notification), - }); + it("should handle batch request messages with SSE stream for responses", async () => { + sessionId = await initializeServer(); - const onMessageMock = jest.fn(); - transport.onmessage = onMessageMock; + // Send batch of requests + const batchRequests: JSONRPCMessage[] = [ + { jsonrpc: "2.0", method: "tools/list", params: {}, id: "req-1" }, + { jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "BatchUser" } }, id: "req-2" }, + ]; + const response = await sendPostRequest(baseUrl, batchRequests, sessionId); - await transport.handleRequest(req, mockResponse); + expect(response.status).toBe(200); + expect(response.headers.get("content-type")).toBe("text/event-stream"); - expect(onMessageMock).toHaveBeenCalledWith(notification); - expect(mockResponse.writeHead).toHaveBeenCalledWith(202); - }); + const reader = response.body?.getReader(); - it("should handle batch notification messages properly with 202 response", async () => { - const batchMessages: JSONRPCMessage[] = [ - { jsonrpc: "2.0", method: "test1", params: {} }, - { jsonrpc: "2.0", method: "test2", params: {} }, - ]; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId, - }, - body: JSON.stringify(batchMessages), - }); + // The responses may come in any order or together in one chunk + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); - const onMessageMock = jest.fn(); - transport.onmessage = onMessageMock; + // Check that both responses were sent on the same stream + expect(text).toContain('"id":"req-1"'); + expect(text).toContain('"tools"'); // tools/list result + expect(text).toContain('"id":"req-2"'); + expect(text).toContain('Hello, BatchUser'); // tools/call result + }); - await transport.handleRequest(req, mockResponse); + it("should properly handle invalid JSON data", async () => { + sessionId = await initializeServer(); - expect(onMessageMock).toHaveBeenCalledTimes(2); - expect(mockResponse.writeHead).toHaveBeenCalledWith(202); + // Send invalid JSON + const response = await fetch(baseUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + "mcp-session-id": sessionId, + }, + body: "This is not valid JSON", }); - it("should handle batch request messages with SSE when Accept header includes text/event-stream", async () => { - const batchMessages: JSONRPCMessage[] = [ - { jsonrpc: "2.0", method: "test1", params: {}, id: "req1" }, - { jsonrpc: "2.0", method: "test2", params: {}, id: "req2" }, - ]; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "text/event-stream, application/json", - "mcp-session-id": transport.sessionId, - }, - body: JSON.stringify(batchMessages), - }); - - const onMessageMock = jest.fn(); - transport.onmessage = onMessageMock; - - mockResponse = createMockResponse(); // Create fresh mock - await transport.handleRequest(req, mockResponse); - - // Should establish SSE connection - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.objectContaining({ - "Content-Type": "text/event-stream" - }) - ); - expect(onMessageMock).toHaveBeenCalledTimes(2); - // Stream should remain open until responses are sent - expect(mockResponse.end).not.toHaveBeenCalled(); - }); + expect(response.status).toBe(400); + const errorData = await response.json(); + expectErrorResponse(errorData, -32700, /Parse error/); + }); - it("should reject unsupported Content-Type", async () => { - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "text/plain", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId, - }, - body: "test", - }); + it("should return 400 error for invalid JSON-RPC messages", async () => { + sessionId = await initializeServer(); - await transport.handleRequest(req, mockResponse); + // Invalid JSON-RPC (missing required jsonrpc version) + const invalidMessage = { method: "tools/list", params: {}, id: 1 }; // missing jsonrpc version + const response = await sendPostRequest(baseUrl, invalidMessage as JSONRPCMessage, sessionId); - expect(mockResponse.writeHead).toHaveBeenCalledWith(415); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"jsonrpc":"2.0"')); + expect(response.status).toBe(400); + const errorData = await response.json(); + expect(errorData).toMatchObject({ + jsonrpc: "2.0", + error: expect.anything(), }); + }); - it("should properly handle DELETE requests and close session", async () => { - // First initialize the transport - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); - - await transport.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); + it("should reject requests to uninitialized server", async () => { + // Create a new HTTP server and transport without initializing + const { server: uninitializedServer, transport: uninitializedTransport, baseUrl: uninitializedUrl } = await createTestServer(); + // Transport not used in test but needed for cleanup - // Now try DELETE with proper session ID - const req = createMockRequest({ - method: "DELETE", - headers: { - "mcp-session-id": transport.sessionId, - }, - }); - - const onCloseMock = jest.fn(); - transport.onclose = onCloseMock; + // No initialization, just send a request directly + const uninitializedMessage: JSONRPCMessage = { + jsonrpc: "2.0", + method: "tools/list", + params: {}, + id: "uninitialized-test", + }; - await transport.handleRequest(req, mockResponse); + // Send a request to uninitialized server + const response = await sendPostRequest(uninitializedUrl, uninitializedMessage, "any-session-id"); - expect(mockResponse.writeHead).toHaveBeenCalledWith(200); - expect(onCloseMock).toHaveBeenCalled(); - }); + expect(response.status).toBe(400); + const errorData = await response.json(); + expectErrorResponse(errorData, -32000, /Server not initialized/); - it("should reject DELETE requests with invalid session ID", async () => { - // First initialize the transport - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); + // Cleanup + await stopTestServer({ server: uninitializedServer, transport: uninitializedTransport }); + }); - await transport.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); + it("should send response messages to the connection that sent the request", async () => { + sessionId = await initializeServer(); - // Now try DELETE with invalid session ID - const req = createMockRequest({ - method: "DELETE", - headers: { - "mcp-session-id": "invalid-session-id", - }, - }); + const message1: JSONRPCMessage = { + jsonrpc: "2.0", + method: "tools/list", + params: {}, + id: "req-1" + }; - const onCloseMock = jest.fn(); - transport.onclose = onCloseMock; + const message2: JSONRPCMessage = { + jsonrpc: "2.0", + method: "tools/call", + params: { + name: "greet", + arguments: { name: "Connection2" } + }, + id: "req-2" + }; + + // Make two concurrent fetch connections for different requests + const req1 = sendPostRequest(baseUrl, message1, sessionId); + const req2 = sendPostRequest(baseUrl, message2, sessionId); + + // Get both responses + const [response1, response2] = await Promise.all([req1, req2]); + const reader1 = response1.body?.getReader(); + const reader2 = response2.body?.getReader(); + + // Read responses from each stream (requires each receives its specific response) + const { value: value1 } = await reader1!.read(); + const text1 = new TextDecoder().decode(value1); + expect(text1).toContain('"id":"req-1"'); + expect(text1).toContain('"tools"'); // tools/list result + + const { value: value2 } = await reader2!.read(); + const text2 = new TextDecoder().decode(value2); + expect(text2).toContain('"id":"req-2"'); + expect(text2).toContain('Hello, Connection2'); // tools/call result + }); - await transport.handleRequest(req, mockResponse); + it("should keep stream open after sending server notifications", async () => { + sessionId = await initializeServer(); - expect(mockResponse.writeHead).toHaveBeenCalledWith(404); - expect(onCloseMock).not.toHaveBeenCalled(); + // Open a standalone SSE stream + const sseResponse = await fetch(baseUrl, { + method: "GET", + headers: { + Accept: "text/event-stream", + "mcp-session-id": sessionId, + }, }); - }); - describe("SSE Response Handling", () => { - beforeEach(async () => { - // Initialize the transport - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); - const initResponse = createMockResponse(); - await transport.handleRequest(initReq, initResponse); - mockResponse.writeHead.mockClear(); + // Send several server-initiated notifications + await transport.send({ + jsonrpc: "2.0", + method: "notifications/message", + params: { level: "info", data: "First notification" }, }); - it("should send response messages as SSE events", async () => { - // Setup a POST request with JSON-RPC request that accepts SSE - const requestMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test", - params: {}, - id: "test-req-id" - }; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId - }, - body: JSON.stringify(requestMessage) - }); - - await transport.handleRequest(req, mockResponse); - - // Send a response to the request - const responseMessage: JSONRPCMessage = { - jsonrpc: "2.0", - result: { value: "test-result" }, - id: "test-req-id" - }; - - await transport.send(responseMessage, { relatedRequestId: "test-req-id" }); - - // Verify response was sent as SSE event - expect(mockResponse.write).toHaveBeenCalledWith( - expect.stringContaining(`event: message\ndata: ${JSON.stringify(responseMessage)}\n\n`) - ); - - // Stream should be closed after sending response - expect(mockResponse.end).toHaveBeenCalled(); + await transport.send({ + jsonrpc: "2.0", + method: "notifications/message", + params: { level: "info", data: "Second notification" }, }); - it("should keep stream open when sending intermediate notifications and requests", async () => { - // Setup a POST request with JSON-RPC request that accepts SSE - const requestMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test", - params: {}, - id: "test-req-id" - }; - - // Create fresh response for this test - mockResponse = createMockResponse(); - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId - }, - body: JSON.stringify(requestMessage) - }); + // Stream should still be open - it should not close after sending notifications + expect(sseResponse.bodyUsed).toBe(false); + }); - await transport.handleRequest(req, mockResponse); + // The current implementation will close the entire transport for DELETE + // Creating a temporary transport/server where we don't care if it gets closed + it("should properly handle DELETE requests and close session", async () => { + // Setup a temporary server for this test + const tempResult = await createTestServer(); + const tempServer = tempResult.server; + const tempUrl = tempResult.baseUrl; - // Send an intermediate notification - const notification: JSONRPCMessage = { - jsonrpc: "2.0", - method: "progress", - params: { progress: "50%" } - }; + // Initialize to get a session ID + const initResponse = await sendPostRequest(tempUrl, TEST_MESSAGES.initialize); + const tempSessionId = initResponse.headers.get("mcp-session-id"); - await transport.send(notification, { relatedRequestId: "test-req-id" }); + // Now DELETE the session + const deleteResponse = await fetch(tempUrl, { + method: "DELETE", + headers: { "mcp-session-id": tempSessionId || "" }, + }); - // Stream should remain open - expect(mockResponse.end).not.toHaveBeenCalled(); + expect(deleteResponse.status).toBe(200); - // Send the final response - const responseMessage: JSONRPCMessage = { - jsonrpc: "2.0", - result: { value: "test-result" }, - id: "test-req-id" - }; + // Clean up + await new Promise((resolve) => tempServer.close(() => resolve())); + }); - await transport.send(responseMessage, { relatedRequestId: "test-req-id" }); + it("should reject DELETE requests with invalid session ID", async () => { + // Initialize the server first to activate it + sessionId = await initializeServer(); - // Now stream should be closed - expect(mockResponse.end).toHaveBeenCalled(); + // Try to delete with invalid session ID + const response = await fetch(baseUrl, { + method: "DELETE", + headers: { "mcp-session-id": "invalid-session-id" }, }); - it("should keep stream open when multiple requests share the same connection", async () => { - // Create a fresh response for this test - const sharedResponse = createMockResponse(); - - // Send two requests in a batch that will share the same connection - const batchRequests: JSONRPCMessage[] = [ - { jsonrpc: "2.0", method: "method1", params: {}, id: "req1" }, - { jsonrpc: "2.0", method: "method2", params: {}, id: "req2" } - ]; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId - }, - body: JSON.stringify(batchRequests) - }); + expect(response.status).toBe(404); + const errorData = await response.json(); + expectErrorResponse(errorData, -32001, /Session not found/); + }); +}); - await transport.handleRequest(req, sharedResponse); +// Test JSON Response Mode +describe("StreamableHTTPServerTransport with JSON Response Mode", () => { + let server: Server; + let transport: StreamableHTTPServerTransport; + let baseUrl: URL; + let sessionId: string; - // Respond to first request - const response1: JSONRPCMessage = { - jsonrpc: "2.0", - result: { value: "result1" }, - id: "req1" - }; + beforeEach(async () => { + const result = await createTestServer({ enableJsonResponse: true }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; - await transport.send(response1); + // Initialize and get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); - // Connection should remain open because req2 is still pending - expect(sharedResponse.write).toHaveBeenCalledWith( - expect.stringContaining(`event: message\ndata: ${JSON.stringify(response1)}\n\n`) - ); - expect(sharedResponse.end).not.toHaveBeenCalled(); + sessionId = initResponse.headers.get("mcp-session-id") as string; + }); - // Respond to second request - const response2: JSONRPCMessage = { - jsonrpc: "2.0", - result: { value: "result2" }, - id: "req2" - }; + afterEach(async () => { + await stopTestServer({ server, transport }); + }); - await transport.send(response2); + it("should return JSON response for a single request", async () => { + const toolsListMessage: JSONRPCMessage = { + jsonrpc: "2.0", + method: "tools/list", + params: {}, + id: "json-req-1", + }; - // Now connection should close as all requests are complete - expect(sharedResponse.write).toHaveBeenCalledWith( - expect.stringContaining(`event: message\ndata: ${JSON.stringify(response2)}\n\n`) - ); - expect(sharedResponse.end).toHaveBeenCalled(); - }); + const response = await sendPostRequest(baseUrl, toolsListMessage, sessionId); - it("should clean up connection tracking when a response is sent", async () => { - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId - }, - body: JSON.stringify({ - jsonrpc: "2.0", - method: "test", - params: {}, - id: "cleanup-test" - }) - }); - - const response = createMockResponse(); - await transport.handleRequest(req, response); - - // Verify that the request is tracked in the SSE map - expect(transport["_responseMapping"].size).toBe(2); - expect(transport["_responseMapping"].has("cleanup-test")).toBe(true); - - // Send a response - await transport.send({ - jsonrpc: "2.0", - result: {}, - id: "cleanup-test" - }); + expect(response.status).toBe(200); + expect(response.headers.get("content-type")).toBe("application/json"); - // Verify that the mapping was cleaned up - expect(transport["_responseMapping"].size).toBe(1); - expect(transport["_responseMapping"].has("cleanup-test")).toBe(false); + const result = await response.json(); + expect(result).toMatchObject({ + jsonrpc: "2.0", + result: expect.objectContaining({ + tools: expect.arrayContaining([ + expect.objectContaining({ name: "greet" }) + ]) + }), + id: "json-req-1" }); + }); - it("should clean up connection tracking when client disconnects", async () => { - // Setup two requests that share a connection - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId - }, - body: JSON.stringify([ - { jsonrpc: "2.0", method: "longRunning1", params: {}, id: "req1" }, - { jsonrpc: "2.0", method: "longRunning2", params: {}, id: "req2" } - ]) - }); + it("should return JSON response for batch requests", async () => { + const batchMessages: JSONRPCMessage[] = [ + { jsonrpc: "2.0", method: "tools/list", params: {}, id: "batch-1" }, + { jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "JSON" } }, id: "batch-2" } + ]; - const response = createMockResponse(); + const response = await sendPostRequest(baseUrl, batchMessages, sessionId); - // We need to manually store the callback to trigger it later - let closeCallback: (() => void) | undefined; - response.on.mockImplementation((event, callback: () => void) => { - if (typeof event === "string" && event === "close") { - closeCallback = callback; - } - return response; - }); + expect(response.status).toBe(200); + expect(response.headers.get("content-type")).toBe("application/json"); - await transport.handleRequest(req, response); + const results = await response.json(); + expect(Array.isArray(results)).toBe(true); + expect(results).toHaveLength(2); - // Both requests should be mapped to the same response - expect(transport["_responseMapping"].size).toBe(3); - expect(transport["_responseMapping"].get("req1")).toBe(response); - expect(transport["_responseMapping"].get("req2")).toBe(response); + // Batch responses can come in any order + const listResponse = results.find((r: { id?: string }) => r.id === "batch-1"); + const callResponse = results.find((r: { id?: string }) => r.id === "batch-2"); - // Simulate client disconnect by triggering the stored callback - if (closeCallback) closeCallback(); + expect(listResponse).toEqual(expect.objectContaining({ + jsonrpc: "2.0", + id: "batch-1", + result: expect.objectContaining({ + tools: expect.arrayContaining([ + expect.objectContaining({ name: "greet" }) + ]) + }) + })); - // All entries using this response should be removed - expect(transport["_responseMapping"].size).toBe(1); - expect(transport["_responseMapping"].has("req1")).toBe(false); - expect(transport["_responseMapping"].has("req2")).toBe(false); - }); + expect(callResponse).toEqual(expect.objectContaining({ + jsonrpc: "2.0", + id: "batch-2", + result: expect.objectContaining({ + content: expect.arrayContaining([ + expect.objectContaining({ type: "text", text: "Hello, JSON!" }) + ]) + }) + })); }); +}); - describe("Message Targeting", () => { - beforeEach(async () => { - // Initialize the transport - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); - - await transport.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); +// Test pre-parsed body handling +describe("StreamableHTTPServerTransport with pre-parsed body", () => { + let server: Server; + let transport: StreamableHTTPServerTransport; + let baseUrl: URL; + let sessionId: string; + let parsedBody: unknown = null; + + beforeEach(async () => { + const result = await createTestServer({ + customRequestHandler: async (req, res) => { + try { + if (parsedBody !== null) { + await transport.handleRequest(req, res, parsedBody); + parsedBody = null; // Reset after use + } else { + await transport.handleRequest(req, res); + } + } catch (error) { + console.error("Error handling request:", error); + if (!res.headersSent) res.writeHead(500).end(); + } + } }); - it("should send response messages to the connection that sent the request", async () => { - // Create request with two separate connections - const requestMessage1: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test1", - params: {}, - id: "req-id-1", - }; - - const mockResponse1 = createMockResponse(); - const req1 = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId - }, - body: JSON.stringify(requestMessage1), - }); - await transport.handleRequest(req1, mockResponse1); - - const requestMessage2: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test2", - params: {}, - id: "req-id-2", - }; - - const mockResponse2 = createMockResponse(); - const req2 = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId - }, - body: JSON.stringify(requestMessage2), - }); - await transport.handleRequest(req2, mockResponse2); - - // Send responses with matching IDs - const responseMessage1: JSONRPCMessage = { - jsonrpc: "2.0", - result: { success: true }, - id: "req-id-1", - }; - - await transport.send(responseMessage1, { relatedRequestId: "req-id-1" }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; - const responseMessage2: JSONRPCMessage = { - jsonrpc: "2.0", - result: { success: true }, - id: "req-id-2", - }; - - await transport.send(responseMessage2, { relatedRequestId: "req-id-2" }); - - // Verify responses were sent to the right connections - expect(mockResponse1.write).toHaveBeenCalledWith( - expect.stringContaining(JSON.stringify(responseMessage1)) - ); - - expect(mockResponse2.write).toHaveBeenCalledWith( - expect.stringContaining(JSON.stringify(responseMessage2)) - ); - - // Verify responses were not sent to the wrong connections - const resp1HasResp2 = mockResponse1.write.mock.calls.some(call => - typeof call[0] === 'string' && call[0].includes(JSON.stringify(responseMessage2)) - ); - expect(resp1HasResp2).toBe(false); - - const resp2HasResp1 = mockResponse2.write.mock.calls.some(call => - typeof call[0] === 'string' && call[0].includes(JSON.stringify(responseMessage1)) - ); - expect(resp2HasResp1).toBe(false); - }); + // Initialize and get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get("mcp-session-id") as string; }); - describe("Error Handling", () => { - it("should return 400 error for invalid JSON data", async () => { - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: "invalid json", - }); + afterEach(async () => { + await stopTestServer({ server, transport }); + }); - const onErrorMock = jest.fn(); - transport.onerror = onErrorMock; + it("should accept pre-parsed request body", async () => { + // Set up the pre-parsed body + parsedBody = { + jsonrpc: "2.0", + method: "tools/list", + params: {}, + id: "preparsed-1", + }; + + // Send an empty body since we'll use pre-parsed body + const response = await fetch(baseUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + "mcp-session-id": sessionId, + }, + // Empty body - we're testing pre-parsed body + body: "" + }); + + expect(response.status).toBe(200); + expect(response.headers.get("content-type")).toBe("text/event-stream"); + + const reader = response.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + + // Verify the response used the pre-parsed body + expect(text).toContain('"id":"preparsed-1"'); + expect(text).toContain('"tools"'); + }); - await transport.handleRequest(req, mockResponse); + it("should handle pre-parsed batch messages", async () => { + parsedBody = [ + { jsonrpc: "2.0", method: "tools/list", params: {}, id: "batch-1" }, + { jsonrpc: "2.0", method: "tools/call", params: { name: "greet", arguments: { name: "PreParsed" } }, id: "batch-2" } + ]; - expect(mockResponse.writeHead).toHaveBeenCalledWith(400); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"jsonrpc":"2.0"')); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"code":-32700')); - expect(onErrorMock).toHaveBeenCalled(); + const response = await fetch(baseUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + "mcp-session-id": sessionId, + }, + body: "" // Empty as we're using pre-parsed }); - it("should return 400 error for invalid JSON-RPC messages", async () => { - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify({ invalid: "message" }), - }); + expect(response.status).toBe(200); - const onErrorMock = jest.fn(); - transport.onerror = onErrorMock; + const reader = response.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); - await transport.handleRequest(req, mockResponse); + expect(text).toContain('"id":"batch-1"'); + expect(text).toContain('"tools"'); + }); - expect(mockResponse.writeHead).toHaveBeenCalledWith(400); - expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"jsonrpc":"2.0"')); - expect(onErrorMock).toHaveBeenCalled(); - }); + it("should prefer pre-parsed body over request body", async () => { + // Set pre-parsed to tools/list + parsedBody = { + jsonrpc: "2.0", + method: "tools/list", + params: {}, + id: "preparsed-wins", + }; + + // Send actual body with tools/call - should be ignored + const response = await fetch(baseUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + "mcp-session-id": sessionId, + }, + body: JSON.stringify({ + jsonrpc: "2.0", + method: "tools/call", + params: { name: "greet", arguments: { name: "Ignored" } }, + id: "ignored-id" + }) + }); + + expect(response.status).toBe(200); + + const reader = response.body?.getReader(); + const { value } = await reader!.read(); + const text = new TextDecoder().decode(value); + + // Should have processed the pre-parsed body + expect(text).toContain('"id":"preparsed-wins"'); + expect(text).toContain('"tools"'); + expect(text).not.toContain('"ignored-id"'); }); +}); - describe("JSON Response Mode", () => { - let jsonResponseTransport: StreamableHTTPServerTransport; - let mockResponse: jest.Mocked; +// Test stateless mode +describe("StreamableHTTPServerTransport in stateless mode", () => { + let server: Server; + let transport: StreamableHTTPServerTransport; + let baseUrl: URL; - beforeEach(async () => { - jsonResponseTransport = new StreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - enableJsonResponse: true, - }); + beforeEach(async () => { + const result = await createTestServer({ sessionIdGenerator: () => undefined }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + }); - // Initialize the transport - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); + afterEach(async () => { + await stopTestServer({ server, transport }); + }); - mockResponse = createMockResponse(); - await jsonResponseTransport.handleRequest(initReq, mockResponse); - mockResponse = createMockResponse(); // Reset for tests - }); + it("should operate without session ID validation", async () => { + // Initialize the server first + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); - it("should return JSON response for a single request", async () => { - const requestMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "tools/list", - params: {}, - id: "test-req-id", - }; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": jsonResponseTransport.sessionId, - }, - body: JSON.stringify(requestMessage), - }); - - // Mock immediate response - jsonResponseTransport.onmessage = (message) => { - if ('method' in message && 'id' in message) { - const responseMessage: JSONRPCMessage = { - jsonrpc: "2.0", - result: { value: `test-result` }, - id: message.id, - }; - void jsonResponseTransport.send(responseMessage); - } - }; - - await jsonResponseTransport.handleRequest(req, mockResponse); - // Should respond with application/json header - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.objectContaining({ - "Content-Type": "application/json", - }) - ); - - // Should return the response as JSON - const expectedResponse = { - jsonrpc: "2.0", - result: { value: "test-result" }, - id: "test-req-id", - }; + expect(initResponse.status).toBe(200); + // Should NOT have session ID header in stateless mode + expect(initResponse.headers.get("mcp-session-id")).toBeNull(); - expect(mockResponse.end).toHaveBeenCalledWith(JSON.stringify(expectedResponse)); - }); + // Try request without session ID - should work in stateless mode + const toolsResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.toolsList); - it("should return JSON response for batch requests", async () => { - const batchMessages: JSONRPCMessage[] = [ - { jsonrpc: "2.0", method: "tools/list", params: {}, id: "req1" }, - { jsonrpc: "2.0", method: "tools/call", params: {}, id: "req2" }, - ]; - - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": jsonResponseTransport.sessionId, - }, - body: JSON.stringify(batchMessages), - }); - - // Mock responses without enforcing specific order - jsonResponseTransport.onmessage = (message) => { - if ('method' in message && 'id' in message) { - const responseMessage: JSONRPCMessage = { - jsonrpc: "2.0", - result: { value: `result-for-${message.id}` }, - id: message.id, - }; - void jsonResponseTransport.send(responseMessage); - } - }; - - await jsonResponseTransport.handleRequest(req, mockResponse); - - // Should respond with application/json header - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.objectContaining({ - "Content-Type": "application/json", - }) - ); - - // Verify response was sent but don't assume specific order - expect(mockResponse.end).toHaveBeenCalled(); - const responseJson = JSON.parse(mockResponse.end.mock.calls[0][0] as string); - expect(Array.isArray(responseJson)).toBe(true); - expect(responseJson).toHaveLength(2); - - // Check each response exists separately without assuming order - expect(responseJson).toContainEqual(expect.objectContaining({ id: "req1", result: { value: "result-for-req1" } })); - expect(responseJson).toContainEqual(expect.objectContaining({ id: "req2", result: { value: "result-for-req2" } })); - }); + expect(toolsResponse.status).toBe(200); }); - describe("Handling Pre-Parsed Body", () => { - beforeEach(async () => { - // Initialize the transport - const initMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "initialize", - params: { - clientInfo: { name: "test-client", version: "1.0" }, - protocolVersion: "2025-03-26" - }, - id: "init-1", - }; - - const initReq = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - }, - body: JSON.stringify(initMessage), - }); + it("should handle POST requests with various session IDs in stateless mode", async () => { + // Initialize the server first + await fetch(baseUrl, { + method: "POST", + headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream" }, + body: JSON.stringify({ + jsonrpc: "2.0", method: "initialize", params: { clientInfo: { name: "test-client", version: "1.0" }, protocolVersion: "2025-03-26" }, id: "init-1" + }), + }); + + // Try with a random session ID - should be accepted + const response1 = await fetch(baseUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + "mcp-session-id": "random-id-1", + }, + body: JSON.stringify({ jsonrpc: "2.0", method: "tools/list", params: {}, id: "t1" }), + }); + expect(response1.status).toBe(200); + + // Try with another random session ID - should also be accepted + const response2 = await fetch(baseUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json, text/event-stream", + "mcp-session-id": "different-id-2", + }, + body: JSON.stringify({ jsonrpc: "2.0", method: "tools/list", params: {}, id: "t2" }), + }); + expect(response2.status).toBe(200); + }); - await transport.handleRequest(initReq, mockResponse); - mockResponse.writeHead.mockClear(); - }); + it("should reject second SSE stream even in stateless mode", async () => { + // Despite no session ID requirement, the transport still only allows + // one standalone SSE stream at a time - it("should accept pre-parsed request body", async () => { - const message: JSONRPCMessage = { - jsonrpc: "2.0", - method: "test", - params: {}, - id: "pre-parsed-test", - }; - - // Create a request without actual body content - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId, - }, - // No body provided here - it will be passed as parsedBody - }); - - const onMessageMock = jest.fn(); - transport.onmessage = onMessageMock; - - // Pass the pre-parsed body directly - await transport.handleRequest(req, mockResponse, message); - - // Verify the message was processed correctly - expect(onMessageMock).toHaveBeenCalledWith(message); - expect(mockResponse.writeHead).toHaveBeenCalledWith( - 200, - expect.objectContaining({ - "Content-Type": "text/event-stream", - }) - ); + // Initialize the server first + await fetch(baseUrl, { + method: "POST", + headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream" }, + body: JSON.stringify({ + jsonrpc: "2.0", method: "initialize", params: { clientInfo: { name: "test-client", version: "1.0" }, protocolVersion: "2025-03-26" }, id: "init-1" + }), }); - it("should handle pre-parsed batch messages", async () => { - const batchMessages: JSONRPCMessage[] = [ - { - jsonrpc: "2.0", - method: "method1", - params: { data: "test1" }, - id: "batch1" - }, - { - jsonrpc: "2.0", - method: "method2", - params: { data: "test2" }, - id: "batch2" - }, - ]; - - // Create a request without actual body content - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId, - }, - // No body provided here - it will be passed as parsedBody - }); - - const onMessageMock = jest.fn(); - transport.onmessage = onMessageMock; - - // Pass the pre-parsed body directly - await transport.handleRequest(req, mockResponse, batchMessages); - - // Should be called for each message in the batch - expect(onMessageMock).toHaveBeenCalledTimes(2); - expect(onMessageMock).toHaveBeenCalledWith(batchMessages[0]); - expect(onMessageMock).toHaveBeenCalledWith(batchMessages[1]); + // Open first SSE stream + const stream1 = await fetch(baseUrl, { + method: "GET", + headers: { Accept: "text/event-stream" }, }); + expect(stream1.status).toBe(200); - it("should prefer pre-parsed body over request body", async () => { - const requestBodyMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "fromRequestBody", - params: {}, - id: "request-body", - }; - - const parsedBodyMessage: JSONRPCMessage = { - jsonrpc: "2.0", - method: "fromParsedBody", - params: {}, - id: "parsed-body", - }; - - // Create a request with actual body content - const req = createMockRequest({ - method: "POST", - headers: { - "content-type": "application/json", - "accept": "application/json, text/event-stream", - "mcp-session-id": transport.sessionId, - }, - body: JSON.stringify(requestBodyMessage), - }); - - const onMessageMock = jest.fn(); - transport.onmessage = onMessageMock; - - // Pass the pre-parsed body directly - await transport.handleRequest(req, mockResponse, parsedBodyMessage); - - // Should use the parsed body instead of the request body - expect(onMessageMock).toHaveBeenCalledWith(parsedBodyMessage); - expect(onMessageMock).not.toHaveBeenCalledWith(requestBodyMessage); + // Open second SSE stream - should still be rejected, stateless mode still only allows one + const stream2 = await fetch(baseUrl, { + method: "GET", + headers: { Accept: "text/event-stream" }, }); + expect(stream2.status).toBe(409); // Conflict - only one stream allowed }); -}); \ No newline at end of file +}); \ No newline at end of file diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index ec8d2aa7..0eaaa673 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -438,6 +438,8 @@ export class StreamableHTTPServerTransport implements Transport { // Clear any pending responses this._requestResponseMap.clear(); + this._standaloneSSE?.end(); + this._standaloneSSE = undefined; this.onclose?.(); } From 055fe61d9381594fdfec32d49040cccd00c11f8d Mon Sep 17 00:00:00 2001 From: ihrpr Date: Fri, 11 Apr 2025 20:52:27 +0100 Subject: [PATCH 2/2] close server --- src/server/streamableHttp.test.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index dda69331..949cafc7 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -72,8 +72,11 @@ async function createTestServer(config: TestServerConfig = {}): Promise<{ * Helper to stop test server */ async function stopTestServer({ server, transport }: { server: Server; transport: StreamableHTTPServerTransport }): Promise { + // First close the transport to ensure all SSE streams are closed await transport.close(); - await new Promise((resolve) => server.close(() => resolve())); + + // Close the server without waiting indefinitely + server.close(); } /** @@ -649,8 +652,8 @@ describe("StreamableHTTPServerTransport", () => { expect(deleteResponse.status).toBe(200); - // Clean up - await new Promise((resolve) => tempServer.close(() => resolve())); + // Clean up - don't wait indefinitely for server close + tempServer.close(); }); it("should reject DELETE requests with invalid session ID", async () => {