diff --git a/.gitignore b/.gitignore index 8afd3e1..399e472 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ README .DS_Store dist node_modules +.cursor diff --git a/src/transports/http/server.ts b/src/transports/http/server.ts index 5a0a7f6..e8bc8d8 100644 --- a/src/transports/http/server.ts +++ b/src/transports/http/server.ts @@ -7,7 +7,8 @@ import { JsonRpcSuccessResponse, JsonRpcErrorResponse, JsonRpcError, - JsonRpcId + JsonRpcId, + MessageEntry } from "./types.js"; import contentType from "content-type"; @@ -68,6 +69,8 @@ export class HttpStreamTransport extends AbstractTransport { private _requestStreamMap = new Map(); private _pendingBatches = new Map(); private _eventCounter = 0; + private _globalMessageStore = new Map>(); + private _pruneInterval?: NodeJS.Timeout; constructor(config: HttpStreamTransportConfig = {}) { super(); @@ -91,9 +94,14 @@ export class HttpStreamTransport extends AbstractTransport { responseMode: this._config.responseMode, sessionEnabled: this._config.session.enabled, resumabilityEnabled: this._config.resumability.enabled, + resumabilityStore: this._config.resumability.messageStoreType, authEnabled: !!this._config.auth, corsOrigin: this._config.cors.allowOrigin }, null, 2)}`); + + if (this._config.resumability.enabled && this._config.resumability.messageStoreType === 'global') { + this._pruneInterval = setInterval(() => this.pruneMessageStore(), this._config.resumability.historyDuration / 3); + } } private getCorsHeaders(req: IncomingMessage, includeMaxAge: boolean = false): Record { @@ -419,7 +427,7 @@ export class HttpStreamTransport extends AbstractTransport { const streamId = randomUUID(); const connection: ActiveSseConnection = { res, sessionId, streamId, lastEventIdSent: null, - messageHistory: this._config.resumability.enabled ? [] : undefined, + messageHistory: this._config.resumability.enabled && this._config.resumability.messageStoreType === 'connection' ? [] : undefined, pingInterval: undefined, isPostConnection }; @@ -438,7 +446,7 @@ export class HttpStreamTransport extends AbstractTransport { res.write(': stream opened\n\n'); connection.pingInterval = setInterval(() => this.sendPing(connection), 15000); if (lastEventId && this._config.resumability.enabled) { - this.handleResumption(connection, lastEventId).catch(err => { logger.error(`Error during stream resumption for ${streamId}: ${err.message}`); this.cleanupConnection(connection, `Resumption error: ${err.message}`); }); + this.handleResumption(connection, lastEventId, sessionId).catch(err => { logger.error(`Error during stream resumption for ${streamId}: ${err.message}`); this.cleanupConnection(connection, `Resumption error: ${err.message}`); }); } const cleanupHandler = (reason: string) => { if (connection.pingInterval) { clearInterval(connection.pingInterval); connection.pingInterval = undefined; } this.cleanupConnection(connection, reason); }; res.on("close", () => cleanupHandler("Client closed connection")); @@ -572,12 +580,16 @@ export class HttpStreamTransport extends AbstractTransport { if (this._config.resumability.enabled) { eventId = `${Date.now()}-${this._eventCounter++}`; targetConnection.lastEventIdSent = eventId; - if (targetConnection.messageHistory) { + + this.storeMessage(message, targetConnection.sessionId, eventId); + + if (this._config.resumability.messageStoreType === 'connection' && targetConnection.messageHistory) { const timestamp = Date.now(); targetConnection.messageHistory.push({ eventId, message, timestamp }); const cutoff = timestamp - this._config.resumability.historyDuration; targetConnection.messageHistory = targetConnection.messageHistory.filter(entry => entry.timestamp >= cutoff); } + logger.debug(`Sending SSE event ID: ${eventId} on stream ${targetConnection.streamId}`); targetConnection.res.write(`id: ${eventId}\n`); } @@ -641,24 +653,64 @@ export class HttpStreamTransport extends AbstractTransport { return session; } - private async handleResumption(connection: ActiveSseConnection, lastEventId: string): Promise { + private async handleResumption(connection: ActiveSseConnection, lastEventId: string, sessionId?: string): Promise { logger.info(`Attempting resume stream ${connection.streamId} from event ${lastEventId}`); - if (!connection.messageHistory || !this._config.resumability.enabled) { logger.warn(`Resume requested for ${connection.streamId}, but history unavailable/disabled. Starting fresh.`); return; } - const history = connection.messageHistory; - const lastReceivedIndex = history.findIndex(entry => entry.eventId === lastEventId); - if (lastReceivedIndex === -1) { logger.warn(`Event ${lastEventId} not found in history for ${connection.streamId}. Starting fresh.`); return; } - const messagesToReplay = history.slice(lastReceivedIndex + 1); - if (messagesToReplay.length === 0) { logger.info(`Event ${lastEventId} was last known event for ${connection.streamId}. No replay needed.`); return; } + + let messagesToReplay: MessageEntry[] = []; + + if (this._config.resumability.messageStoreType === 'global') { + if (!this._config.resumability.enabled) { + logger.warn(`Resume requested for ${connection.streamId}, but resumability is disabled. Starting fresh.`); + return; + } + + messagesToReplay = this.getMessagesAfterEvent(sessionId, lastEventId); + + if (messagesToReplay.length === 0) { + logger.warn(`Event ${lastEventId} not found in global message store for session ${sessionId || 'N/A'}. Starting fresh.`); + return; + } + } else if (this._config.resumability.messageStoreType === 'connection') { + if (!connection.messageHistory || !this._config.resumability.enabled) { + logger.warn(`Resume requested for ${connection.streamId}, but history unavailable/disabled. Starting fresh.`); + return; + } + + const history = connection.messageHistory; + const lastReceivedIndex = history.findIndex(entry => entry.eventId === lastEventId); + + if (lastReceivedIndex === -1) { + logger.warn(`Event ${lastEventId} not found in history for ${connection.streamId}. Starting fresh.`); + return; + } + + messagesToReplay = history.slice(lastReceivedIndex + 1); + } + + if (messagesToReplay.length === 0) { + logger.info(`Event ${lastEventId} was last known event for ${connection.streamId}. No replay needed.`); + return; + } + logger.info(`Replaying ${messagesToReplay.length} messages for stream ${connection.streamId}`); + for (const entry of messagesToReplay) { - if (!connection.res || connection.res.writableEnded) { logger.warn(`Stream ${connection.streamId} closed during replay. Aborting.`); return; } + if (!connection.res || connection.res.writableEnded) { + logger.warn(`Stream ${connection.streamId} closed during replay. Aborting.`); + return; + } try { logger.debug(`Replaying event ${entry.eventId}`); connection.res.write(`id: ${entry.eventId}\n`); connection.res.write(`data: ${JSON.stringify(entry.message)}\n\n`); connection.lastEventIdSent = entry.eventId; - } catch(error: any) { logger.error(`Error replaying message ${entry.eventId} to ${connection.streamId}: ${error.message}. Aborting.`); this.cleanupConnection(connection, `Replay write error: ${error.message}`); return; } + } catch(error: any) { + logger.error(`Error replaying message ${entry.eventId} to ${connection.streamId}: ${error.message}. Aborting.`); + this.cleanupConnection(connection, `Replay write error: ${error.message}`); + return; + } } + logger.info(`Finished replaying messages for stream ${connection.streamId}`); } @@ -699,7 +751,14 @@ export class HttpStreamTransport extends AbstractTransport { async close(): Promise { logger.info("Closing HttpStreamTransport..."); + + if (this._pruneInterval) { + clearInterval(this._pruneInterval); + this._pruneInterval = undefined; + } + this.cleanupAllConnections(); + return new Promise((resolve, reject) => { if (this._server) { const server = this._server; this._server = undefined; @@ -709,4 +768,61 @@ export class HttpStreamTransport extends AbstractTransport { }); } isRunning(): boolean { return Boolean(this._server?.listening); } + + private storeMessage(message: JsonRpcMessage, sessionId: string | undefined, eventId: string): void { + if (!this._config.resumability.enabled) return; + + const timestamp = Date.now(); + const messageEntry: MessageEntry = { eventId, message, timestamp }; + + if (this._config.resumability.messageStoreType === 'global' && sessionId) { + if (!this._globalMessageStore.has(sessionId)) { + this._globalMessageStore.set(sessionId, new Map()); + } + this._globalMessageStore.get(sessionId)!.set(eventId, messageEntry); + } + } + + private pruneMessageStore(): void { + if (!this._config.resumability.enabled || this._config.resumability.messageStoreType !== 'global') return; + + const cutoff = Date.now() - this._config.resumability.historyDuration; + + for (const [sessionId, messages] of this._globalMessageStore.entries()) { + let expired = 0; + for (const [eventId, entry] of messages.entries()) { + if (entry.timestamp < cutoff) { + messages.delete(eventId); + expired++; + } + } + + if (messages.size === 0) { + this._globalMessageStore.delete(sessionId); + } else if (expired > 0) { + logger.debug(`Pruned ${expired} expired messages for session ${sessionId}`); + } + } + } + + private getMessagesAfterEvent(sessionId: string | undefined, lastEventId: string): MessageEntry[] { + if (!sessionId || !this._config.resumability.enabled || + this._config.resumability.messageStoreType !== 'global' || + !this._globalMessageStore.has(sessionId)) { + return []; + } + + const messages = this._globalMessageStore.get(sessionId)!; + + const allEntries = Array.from(messages.values()) + .sort((a, b) => a.timestamp - b.timestamp); + + const lastReceivedIndex = allEntries.findIndex(entry => entry.eventId === lastEventId); + + if (lastReceivedIndex === -1) { + return []; + } + + return allEntries.slice(lastReceivedIndex + 1); + } } diff --git a/src/transports/http/types.ts b/src/transports/http/types.ts index eacb94b..089d663 100644 --- a/src/transports/http/types.ts +++ b/src/transports/http/types.ts @@ -139,6 +139,7 @@ export interface HttpStreamTransportConfig { * How long to keep message history in ms. Default: 300000 (5 min) */ historyDuration?: number; + messageStoreType?: 'connection' | 'global'; }; } @@ -170,6 +171,7 @@ export const DEFAULT_HTTP_STREAM_CONFIG: HttpStreamTransportConfigInternal = { resumability: { enabled: false, historyDuration: 300000, + messageStoreType: 'global', }, cors: { allowOrigin: "*", @@ -197,7 +199,7 @@ export interface ActiveSseConnection { sessionId?: string; streamId: string; lastEventIdSent: string | null; - messageHistory?: Array<{ eventId: string; message: JsonRpcMessage; timestamp: number }>; + messageHistory?: Array; pingInterval?: NodeJS.Timeout; isPostConnection: boolean; pendingResponseIds?: Set; @@ -213,3 +215,9 @@ export interface BatchResponseState { timeoutId: NodeJS.Timeout; isCompleted: boolean; } + +export interface MessageEntry { + eventId: string; + message: JsonRpcMessage; + timestamp: number; +}