Skip to content

Commit 43cb015

Browse files
committed
resume long running requst from new client
1 parent 663b68a commit 43cb015

File tree

6 files changed

+76
-33
lines changed

6 files changed

+76
-33
lines changed

src/client/streamableHttp.ts

+22-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ export type StreamableHTTPClientTransportOptions = {
7171
* Options to configure the reconnection behavior.
7272
*/
7373
reconnectionOptions?: StreamableHTTPReconnectionOptions;
74+
/**
75+
* Session ID for the connection. This is used to identify the session on the server.
76+
* When not provided and connecting to a server that supports session IDs, the server will generate a new session ID.
77+
*/
78+
sessionId?: string;
7479
};
7580

7681
/**
@@ -98,6 +103,7 @@ export class StreamableHTTPClientTransport implements Transport {
98103
this._requestInit = opts?.requestInit;
99104
this._authProvider = opts?.authProvider;
100105
this._reconnectionOptions = opts?.reconnectionOptions || this._defaultReconnectionOptions;
106+
this._sessionId = opts?.sessionId;
101107
}
102108

103109
private async _authThenStart(): Promise<void> {
@@ -240,7 +246,7 @@ export class StreamableHTTPClientTransport implements Transport {
240246
}, delay);
241247
}
242248

243-
private _handleSseStream(stream: ReadableStream<Uint8Array> | null): void {
249+
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, onLastEventIdUpdate?: (event: string) => void): void {
244250
if (!stream) {
245251
return;
246252
}
@@ -266,6 +272,7 @@ export class StreamableHTTPClientTransport implements Transport {
266272
// Update last event ID if provided
267273
if (event.id) {
268274
lastEventId = event.id;
275+
onLastEventIdUpdate?.(lastEventId);
269276
}
270277

271278
if (!event.event || event.event === "message") {
@@ -330,8 +337,16 @@ export class StreamableHTTPClientTransport implements Transport {
330337
this.onclose?.();
331338
}
332339

333-
async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise<void> {
340+
async send(message: JSONRPCMessage | JSONRPCMessage[], options?: { lastEventId?: string, onLastEventIdUpdate?: (event: string) => void }): Promise<void> {
334341
try {
342+
// If client passes in a lastEventId in the request options, we need to reconnect the SSE stream
343+
const { lastEventId, onLastEventIdUpdate } = options ?? {};
344+
if (lastEventId) {
345+
// If we have at last event ID, we need to reconnect the SSE stream
346+
this._startOrAuthStandaloneSSE(lastEventId).catch(err => this.onerror?.(err));
347+
return;
348+
}
349+
335350
const headers = await this._commonHeaders();
336351
headers.set("content-type", "application/json");
337352
headers.set("accept", "application/json, text/event-stream");
@@ -393,7 +408,7 @@ export class StreamableHTTPClientTransport implements Transport {
393408
// Handle SSE stream responses for requests
394409
// We use the same handler as standalone streams, which now supports
395410
// reconnection with the last event ID
396-
this._handleSseStream(response.body);
411+
this._handleSseStream(response.body, onLastEventIdUpdate);
397412
} else if (contentType?.includes("application/json")) {
398413
// For non-streaming servers, we might get direct JSON responses
399414
const data = await response.json();
@@ -416,4 +431,8 @@ export class StreamableHTTPClientTransport implements Transport {
416431
throw error;
417432
}
418433
}
434+
435+
get sessionId(): string | undefined {
436+
return this._sessionId;
437+
}
419438
}

src/examples/client/simpleStreamableHttp.ts

+14-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ let notificationCount = 0;
2929
let client: Client | null = null;
3030
let transport: StreamableHTTPClientTransport | null = null;
3131
let serverUrl = 'http://localhost:3000/mcp';
32+
let notificationsToolLastEventId: string | undefined = undefined;
33+
let sessionId: string | undefined = undefined;
3234

3335
async function main(): Promise<void> {
3436
console.log('MCP Interactive Client');
@@ -186,7 +188,10 @@ async function connect(url?: string): Promise<void> {
186188
}
187189

188190
transport = new StreamableHTTPClientTransport(
189-
new URL(serverUrl)
191+
new URL(serverUrl),
192+
{
193+
sessionId: sessionId
194+
}
190195
);
191196

192197
// Set up notification handlers
@@ -218,6 +223,8 @@ async function connect(url?: string): Promise<void> {
218223

219224
// Connect the client
220225
await client.connect(transport);
226+
sessionId = transport.sessionId
227+
console.log('Transport created with session ID:', sessionId);
221228
console.log('Connected to MCP server');
222229
} catch (error) {
223230
console.error('Failed to connect:', error);
@@ -291,7 +298,12 @@ async function callTool(name: string, args: Record<string, unknown>): Promise<vo
291298
};
292299

293300
console.log(`Calling tool '${name}' with args:`, args);
294-
const result = await client.request(request, CallToolResultSchema);
301+
const onLastEventIdUpdate = (event: string) => {
302+
notificationsToolLastEventId = event;
303+
};
304+
const result = await client.request(request, CallToolResultSchema, {
305+
lastEventId: notificationsToolLastEventId, onLastEventIdUpdate
306+
});
295307

296308
console.log('Tool result:');
297309
result.content.forEach(item => {

src/examples/server/simpleStreamableHttp.ts

+12-8
Original file line numberDiff line numberDiff line change
@@ -172,14 +172,18 @@ server.tool(
172172

173173
while (count === 0 || counter < count) {
174174
counter++;
175-
await sendNotification({
176-
method: "notifications/message",
177-
params: {
178-
level: "info",
179-
data: `Periodic notification #${counter} at ${new Date().toISOString()}`
180-
}
181-
});
182-
175+
try {
176+
await sendNotification({
177+
method: "notifications/message",
178+
params: {
179+
level: "info",
180+
data: `Periodic notification #${counter} at ${new Date().toISOString()}`
181+
}
182+
});
183+
}
184+
catch (error) {
185+
console.error("Error sending notification:", error);
186+
}
183187
// Wait for the specified interval
184188
await sleep(interval);
185189
}

src/server/streamableHttp.ts

+12-16
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,6 @@ export class StreamableHTTPServerTransport implements Transport {
217217

218218
// Assign the response to the standalone SSE stream
219219
this._streamMapping.set(this._standaloneSseStreamId, res);
220-
221220
// Set up close handler for client disconnects
222221
res.on("close", () => {
223222
this._streamMapping.delete(this._standaloneSseStreamId);
@@ -345,8 +344,10 @@ export class StreamableHTTPServerTransport implements Transport {
345344
const isInitializationRequest = messages.some(
346345
msg => 'method' in msg && msg.method === 'initialize'
347346
);
347+
const mcpSessionId = req.headers["mcp-session-id"] as string | undefined;
348348
if (isInitializationRequest) {
349-
if (this._initialized) {
349+
// if generateSessionId is not set, the server does not support session management
350+
if (this._initialized && this.sessionId !== undefined && mcpSessionId !== this.sessionId) {
350351
res.writeHead(400).end(JSON.stringify({
351352
jsonrpc: "2.0",
352353
error: {
@@ -368,7 +369,7 @@ export class StreamableHTTPServerTransport implements Transport {
368369
}));
369370
return;
370371
}
371-
this.sessionId = this.sessionIdGenerator();
372+
this.sessionId = mcpSessionId ?? this.sessionIdGenerator();
372373
this._initialized = true;
373374

374375
}
@@ -419,17 +420,8 @@ export class StreamableHTTPServerTransport implements Transport {
419420
this._requestToStreamMapping.set(message.id, streamId);
420421
}
421422
}
422-
423423
// Set up close handler for client disconnects
424424
res.on("close", () => {
425-
// find a stream ID for this response
426-
// Remove all entries that reference this response
427-
for (const [id, stream] of this._requestToStreamMapping.entries()) {
428-
if (streamId === stream) {
429-
this._requestToStreamMapping.delete(id);
430-
this._requestResponseMap.delete(id);
431-
}
432-
}
433425
this._streamMapping.delete(streamId);
434426
});
435427

@@ -577,7 +569,7 @@ export class StreamableHTTPServerTransport implements Transport {
577569
// Get the response for this request
578570
const streamId = this._requestToStreamMapping.get(requestId);
579571
const response = this._streamMapping.get(streamId!);
580-
if (!streamId || !response) {
572+
if (!streamId) {
581573
throw new Error(`No connection established for request ID: ${String(requestId)}`);
582574
}
583575

@@ -588,9 +580,10 @@ export class StreamableHTTPServerTransport implements Transport {
588580
if (this._eventStore) {
589581
eventId = await this._eventStore.storeEvent(streamId, message);
590582
}
591-
592-
// Write the event to the response stream
593-
this.writeSSEEvent(response, message, eventId);
583+
if (response) {
584+
// Write the event to the response stream
585+
this.writeSSEEvent(response, message, eventId);
586+
}
594587
}
595588

596589
if (isJSONRPCResponse(message)) {
@@ -603,6 +596,9 @@ export class StreamableHTTPServerTransport implements Transport {
603596
const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id));
604597

605598
if (allResponsesReady) {
599+
if (!response) {
600+
throw new Error(`No connection established for request ID: ${String(requestId)}`);
601+
}
606602
if (this._enableJsonResponse) {
607603
// All responses ready, send as JSON
608604
const headers: Record<string, string> = {

src/shared/protocol.ts

+15-3
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,18 @@ export type RequestOptions = {
8787
* May be used to indicate to the transport which incoming request to associate this outgoing request with.
8888
*/
8989
relatedRequestId?: RequestId;
90+
91+
/**
92+
* May be used to indicate to the transport which last event ID to associate this outgoing request with.
93+
* This is used to resume a long-running requests that may have been interrupted and a new instance of a client is being created.
94+
*/
95+
lastEventId?: string;
96+
97+
/**
98+
* A callback that is invoked when the last event ID is updated.
99+
* This is used to notidy the client that the last event ID has changed, so that client can update its state accordingly.
100+
*/
101+
onLastEventIdUpdate?: (event: string) => void;
90102
};
91103

