Skip to content

feat: add sse resumability #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ README
.DS_Store
dist
node_modules
.cursor
142 changes: 129 additions & 13 deletions src/transports/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
JsonRpcSuccessResponse,
JsonRpcErrorResponse,
JsonRpcError,
JsonRpcId
JsonRpcId,
MessageEntry
} from "./types.js";

import contentType from "content-type";
Expand Down Expand Up @@ -68,6 +69,8 @@ export class HttpStreamTransport extends AbstractTransport {
private _requestStreamMap = new Map<string | number, ActiveSseConnection>();
private _pendingBatches = new Map<ServerResponse, BatchResponseState>();
private _eventCounter = 0;
private _globalMessageStore = new Map<string, Map<string, MessageEntry>>();
private _pruneInterval?: NodeJS.Timeout;

constructor(config: HttpStreamTransportConfig = {}) {
super();
Expand All @@ -91,9 +94,14 @@ export class HttpStreamTransport extends AbstractTransport {
responseMode: this._config.responseMode,
sessionEnabled: this._config.session.enabled,
resumabilityEnabled: this._config.resumability.enabled,
resumabilityStore: this._config.resumability.messageStoreType,
authEnabled: !!this._config.auth,
corsOrigin: this._config.cors.allowOrigin
}, null, 2)}`);

if (this._config.resumability.enabled && this._config.resumability.messageStoreType === 'global') {
this._pruneInterval = setInterval(() => this.pruneMessageStore(), this._config.resumability.historyDuration / 3);
}
}

private getCorsHeaders(req: IncomingMessage, includeMaxAge: boolean = false): Record<string, string> {
Expand Down Expand Up @@ -419,7 +427,7 @@ export class HttpStreamTransport extends AbstractTransport {
const streamId = randomUUID();
const connection: ActiveSseConnection = {
res, sessionId, streamId, lastEventIdSent: null,
messageHistory: this._config.resumability.enabled ? [] : undefined,
messageHistory: this._config.resumability.enabled && this._config.resumability.messageStoreType === 'connection' ? [] : undefined,
pingInterval: undefined,
isPostConnection
};
Expand All @@ -438,7 +446,7 @@ export class HttpStreamTransport extends AbstractTransport {
res.write(': stream opened\n\n');
connection.pingInterval = setInterval(() => this.sendPing(connection), 15000);
if (lastEventId && this._config.resumability.enabled) {
this.handleResumption(connection, lastEventId).catch(err => { logger.error(`Error during stream resumption for ${streamId}: ${err.message}`); this.cleanupConnection(connection, `Resumption error: ${err.message}`); });
this.handleResumption(connection, lastEventId, sessionId).catch(err => { logger.error(`Error during stream resumption for ${streamId}: ${err.message}`); this.cleanupConnection(connection, `Resumption error: ${err.message}`); });
}
const cleanupHandler = (reason: string) => { if (connection.pingInterval) { clearInterval(connection.pingInterval); connection.pingInterval = undefined; } this.cleanupConnection(connection, reason); };
res.on("close", () => cleanupHandler("Client closed connection"));
Expand Down Expand Up @@ -572,12 +580,16 @@ export class HttpStreamTransport extends AbstractTransport {
if (this._config.resumability.enabled) {
eventId = `${Date.now()}-${this._eventCounter++}`;
targetConnection.lastEventIdSent = eventId;
if (targetConnection.messageHistory) {

this.storeMessage(message, targetConnection.sessionId, eventId);

if (this._config.resumability.messageStoreType === 'connection' && targetConnection.messageHistory) {
const timestamp = Date.now();
targetConnection.messageHistory.push({ eventId, message, timestamp });
const cutoff = timestamp - this._config.resumability.historyDuration;
targetConnection.messageHistory = targetConnection.messageHistory.filter(entry => entry.timestamp >= cutoff);
}

logger.debug(`Sending SSE event ID: ${eventId} on stream ${targetConnection.streamId}`);
targetConnection.res.write(`id: ${eventId}\n`);
}
Expand Down Expand Up @@ -641,24 +653,64 @@ export class HttpStreamTransport extends AbstractTransport {
return session;
}

private async handleResumption(connection: ActiveSseConnection, lastEventId: string): Promise<void> {
private async handleResumption(connection: ActiveSseConnection, lastEventId: string, sessionId?: string): Promise<void> {
logger.info(`Attempting resume stream ${connection.streamId} from event ${lastEventId}`);
if (!connection.messageHistory || !this._config.resumability.enabled) { logger.warn(`Resume requested for ${connection.streamId}, but history unavailable/disabled. Starting fresh.`); return; }
const history = connection.messageHistory;
const lastReceivedIndex = history.findIndex(entry => entry.eventId === lastEventId);
if (lastReceivedIndex === -1) { logger.warn(`Event ${lastEventId} not found in history for ${connection.streamId}. Starting fresh.`); return; }
const messagesToReplay = history.slice(lastReceivedIndex + 1);
if (messagesToReplay.length === 0) { logger.info(`Event ${lastEventId} was last known event for ${connection.streamId}. No replay needed.`); return; }

let messagesToReplay: MessageEntry[] = [];

if (this._config.resumability.messageStoreType === 'global') {
if (!this._config.resumability.enabled) {
logger.warn(`Resume requested for ${connection.streamId}, but resumability is disabled. Starting fresh.`);
return;
}

messagesToReplay = this.getMessagesAfterEvent(sessionId, lastEventId);

if (messagesToReplay.length === 0) {
logger.warn(`Event ${lastEventId} not found in global message store for session ${sessionId || 'N/A'}. Starting fresh.`);
return;
}
} else if (this._config.resumability.messageStoreType === 'connection') {
if (!connection.messageHistory || !this._config.resumability.enabled) {
logger.warn(`Resume requested for ${connection.streamId}, but history unavailable/disabled. Starting fresh.`);
return;
}

const history = connection.messageHistory;
const lastReceivedIndex = history.findIndex(entry => entry.eventId === lastEventId);

if (lastReceivedIndex === -1) {
logger.warn(`Event ${lastEventId} not found in history for ${connection.streamId}. Starting fresh.`);
return;
}

messagesToReplay = history.slice(lastReceivedIndex + 1);
}

if (messagesToReplay.length === 0) {
logger.info(`Event ${lastEventId} was last known event for ${connection.streamId}. No replay needed.`);
return;
}

logger.info(`Replaying ${messagesToReplay.length} messages for stream ${connection.streamId}`);

for (const entry of messagesToReplay) {
if (!connection.res || connection.res.writableEnded) { logger.warn(`Stream ${connection.streamId} closed during replay. Aborting.`); return; }
if (!connection.res || connection.res.writableEnded) {
logger.warn(`Stream ${connection.streamId} closed during replay. Aborting.`);
return;
}
try {
logger.debug(`Replaying event ${entry.eventId}`);
connection.res.write(`id: ${entry.eventId}\n`);
connection.res.write(`data: ${JSON.stringify(entry.message)}\n\n`);
connection.lastEventIdSent = entry.eventId;
} catch(error: any) { logger.error(`Error replaying message ${entry.eventId} to ${connection.streamId}: ${error.message}. Aborting.`); this.cleanupConnection(connection, `Replay write error: ${error.message}`); return; }
} catch(error: any) {
logger.error(`Error replaying message ${entry.eventId} to ${connection.streamId}: ${error.message}. Aborting.`);
this.cleanupConnection(connection, `Replay write error: ${error.message}`);
return;
}
}

logger.info(`Finished replaying messages for stream ${connection.streamId}`);
}

Expand Down Expand Up @@ -699,7 +751,14 @@ export class HttpStreamTransport extends AbstractTransport {

async close(): Promise<void> {
logger.info("Closing HttpStreamTransport...");

if (this._pruneInterval) {
clearInterval(this._pruneInterval);
this._pruneInterval = undefined;
}

this.cleanupAllConnections();

return new Promise((resolve, reject) => {
if (this._server) {
const server = this._server; this._server = undefined;
Expand All @@ -709,4 +768,61 @@ export class HttpStreamTransport extends AbstractTransport {
});
}
isRunning(): boolean { return Boolean(this._server?.listening); }

private storeMessage(message: JsonRpcMessage, sessionId: string | undefined, eventId: string): void {
if (!this._config.resumability.enabled) return;

const timestamp = Date.now();
const messageEntry: MessageEntry = { eventId, message, timestamp };

if (this._config.resumability.messageStoreType === 'global' && sessionId) {
if (!this._globalMessageStore.has(sessionId)) {
this._globalMessageStore.set(sessionId, new Map());
}
this._globalMessageStore.get(sessionId)!.set(eventId, messageEntry);
}
}

private pruneMessageStore(): void {
if (!this._config.resumability.enabled || this._config.resumability.messageStoreType !== 'global') return;

const cutoff = Date.now() - this._config.resumability.historyDuration;

for (const [sessionId, messages] of this._globalMessageStore.entries()) {
let expired = 0;
for (const [eventId, entry] of messages.entries()) {
if (entry.timestamp < cutoff) {
messages.delete(eventId);
expired++;
}
}

if (messages.size === 0) {
this._globalMessageStore.delete(sessionId);
} else if (expired > 0) {
logger.debug(`Pruned ${expired} expired messages for session ${sessionId}`);
}
}
}

private getMessagesAfterEvent(sessionId: string | undefined, lastEventId: string): MessageEntry[] {
if (!sessionId || !this._config.resumability.enabled ||
this._config.resumability.messageStoreType !== 'global' ||
!this._globalMessageStore.has(sessionId)) {
return [];
}

const messages = this._globalMessageStore.get(sessionId)!;

const allEntries = Array.from(messages.values())
.sort((a, b) => a.timestamp - b.timestamp);

const lastReceivedIndex = allEntries.findIndex(entry => entry.eventId === lastEventId);

if (lastReceivedIndex === -1) {
return [];
}

return allEntries.slice(lastReceivedIndex + 1);
}
}
10 changes: 9 additions & 1 deletion src/transports/http/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export interface HttpStreamTransportConfig {
* How long to keep message history in ms. Default: 300000 (5 min)
*/
historyDuration?: number;
messageStoreType?: 'connection' | 'global';
};
}

Expand Down Expand Up @@ -170,6 +171,7 @@ export const DEFAULT_HTTP_STREAM_CONFIG: HttpStreamTransportConfigInternal = {
resumability: {
enabled: false,
historyDuration: 300000,
messageStoreType: 'global',
},
cors: {
allowOrigin: "*",
Expand Down Expand Up @@ -197,7 +199,7 @@ export interface ActiveSseConnection {
sessionId?: string;
streamId: string;
lastEventIdSent: string | null;
messageHistory?: Array<{ eventId: string; message: JsonRpcMessage; timestamp: number }>;
messageHistory?: Array<MessageEntry>;
pingInterval?: NodeJS.Timeout;
isPostConnection: boolean;
pendingResponseIds?: Set<string | number>;
Expand All @@ -213,3 +215,9 @@ export interface BatchResponseState {
timeoutId: NodeJS.Timeout;
isCompleted: boolean;
}

export interface MessageEntry {
eventId: string;
message: JsonRpcMessage;
timestamp: number;
}