Skip to content

Commit 0445095

Browse files
authored
Merge pull request #290 from modelcontextprotocol/ihrpr/streamable-http-client
Client implementation of Streamable HTTP transport
2 parents d205ad2 + 26929fa commit 0445095

File tree

2 files changed

+628
-0
lines changed

2 files changed

+628
-0
lines changed

src/client/streamableHttp.test.ts

+316
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
import { StreamableHTTPClientTransport } from "./streamableHttp.js";
2+
import { JSONRPCMessage } from "../types.js";
3+
4+
5+
describe("StreamableHTTPClientTransport", () => {
6+
let transport: StreamableHTTPClientTransport;
7+
8+
beforeEach(() => {
9+
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"));
10+
jest.spyOn(global, "fetch");
11+
});
12+
13+
afterEach(async () => {
14+
await transport.close().catch(() => { });
15+
jest.clearAllMocks();
16+
});
17+
18+
it("should send JSON-RPC messages via POST", async () => {
19+
const message: JSONRPCMessage = {
20+
jsonrpc: "2.0",
21+
method: "test",
22+
params: {},
23+
id: "test-id"
24+
};
25+
26+
(global.fetch as jest.Mock).mockResolvedValueOnce({
27+
ok: true,
28+
status: 202,
29+
headers: new Headers(),
30+
});
31+
32+
await transport.send(message);
33+
34+
expect(global.fetch).toHaveBeenCalledWith(
35+
expect.anything(),
36+
expect.objectContaining({
37+
method: "POST",
38+
headers: expect.any(Headers),
39+
body: JSON.stringify(message)
40+
})
41+
);
42+
});
43+
44+
it("should send batch messages", async () => {
45+
const messages: JSONRPCMessage[] = [
46+
{ jsonrpc: "2.0", method: "test1", params: {}, id: "id1" },
47+
{ jsonrpc: "2.0", method: "test2", params: {}, id: "id2" }
48+
];
49+
50+
(global.fetch as jest.Mock).mockResolvedValueOnce({
51+
ok: true,
52+
status: 200,
53+
headers: new Headers({ "content-type": "text/event-stream" }),
54+
body: null
55+
});
56+
57+
await transport.send(messages);
58+
59+
expect(global.fetch).toHaveBeenCalledWith(
60+
expect.anything(),
61+
expect.objectContaining({
62+
method: "POST",
63+
headers: expect.any(Headers),
64+
body: JSON.stringify(messages)
65+
})
66+
);
67+
});
68+
69+
it("should store session ID received during initialization", async () => {
70+
const message: JSONRPCMessage = {
71+
jsonrpc: "2.0",
72+
method: "initialize",
73+
params: {
74+
clientInfo: { name: "test-client", version: "1.0" },
75+
protocolVersion: "2025-03-26"
76+
},
77+
id: "init-id"
78+
};
79+
80+
(global.fetch as jest.Mock).mockResolvedValueOnce({
81+
ok: true,
82+
status: 200,
83+
headers: new Headers({ "mcp-session-id": "test-session-id" }),
84+
});
85+
86+
await transport.send(message);
87+
88+
// Send a second message that should include the session ID
89+
(global.fetch as jest.Mock).mockResolvedValueOnce({
90+
ok: true,
91+
status: 202,
92+
headers: new Headers()
93+
});
94+
95+
await transport.send({ jsonrpc: "2.0", method: "test", params: {} } as JSONRPCMessage);
96+
97+
// Check that second request included session ID header
98+
const calls = (global.fetch as jest.Mock).mock.calls;
99+
const lastCall = calls[calls.length - 1];
100+
expect(lastCall[1].headers).toBeDefined();
101+
expect(lastCall[1].headers.get("mcp-session-id")).toBe("test-session-id");
102+
});
103+
104+
it("should handle 404 response when session expires", async () => {
105+
const message: JSONRPCMessage = {
106+
jsonrpc: "2.0",
107+
method: "test",
108+
params: {},
109+
id: "test-id"
110+
};
111+
112+
(global.fetch as jest.Mock).mockResolvedValueOnce({
113+
ok: false,
114+
status: 404,
115+
statusText: "Not Found",
116+
text: () => Promise.resolve("Session not found"),
117+
headers: new Headers()
118+
});
119+
120+
const errorSpy = jest.fn();
121+
transport.onerror = errorSpy;
122+
123+
await expect(transport.send(message)).rejects.toThrow("Error POSTing to endpoint (HTTP 404)");
124+
expect(errorSpy).toHaveBeenCalled();
125+
});
126+
127+
it("should handle non-streaming JSON response", async () => {
128+
const message: JSONRPCMessage = {
129+
jsonrpc: "2.0",
130+
method: "test",
131+
params: {},
132+
id: "test-id"
133+
};
134+
135+
const responseMessage: JSONRPCMessage = {
136+
jsonrpc: "2.0",
137+
result: { success: true },
138+
id: "test-id"
139+
};
140+
141+
(global.fetch as jest.Mock).mockResolvedValueOnce({
142+
ok: true,
143+
status: 200,
144+
headers: new Headers({ "content-type": "application/json" }),
145+
json: () => Promise.resolve(responseMessage)
146+
});
147+
148+
const messageSpy = jest.fn();
149+
transport.onmessage = messageSpy;
150+
151+
await transport.send(message);
152+
153+
expect(messageSpy).toHaveBeenCalledWith(responseMessage);
154+
});
155+
156+
it("should attempt initial GET connection and handle 405 gracefully", async () => {
157+
// Mock the server not supporting GET for SSE (returning 405)
158+
(global.fetch as jest.Mock).mockResolvedValueOnce({
159+
ok: false,
160+
status: 405,
161+
statusText: "Method Not Allowed"
162+
});
163+
164+
// We expect the 405 error to be caught and handled gracefully
165+
// This should not throw an error that breaks the transport
166+
await transport.start();
167+
await expect(transport.openSseStream()).rejects.toThrow('Failed to open SSE stream: Method Not Allowed');
168+
169+
// Check that GET was attempted
170+
expect(global.fetch).toHaveBeenCalledWith(
171+
expect.anything(),
172+
expect.objectContaining({
173+
method: "GET",
174+
headers: expect.any(Headers)
175+
})
176+
);
177+
178+
// Verify transport still works after 405
179+
(global.fetch as jest.Mock).mockResolvedValueOnce({
180+
ok: true,
181+
status: 202,
182+
headers: new Headers()
183+
});
184+
185+
await transport.send({ jsonrpc: "2.0", method: "test", params: {} } as JSONRPCMessage);
186+
expect(global.fetch).toHaveBeenCalledTimes(2);
187+
});
188+
189+
it("should handle successful initial GET connection for SSE", async () => {
190+
// Set up readable stream for SSE events
191+
const encoder = new TextEncoder();
192+
const stream = new ReadableStream({
193+
start(controller) {
194+
// Send a server notification via SSE
195+
const event = 'event: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n';
196+
controller.enqueue(encoder.encode(event));
197+
}
198+
});
199+
200+
// Mock successful GET connection
201+
(global.fetch as jest.Mock).mockResolvedValueOnce({
202+
ok: true,
203+
status: 200,
204+
headers: new Headers({ "content-type": "text/event-stream" }),
205+
body: stream
206+
});
207+
208+
const messageSpy = jest.fn();
209+
transport.onmessage = messageSpy;
210+
211+
await transport.start();
212+
await transport.openSseStream();
213+
214+
// Give time for the SSE event to be processed
215+
await new Promise(resolve => setTimeout(resolve, 50));
216+
217+
expect(messageSpy).toHaveBeenCalledWith(
218+
expect.objectContaining({
219+
jsonrpc: "2.0",
220+
method: "serverNotification",
221+
params: {}
222+
})
223+
);
224+
});
225+
226+
it("should handle multiple concurrent SSE streams", async () => {
227+
// Mock two POST requests that return SSE streams
228+
const makeStream = (id: string) => {
229+
const encoder = new TextEncoder();
230+
return new ReadableStream({
231+
start(controller) {
232+
const event = `event: message\ndata: {"jsonrpc": "2.0", "result": {"id": "${id}"}, "id": "${id}"}\n\n`;
233+
controller.enqueue(encoder.encode(event));
234+
}
235+
});
236+
};
237+
238+
(global.fetch as jest.Mock)
239+
.mockResolvedValueOnce({
240+
ok: true,
241+
status: 200,
242+
headers: new Headers({ "content-type": "text/event-stream" }),
243+
body: makeStream("request1")
244+
})
245+
.mockResolvedValueOnce({
246+
ok: true,
247+
status: 200,
248+
headers: new Headers({ "content-type": "text/event-stream" }),
249+
body: makeStream("request2")
250+
});
251+
252+
const messageSpy = jest.fn();
253+
transport.onmessage = messageSpy;
254+
255+
// Send two concurrent requests
256+
await Promise.all([
257+
transport.send({ jsonrpc: "2.0", method: "test1", params: {}, id: "request1" }),
258+
transport.send({ jsonrpc: "2.0", method: "test2", params: {}, id: "request2" })
259+
]);
260+
261+
// Give time for SSE processing
262+
await new Promise(resolve => setTimeout(resolve, 100));
263+
264+
// Both streams should have delivered their messages
265+
expect(messageSpy).toHaveBeenCalledTimes(2);
266+
267+
// Verify received messages without assuming specific order
268+
expect(messageSpy.mock.calls.some(call => {
269+
const msg = call[0];
270+
return msg.id === "request1" && msg.result?.id === "request1";
271+
})).toBe(true);
272+
273+
expect(messageSpy.mock.calls.some(call => {
274+
const msg = call[0];
275+
return msg.id === "request2" && msg.result?.id === "request2";
276+
})).toBe(true);
277+
});
278+
279+
it("should include last-event-id header when resuming a broken connection", async () => {
280+
// First make a successful connection that provides an event ID
281+
const encoder = new TextEncoder();
282+
const stream = new ReadableStream({
283+
start(controller) {
284+
const event = 'id: event-123\nevent: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n';
285+
controller.enqueue(encoder.encode(event));
286+
controller.close();
287+
}
288+
});
289+
290+
(global.fetch as jest.Mock).mockResolvedValueOnce({
291+
ok: true,
292+
status: 200,
293+
headers: new Headers({ "content-type": "text/event-stream" }),
294+
body: stream
295+
});
296+
297+
await transport.start();
298+
await transport.openSseStream();
299+
await new Promise(resolve => setTimeout(resolve, 50));
300+
301+
// Now simulate attempting to reconnect
302+
(global.fetch as jest.Mock).mockResolvedValueOnce({
303+
ok: true,
304+
status: 200,
305+
headers: new Headers({ "content-type": "text/event-stream" }),
306+
body: null
307+
});
308+
309+
await transport.openSseStream();
310+
311+
// Check that Last-Event-ID was included
312+
const calls = (global.fetch as jest.Mock).mock.calls;
313+
const lastCall = calls[calls.length - 1];
314+
expect(lastCall[1].headers.get("last-event-id")).toBe("event-123");
315+
});
316+
});

0 commit comments

Comments
 (0)