Skip to content

Commit 4b47edb

Browse files
authored
Merge pull request #49 from QuantGeekDev/feature/resumability
feat: add sse resumability
2 parents c5d34a5 + e20b7cc commit 4b47edb

File tree

3 files changed

+139
-14
lines changed

3 files changed

+139
-14
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ README
55
.DS_Store
66
dist
77
node_modules
8+
.cursor

src/transports/http/server.ts

+129-13
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import {
77
JsonRpcSuccessResponse,
88
JsonRpcErrorResponse,
99
JsonRpcError,
10-
JsonRpcId
10+
JsonRpcId,
11+
MessageEntry
1112
} from "./types.js";
1213

1314
import contentType from "content-type";
@@ -68,6 +69,8 @@ export class HttpStreamTransport extends AbstractTransport {
6869
private _requestStreamMap = new Map<string | number, ActiveSseConnection>();
6970
private _pendingBatches = new Map<ServerResponse, BatchResponseState>();
7071
private _eventCounter = 0;
72+
private _globalMessageStore = new Map<string, Map<string, MessageEntry>>();
73+
private _pruneInterval?: NodeJS.Timeout;
7174

7275
constructor(config: HttpStreamTransportConfig = {}) {
7376
super();
@@ -91,9 +94,14 @@ export class HttpStreamTransport extends AbstractTransport {
9194
responseMode: this._config.responseMode,
9295
sessionEnabled: this._config.session.enabled,
9396
resumabilityEnabled: this._config.resumability.enabled,
97+
resumabilityStore: this._config.resumability.messageStoreType,
9498
authEnabled: !!this._config.auth,
9599
corsOrigin: this._config.cors.allowOrigin
96100
}, null, 2)}`);
101+
102+
if (this._config.resumability.enabled && this._config.resumability.messageStoreType === 'global') {
103+
this._pruneInterval = setInterval(() => this.pruneMessageStore(), this._config.resumability.historyDuration / 3);
104+
}
97105
}
98106

99107
private getCorsHeaders(req: IncomingMessage, includeMaxAge: boolean = false): Record<string, string> {
@@ -419,7 +427,7 @@ export class HttpStreamTransport extends AbstractTransport {
419427
const streamId = randomUUID();
420428
const connection: ActiveSseConnection = {
421429
res, sessionId, streamId, lastEventIdSent: null,
422-
messageHistory: this._config.resumability.enabled ? [] : undefined,
430+
messageHistory: this._config.resumability.enabled && this._config.resumability.messageStoreType === 'connection' ? [] : undefined,
423431
pingInterval: undefined,
424432
isPostConnection
425433
};
@@ -438,7 +446,7 @@ export class HttpStreamTransport extends AbstractTransport {
438446
res.write(': stream opened\n\n');
439447
connection.pingInterval = setInterval(() => this.sendPing(connection), 15000);
440448
if (lastEventId && this._config.resumability.enabled) {
441-
this.handleResumption(connection, lastEventId).catch(err => { logger.error(`Error during stream resumption for ${streamId}: ${err.message}`); this.cleanupConnection(connection, `Resumption error: ${err.message}`); });
449+
this.handleResumption(connection, lastEventId, sessionId).catch(err => { logger.error(`Error during stream resumption for ${streamId}: ${err.message}`); this.cleanupConnection(connection, `Resumption error: ${err.message}`); });
442450
}
443451
const cleanupHandler = (reason: string) => { if (connection.pingInterval) { clearInterval(connection.pingInterval); connection.pingInterval = undefined; } this.cleanupConnection(connection, reason); };
444452
res.on("close", () => cleanupHandler("Client closed connection"));
@@ -572,12 +580,16 @@ export class HttpStreamTransport extends AbstractTransport {
572580
if (this._config.resumability.enabled) {
573581
eventId = `${Date.now()}-${this._eventCounter++}`;
574582
targetConnection.lastEventIdSent = eventId;
575-
if (targetConnection.messageHistory) {
583+
584+
this.storeMessage(message, targetConnection.sessionId, eventId);
585+
586+
if (this._config.resumability.messageStoreType === 'connection' && targetConnection.messageHistory) {
576587
const timestamp = Date.now();
577588
targetConnection.messageHistory.push({ eventId, message, timestamp });
578589
const cutoff = timestamp - this._config.resumability.historyDuration;
579590
targetConnection.messageHistory = targetConnection.messageHistory.filter(entry => entry.timestamp >= cutoff);
580591
}
592+
581593
logger.debug(`Sending SSE event ID: ${eventId} on stream ${targetConnection.streamId}`);
582594
targetConnection.res.write(`id: ${eventId}\n`);
583595
}
@@ -641,24 +653,64 @@ export class HttpStreamTransport extends AbstractTransport {
641653
return session;
642654
}
643655

644-
private async handleResumption(connection: ActiveSseConnection, lastEventId: string): Promise<void> {
656+
private async handleResumption(connection: ActiveSseConnection, lastEventId: string, sessionId?: string): Promise<void> {
645657
logger.info(`Attempting resume stream ${connection.streamId} from event ${lastEventId}`);
646-
if (!connection.messageHistory || !this._config.resumability.enabled) { logger.warn(`Resume requested for ${connection.streamId}, but history unavailable/disabled. Starting fresh.`); return; }
647-
const history = connection.messageHistory;
648-
const lastReceivedIndex = history.findIndex(entry => entry.eventId === lastEventId);
649-
if (lastReceivedIndex === -1) { logger.warn(`Event ${lastEventId} not found in history for ${connection.streamId}. Starting fresh.`); return; }
650-
const messagesToReplay = history.slice(lastReceivedIndex + 1);
651-
if (messagesToReplay.length === 0) { logger.info(`Event ${lastEventId} was last known event for ${connection.streamId}. No replay needed.`); return; }
658+
659+
let messagesToReplay: MessageEntry[] = [];
660+
661+
if (this._config.resumability.messageStoreType === 'global') {
662+
if (!this._config.resumability.enabled) {
663+
logger.warn(`Resume requested for ${connection.streamId}, but resumability is disabled. Starting fresh.`);
664+
return;
665+
}
666+
667+
messagesToReplay = this.getMessagesAfterEvent(sessionId, lastEventId);
668+
669+
if (messagesToReplay.length === 0) {
670+
logger.warn(`Event ${lastEventId} not found in global message store for session ${sessionId || 'N/A'}. Starting fresh.`);
671+
return;
672+
}
673+
} else if (this._config.resumability.messageStoreType === 'connection') {
674+
if (!connection.messageHistory || !this._config.resumability.enabled) {
675+
logger.warn(`Resume requested for ${connection.streamId}, but history unavailable/disabled. Starting fresh.`);
676+
return;
677+
}
678+
679+
const history = connection.messageHistory;
680+
const lastReceivedIndex = history.findIndex(entry => entry.eventId === lastEventId);
681+
682+
if (lastReceivedIndex === -1) {
683+
logger.warn(`Event ${lastEventId} not found in history for ${connection.streamId}. Starting fresh.`);
684+
return;
685+
}
686+
687+
messagesToReplay = history.slice(lastReceivedIndex + 1);
688+
}
689+
690+
if (messagesToReplay.length === 0) {
691+
logger.info(`Event ${lastEventId} was last known event for ${connection.streamId}. No replay needed.`);
692+
return;
693+
}
694+
652695
logger.info(`Replaying ${messagesToReplay.length} messages for stream ${connection.streamId}`);
696+
653697
for (const entry of messagesToReplay) {
654-
if (!connection.res || connection.res.writableEnded) { logger.warn(`Stream ${connection.streamId} closed during replay. Aborting.`); return; }
698+
if (!connection.res || connection.res.writableEnded) {
699+
logger.warn(`Stream ${connection.streamId} closed during replay. Aborting.`);
700+
return;
701+
}
655702
try {
656703
logger.debug(`Replaying event ${entry.eventId}`);
657704
connection.res.write(`id: ${entry.eventId}\n`);
658705
connection.res.write(`data: ${JSON.stringify(entry.message)}\n\n`);
659706
connection.lastEventIdSent = entry.eventId;
660-
} catch(error: any) { logger.error(`Error replaying message ${entry.eventId} to ${connection.streamId}: ${error.message}. Aborting.`); this.cleanupConnection(connection, `Replay write error: ${error.message}`); return; }
707+
} catch(error: any) {
708+
logger.error(`Error replaying message ${entry.eventId} to ${connection.streamId}: ${error.message}. Aborting.`);
709+
this.cleanupConnection(connection, `Replay write error: ${error.message}`);
710+
return;
711+
}
661712
}
713+
662714
logger.info(`Finished replaying messages for stream ${connection.streamId}`);
663715
}
664716

@@ -699,7 +751,14 @@ export class HttpStreamTransport extends AbstractTransport {
699751

700752
async close(): Promise<void> {
701753
logger.info("Closing HttpStreamTransport...");
754+
755+
if (this._pruneInterval) {
756+
clearInterval(this._pruneInterval);
757+
this._pruneInterval = undefined;
758+
}
759+
702760
this.cleanupAllConnections();
761+
703762
return new Promise((resolve, reject) => {
704763
if (this._server) {
705764
const server = this._server; this._server = undefined;
@@ -709,4 +768,61 @@ export class HttpStreamTransport extends AbstractTransport {
709768
});
710769
}
711770
isRunning(): boolean { return Boolean(this._server?.listening); }
771+
772+
private storeMessage(message: JsonRpcMessage, sessionId: string | undefined, eventId: string): void {
773+
if (!this._config.resumability.enabled) return;
774+
775+
const timestamp = Date.now();
776+
const messageEntry: MessageEntry = { eventId, message, timestamp };
777+
778+
if (this._config.resumability.messageStoreType === 'global' && sessionId) {
779+
if (!this._globalMessageStore.has(sessionId)) {
780+
this._globalMessageStore.set(sessionId, new Map());
781+
}
782+
this._globalMessageStore.get(sessionId)!.set(eventId, messageEntry);
783+
}
784+
}
785+
786+
private pruneMessageStore(): void {
787+
if (!this._config.resumability.enabled || this._config.resumability.messageStoreType !== 'global') return;
788+
789+
const cutoff = Date.now() - this._config.resumability.historyDuration;
790+
791+
for (const [sessionId, messages] of this._globalMessageStore.entries()) {
792+
let expired = 0;
793+
for (const [eventId, entry] of messages.entries()) {
794+
if (entry.timestamp < cutoff) {
795+
messages.delete(eventId);
796+
expired++;
797+
}
798+
}
799+
800+
if (messages.size === 0) {
801+
this._globalMessageStore.delete(sessionId);
802+
} else if (expired > 0) {
803+
logger.debug(`Pruned ${expired} expired messages for session ${sessionId}`);
804+
}
805+
}
806+
}
807+
808+
private getMessagesAfterEvent(sessionId: string | undefined, lastEventId: string): MessageEntry[] {
809+
if (!sessionId || !this._config.resumability.enabled ||
810+
this._config.resumability.messageStoreType !== 'global' ||
811+
!this._globalMessageStore.has(sessionId)) {
812+
return [];
813+
}
814+
815+
const messages = this._globalMessageStore.get(sessionId)!;
816+
817+
const allEntries = Array.from(messages.values())
818+
.sort((a, b) => a.timestamp - b.timestamp);
819+
820+
const lastReceivedIndex = allEntries.findIndex(entry => entry.eventId === lastEventId);
821+
822+
if (lastReceivedIndex === -1) {
823+
return [];
824+
}
825+
826+
return allEntries.slice(lastReceivedIndex + 1);
827+
}
712828
}

src/transports/http/types.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ export interface HttpStreamTransportConfig {
139139
* How long to keep message history in ms. Default: 300000 (5 min)
140140
*/
141141
historyDuration?: number;
142+
messageStoreType?: 'connection' | 'global';
142143
};
143144
}
144145

@@ -170,6 +171,7 @@ export const DEFAULT_HTTP_STREAM_CONFIG: HttpStreamTransportConfigInternal = {
170171
resumability: {
171172
enabled: false,
172173
historyDuration: 300000,
174+
messageStoreType: 'global',
173175
},
174176
cors: {
175177
allowOrigin: "*",
@@ -197,7 +199,7 @@ export interface ActiveSseConnection {
197199
sessionId?: string;
198200
streamId: string;
199201
lastEventIdSent: string | null;
200-
messageHistory?: Array<{ eventId: string; message: JsonRpcMessage; timestamp: number }>;
202+
messageHistory?: Array<MessageEntry>;
201203
pingInterval?: NodeJS.Timeout;
202204
isPostConnection: boolean;
203205
pendingResponseIds?: Set<string | number>;
@@ -213,3 +215,9 @@ export interface BatchResponseState {
213215
timeoutId: NodeJS.Timeout;
214216
isCompleted: boolean;
215217
}
218+
219+
export interface MessageEntry {
220+
eventId: string;
221+
message: JsonRpcMessage;
222+
timestamp: number;
223+
}

0 commit comments

Comments
 (0)