Skip to content

Commit c49769b

Browse files
committed
integration tests
1 parent 43cb015 commit c49769b

File tree

1 file changed

+329
-0
lines changed

1 file changed

+329
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
import { createServer, type Server } from 'node:http';
2+
import { AddressInfo } from 'node:net';
3+
import { randomUUID } from 'node:crypto';
4+
import { Client } from '../client/index.js';
5+
import { StreamableHTTPClientTransport } from '../client/streamableHttp.js';
6+
import { McpServer } from '../server/mcp.js';
7+
import { EventStore, StreamableHTTPServerTransport } from '../server/streamableHttp.js';
8+
import { CallToolResult, CallToolResultSchema, JSONRPCMessage, LoggingMessageNotificationSchema } from '../types.js';
9+
import { z } from 'zod';
10+
11+
/**
12+
* Simple in-memory event store implementation for resumability
13+
*/
14+
class InMemoryEventStore implements EventStore {
15+
private events: Map<string, { streamId: string, message: JSONRPCMessage }> = new Map();
16+
17+
generateEventId(streamId: string): string {
18+
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
19+
}
20+
21+
getStreamIdFromEventId(eventId: string): string {
22+
const parts = eventId.split('_');
23+
return parts.length > 0 ? parts[0] : '';
24+
}
25+
26+
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
27+
const eventId = this.generateEventId(streamId);
28+
this.events.set(eventId, { streamId, message });
29+
return eventId;
30+
}
31+
32+
async getEventsAfter(lastEventId: string): Promise<Array<{ eventId: string, message: JSONRPCMessage }>> {
33+
if (!lastEventId || !this.events.has(lastEventId)) {
34+
return [];
35+
}
36+
37+
// Extract the stream ID from the event ID
38+
const streamId = this.getStreamIdFromEventId(lastEventId);
39+
const result: Array<{ eventId: string, message: JSONRPCMessage }> = [];
40+
let foundLastEvent = false;
41+
42+
// Sort events by eventId for chronological ordering
43+
const sortedEvents = [...this.events.entries()].sort((a, b) => a[0].localeCompare(b[0]));
44+
45+
for (const [eventId, { streamId: eventStreamId, message }] of sortedEvents) {
46+
// Only include events from the same stream
47+
if (eventStreamId !== streamId) {
48+
continue;
49+
}
50+
51+
// Start collecting events after we find the lastEventId
52+
if (eventId === lastEventId) {
53+
foundLastEvent = true;
54+
continue;
55+
}
56+
57+
if (foundLastEvent) {
58+
result.push({ eventId, message });
59+
}
60+
}
61+
62+
return result;
63+
}
64+
}
65+
66+
67+
describe('Transport resumability', () => {
68+
let server: Server;
69+
let mcpServer: McpServer;
70+
let serverTransport: StreamableHTTPServerTransport;
71+
let baseUrl: URL;
72+
let eventStore: InMemoryEventStore;
73+
74+
beforeEach(async () => {
75+
// Create event store for resumability
76+
eventStore = new InMemoryEventStore();
77+
78+
// Create a simple MCP server
79+
mcpServer = new McpServer(
80+
{ name: 'test-server', version: '1.0.0' },
81+
{ capabilities: { logging: {} } }
82+
);
83+
84+
// Add a simple notification tool that completes quickly
85+
mcpServer.tool(
86+
'send-notification',
87+
'Sends a single notification',
88+
{
89+
message: z.string().describe('Message to send').default('Test notification')
90+
},
91+
async ({ message }, { sendNotification }) => {
92+
// Send notification immediately
93+
await sendNotification({
94+
method: "notifications/message",
95+
params: {
96+
level: "info",
97+
data: message
98+
}
99+
});
100+
101+
return {
102+
content: [{ type: 'text', text: 'Notification sent' }]
103+
};
104+
}
105+
);
106+
107+
// Add a long-running tool that sends multiple notifications
108+
mcpServer.tool(
109+
'run-notifications',
110+
'Sends multiple notifications over time',
111+
{
112+
count: z.number().describe('Number of notifications to send').default(10),
113+
interval: z.number().describe('Interval between notifications in ms').default(50)
114+
},
115+
async ({ count, interval }, { sendNotification }) => {
116+
// Send notifications at specified intervals
117+
for (let i = 0; i < count; i++) {
118+
await sendNotification({
119+
method: "notifications/message",
120+
params: {
121+
level: "info",
122+
data: `Notification ${i + 1} of ${count}`
123+
}
124+
});
125+
126+
// Wait for the specified interval before sending next notification
127+
if (i < count - 1) {
128+
await new Promise(resolve => setTimeout(resolve, interval));
129+
}
130+
}
131+
132+
return {
133+
content: [{ type: 'text', text: `Sent ${count} notifications` }]
134+
};
135+
}
136+
);
137+
138+
// Create a transport with the event store
139+
serverTransport = new StreamableHTTPServerTransport({
140+
sessionIdGenerator: () => randomUUID(),
141+
eventStore
142+
});
143+
144+
// Connect the transport to the MCP server
145+
await mcpServer.connect(serverTransport);
146+
147+
// Create and start an HTTP server
148+
server = createServer(async (req, res) => {
149+
await serverTransport.handleRequest(req, res);
150+
});
151+
152+
// Start the server on a random port
153+
baseUrl = await new Promise<URL>((resolve) => {
154+
server.listen(0, '127.0.0.1', () => {
155+
const addr = server.address() as AddressInfo;
156+
resolve(new URL(`http://127.0.0.1:${addr.port}`));
157+
});
158+
});
159+
});
160+
161+
afterEach(async () => {
162+
// Clean up resources
163+
await mcpServer.close().catch(() => { });
164+
await serverTransport.close().catch(() => { });
165+
server.close();
166+
});
167+
168+
it('should store session ID when client connects', async () => {
169+
// Create and connect a client
170+
const client = new Client({
171+
name: 'test-client',
172+
version: '1.0.0'
173+
});
174+
175+
const transport = new StreamableHTTPClientTransport(baseUrl);
176+
await client.connect(transport);
177+
178+
// Verify session ID was generated
179+
expect(transport.sessionId).toBeDefined();
180+
181+
// Clean up
182+
await transport.close();
183+
});
184+
185+
it('should have session ID functionality', async () => {
186+
// The ability to store a session ID when connecting
187+
const client = new Client({
188+
name: 'test-client-reconnection',
189+
version: '1.0.0'
190+
});
191+
192+
const transport = new StreamableHTTPClientTransport(baseUrl);
193+
194+
// Make sure the client can connect and get a session ID
195+
await client.connect(transport);
196+
expect(transport.sessionId).toBeDefined();
197+
198+
// Clean up
199+
await transport.close();
200+
});
201+
202+
// This test demonstrates the capability to resume long-running tools
203+
// across client disconnection/reconnection
204+
it('should resume long-running notifications with lastEventId', async () => {
205+
// Create unique client ID for this test
206+
const clientId = 'test-client-long-running';
207+
const notifications: any[] = [];
208+
let sessionId: string | undefined;
209+
let lastEventId: string | undefined;
210+
211+
// Create first client
212+
let client1 = new Client({
213+
id: clientId,
214+
name: 'test-client',
215+
version: '1.0.0'
216+
});
217+
218+
// Set up notification handler for first client
219+
client1.setNotificationHandler(LoggingMessageNotificationSchema, (notification: any) => {
220+
if (notification.method === 'notifications/message') {
221+
notifications.push(notification.params);
222+
}
223+
});
224+
225+
// Connect first client
226+
const transport1 = new StreamableHTTPClientTransport(baseUrl);
227+
await client1.connect(transport1);
228+
sessionId = transport1.sessionId;
229+
expect(sessionId).toBeDefined();
230+
231+
// Start a long-running notification stream with tracking of lastEventId
232+
const onLastEventIdUpdate = jest.fn((eventId: string) => {
233+
lastEventId = eventId;
234+
});
235+
236+
// Start the notification tool with event tracking using request
237+
const toolPromise = client1.request({
238+
method: 'tools/call',
239+
params: {
240+
name: 'run-notifications',
241+
arguments: {
242+
count: 5,
243+
interval: 10
244+
}
245+
}
246+
}, CallToolResultSchema, {
247+
lastEventId,
248+
onLastEventIdUpdate
249+
});
250+
251+
// Wait for some notifications to arrive (not all)
252+
await new Promise(resolve => setTimeout(resolve, 20));
253+
254+
// Verify we received some notifications and lastEventId was updated
255+
expect(notifications.length).toBeGreaterThan(0);
256+
expect(notifications.length).toBeLessThan(5);
257+
expect(onLastEventIdUpdate).toHaveBeenCalled();
258+
expect(lastEventId).toBeDefined();
259+
260+
// Store original notification count for later comparison
261+
const firstClientNotificationCount = notifications.length;
262+
263+
// Disconnect first client without waiting for completion
264+
// When we close the connection, it will cause a ConnectionClosed error for
265+
// any in-progress requests, which is expected behavior
266+
// We need to catch the error since closing the transport will
267+
// cause the pending toolPromise to reject with a ConnectionClosed error
268+
await transport1.close();
269+
270+
// Try to cancel the promise, but ignore errors since it's already being handled
271+
toolPromise.catch(err => {
272+
// This error is expected - the connection was intentionally closed
273+
if (err?.code !== -32000) { // ConnectionClosed error code
274+
console.error("Unexpected error type during transport close:", err);
275+
}
276+
});
277+
278+
279+
// Create second client with same client ID
280+
const client2 = new Client({
281+
id: clientId,
282+
name: 'test-client',
283+
version: '1.0.0'
284+
});
285+
286+
// Set up notification handler for second client
287+
client2.setNotificationHandler(LoggingMessageNotificationSchema, (notification: any) => {
288+
if (notification.method === 'notifications/message') {
289+
notifications.push(notification.params);
290+
}
291+
});
292+
293+
// Connect second client with same session ID
294+
const transport2 = new StreamableHTTPClientTransport(baseUrl, {
295+
sessionId
296+
});
297+
await client2.connect(transport2);
298+
299+
// Resume the notification stream using lastEventId
300+
// This is the key part - we're resuming the same long-running tool using lastEventId
301+
const resumedToolPromise = client2.request({
302+
method: 'tools/call',
303+
params: {
304+
name: 'run-notifications',
305+
arguments: {
306+
count: 5,
307+
interval: 50
308+
}
309+
}
310+
}, CallToolResultSchema, {
311+
lastEventId, // Pass the lastEventId from the previous session
312+
onLastEventIdUpdate
313+
});
314+
315+
// Wait for remaining notifications
316+
await new Promise(resolve => setTimeout(resolve, 200));
317+
318+
// Verify we eventually received at leaset a few motifications
319+
expect(notifications.length).toBeGreaterThan(2);
320+
321+
// Verify the second client received notifications that the first client didn't
322+
expect(notifications.length).toBeGreaterThan(firstClientNotificationCount);
323+
324+
// Clean up
325+
326+
await transport2.close();
327+
328+
});
329+
});

0 commit comments

Comments
 (0)