Skip to content

Commit eef96b4

Browse files
authored
Merge pull request #46 from QuantGeekDev/fix/close-sse
fix: close sse stream after post
2 parents e71ebb5 + d6ea60d commit eef96b4

File tree

2 files changed

+75
-11
lines changed

2 files changed

+75
-11
lines changed

src/transports/http/server.ts

+73-11
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,17 @@ export class HttpStreamTransport extends AbstractTransport {
301301
logger.info(`Initialized new session: ${newSessionId} via stream`);
302302
}
303303

304-
const sseConnection = this.setupSSEConnection(req, res, newSessionId || session?.id, undefined, additionalHeaders);
304+
const requestIds = new Set<string | number>();
305+
clientRequests.forEach(req => requestIds.add(req.id));
306+
307+
const sseConnection = this.setupSSEConnection(req, res, newSessionId || session?.id, undefined, additionalHeaders, true);
308+
309+
if (requestIds.size > 0) {
310+
sseConnection.pendingResponseIds = requestIds;
311+
logger.debug(`Stream mode: Tracking ${requestIds.size} pending responses for stream ${sseConnection.streamId}`);
312+
} else {
313+
logger.debug(`Stream mode: No request IDs to track for stream ${sseConnection.streamId}. Connection will remain open.`);
314+
}
305315

306316
if (newSessionId) {
307317
sseConnection.sessionId = newSessionId;
@@ -380,7 +390,7 @@ export class HttpStreamTransport extends AbstractTransport {
380390
logger.warn(`Client sent Last-Event-ID (${lastEventId}) but resumability is disabled.`);
381391
}
382392

383-
this.setupSSEConnection(req, res, session?.id, lastEventId);
393+
this.setupSSEConnection(req, res, session?.id, lastEventId, {}, false);
384394
logger.debug(`Established SSE stream for GET request (Session: ${session?.id || 'initialization phase'})`);
385395
}
386396

@@ -398,17 +408,30 @@ export class HttpStreamTransport extends AbstractTransport {
398408
res.writeHead(200, { 'Content-Type': 'text/plain' }).end("Session terminated");
399409
}
400410