92104
/**
@@ -501,7 +513,7 @@ export abstract class Protocol<
501513
resultSchema: T,
502514
options?: RequestOptions,
503515
): Promise<z.infer<T>> {
504-
const { relatedRequestId } = options ?? {};
516+
const { relatedRequestId, lastEventId, onLastEventIdUpdate } = options ?? {};
505517

506518
return new Promise((resolve, reject) => {
507519
if (!this._transport) {
@@ -543,7 +555,7 @@ export abstract class Protocol<
543555
requestId: messageId,
544556
reason: String(reason),
545557
},
546-
}, { relatedRequestId })
558+
}, { relatedRequestId, lastEventId, onLastEventIdUpdate })
547559
.catch((error) =>
548560
this._onerror(new Error(`Failed to send cancellation: ${error}`)),
549561
);
@@ -581,7 +593,7 @@ export abstract class Protocol<
581593

582594
this._setupTimeout(messageId, timeout, options?.maxTotalTimeout, timeoutHandler, options?.resetTimeoutOnProgress ?? false);
583595

584-
this._transport.send(jsonrpcRequest, { relatedRequestId }).catch((error) => {
596+
this._transport.send(jsonrpcRequest, { relatedRequestId, lastEventId, onLastEventIdUpdate }).catch((error) => {
585597
this._cleanupTimeout(messageId);
586598
reject(error);
587599
});

src/shared/transport.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export interface Transport {
1818
*
1919
* If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with.
2020
*/
21-
send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void>;
21+
send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId, lastEventId?: string, onLastEventIdUpdate?: (event: string) => void }): Promise<void>;
2222

2323
/**
2424
* Closes the connection.

0 commit comments

Comments
 (0)