401-
private setupSSEConnection(req: IncomingMessage, res: ServerResponse, sessionId?: string, lastEventId?: string, additionalHeaders: Record<string, string> = {}): ActiveSseConnection {
411+
private setupSSEConnection(
412+
req: IncomingMessage,
413+
res: ServerResponse,
414+
sessionId?: string,
415+
lastEventId?: string,
416+
additionalHeaders: Record<string, string> = {},
417+
isPostConnection: boolean = false
418+
): ActiveSseConnection {
402419
const streamId = randomUUID();
403420
const connection: ActiveSseConnection = {
404421
res, sessionId, streamId, lastEventIdSent: null,
405-
messageHistory: this._config.resumability.enabled ? [] : undefined, pingInterval: undefined
422+
messageHistory: this._config.resumability.enabled ? [] : undefined,
423+
pingInterval: undefined,
424+
isPostConnection
406425
};
407426

408427
const headers = { ...SSE_HEADERS, ...additionalHeaders };
409428
res.writeHead(200, headers);
410429

411-
logger.debug(`SSE stream ${streamId} setup (Session: ${sessionId || 'N/A'})`);
430+
const originInfo = isPostConnection ?
431+
`POST (will close after responses)` :
432+
`GET (persistent until client disconnects)`;
433+
434+
logger.debug(`SSE stream ${streamId} setup (Session: ${sessionId || 'N/A'}, Origin: ${originInfo})`);
412435
if (res.socket) { res.socket.setNoDelay(true); res.socket.setKeepAlive(true); res.socket.setTimeout(0); logger.debug(`Optimized socket for SSE stream ${streamId}`); }
413436
else { logger.warn(`Could not access socket for SSE stream ${streamId} to optimize.`); }
414437
this._activeSseConnections.add(connection);
@@ -421,7 +444,7 @@ export class HttpStreamTransport extends AbstractTransport {
421444
res.on("close", () => cleanupHandler("Client closed connection"));
422445
res.on("error", (err) => { logger.error(`SSE stream ${streamId} error: ${err.message}`); cleanupHandler(`Connection error: ${err.message}`); this._onerror?.(err); });
423446
res.on("finish", () => cleanupHandler("Stream finished"));
424-
logger.info(`SSE stream ${streamId} active (Session: ${sessionId || 'N/A'}, Total: ${this._activeSseConnections.size})`);
447+
logger.info(`SSE stream ${streamId} active (Session: ${sessionId || 'N/A'}, Origin: ${originInfo}, Total: ${this._activeSseConnections.size})`);
425448
return connection;
426449
}
427450

@@ -439,6 +462,23 @@ export class HttpStreamTransport extends AbstractTransport {
439462
logger.debug(`Total active SSE connections after cleanup: ${this._activeSseConnections.size}`);
440463
}
441464

465+
/**
466+
* Checks if a POST-initiated SSE connection has completed all responses.
467+
* If all responses have been sent, closes the connection as per spec recommendation.
468+
*/
469+
private checkAndCloseCompletedPostConnection(connection: ActiveSseConnection): void {
470+
if (!connection.isPostConnection || !connection.pendingResponseIds) {
471+
return;
472+
}
473+
474+
if (connection.pendingResponseIds.size > 0) {
475+
return;
476+
}
477+
478+
logger.info(`POST-initiated SSE stream ${connection.streamId} has sent all responses. Closing as per spec recommendation.`);
479+
this.cleanupConnection(connection, "All responses sent");
480+
}
481+
442482
private cleanupAllConnections(): void {
443483
logger.info(`Cleaning up all ${this._activeSseConnections.size} active SSE connections and ${this._pendingBatches.size} pending batches.`);
444484
Array.from(this._activeSseConnections).forEach(conn => this.cleanupConnection(conn, "Server shutting down"));
@@ -493,19 +533,37 @@ export class HttpStreamTransport extends AbstractTransport {
493533
if (targetConnection) {
494534
this._requestStreamMap.delete(message.id);
495535
logger.debug(`Stream mode: Found target stream ${targetConnection.streamId} for response ID ${message.id}`);
536+
537+
if (targetConnection.pendingResponseIds && targetConnection.pendingResponseIds.has(message.id)) {
538+
targetConnection.pendingResponseIds.delete(message.id);
539+
logger.debug(`Stream ${targetConnection.streamId}: Removed ID ${message.id} from pending responses. Remaining: ${targetConnection.pendingResponseIds.size}`);
540+
}
496541
} else {
497542
logger.warn(`Stream mode: No active stream found mapping to response ID ${message.id}. Message dropped.`);
498543
return;
499544
}
500-
}
545+
} else {
546+
targetConnection = Array.from(this._activeSseConnections)
547+
.filter(c => {
548+
return isResponse(message) ? c.isPostConnection : true;
549+
})
550+
.find(c => c.res && !c.res.writableEnded);
501551

502-
if (!targetConnection) {
503-
targetConnection = Array.from(this._activeSseConnections).find(c => c.res && !c.res.writableEnded);
504-
if (targetConnection) logger.debug(`Stream mode: No specific target, selected available stream ${targetConnection.streamId}`);
552+
if (targetConnection) {
553+
if (isResponse(message)) {
554+
logger.debug(`Stream mode: Using POST-originated stream ${targetConnection.streamId} for response`);
555+
} else {
556+
logger.debug(`Stream mode: Selected available stream ${targetConnection.streamId} for request/notification`);
557+
}
558+
}
505559
}
506560

507561
if (!targetConnection || !targetConnection.res || targetConnection.res.writableEnded) {
508-
logger.error(`Cannot send message via SSE: No suitable stream found. Message dropped: ${JSON.stringify(message)}`);
562+
if (isResponse(message)) {
563+
logger.error(`Cannot send response message via SSE: No suitable POST-originated stream found. Message dropped: ${JSON.stringify(message)}`);
564+
} else {
565+
logger.error(`Cannot send request/notification message via SSE: No suitable stream found. Message dropped: ${JSON.stringify(message)}`);
566+
}
509567
return;
510568
}
511569

@@ -525,6 +583,10 @@ export class HttpStreamTransport extends AbstractTransport {
525583
}
526584
logger.debug(`Sending SSE data on stream ${targetConnection.streamId}: ${JSON.stringify(message)}`);
527585
targetConnection.res.write(`data: ${JSON.stringify(message)}\n\n`);
586+
587+
if (isResponse(message)) {
588+
this.checkAndCloseCompletedPostConnection(targetConnection);
589+
}
528590
} catch (error: any) {
529591
logger.error(`Error writing to SSE stream ${targetConnection.streamId}: ${error.message}. Cleaning up connection.`);
530592
this.cleanupConnection(targetConnection, `Write error: ${error.message}`);

src/transports/http/types.ts

+2
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ export interface ActiveSseConnection {
199199
lastEventIdSent: string | null;
200200
messageHistory?: Array<{ eventId: string; message: JsonRpcMessage; timestamp: number }>;
201201
pingInterval?: NodeJS.Timeout;
202+
isPostConnection: boolean;
203+
pendingResponseIds?: Set<string | number>;
202204
}
203205

204206
/**

0 commit comments

Comments
 (0)