Skip to content

Commit cab69db

Browse files
committed
Improved the relay realtime cleanup
1 parent 4dd27b4 commit cab69db

File tree

1 file changed

+32
-2
lines changed

1 file changed

+32
-2
lines changed

apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts

+32-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { singleton } from "~/utils/singleton";
77

88
export type RelayRealtimeStreamsOptions = {
99
ttl: number;
10+
cleanupInterval: number;
1011
fallbackIngestor: StreamIngestor;
1112
fallbackResponder: StreamResponder;
1213
waitForBufferTimeout?: number; // Time to wait for buffer in ms (default: 500ms)
@@ -17,6 +18,7 @@ interface RelayedStreamRecord {
1718
stream: ReadableStream<Uint8Array>;
1819
createdAt: number;
1920
lastAccessed: number;
21+
locked: boolean;
2022
finalized: boolean;
2123
}
2224

@@ -33,7 +35,7 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
3335
// Periodic cleanup
3436
this.cleanupInterval = setInterval(() => {
3537
this.cleanup();
36-
}, this.options.ttl).unref();
38+
}, this.options.cleanupInterval).unref();
3739
}
3840

3941
async streamResponse(
@@ -76,6 +78,23 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
7678
}
7779
}
7880

81+
// Only 1 reader of the stream can use the relayed stream, the rest should use the fallback
82+
if (record.locked) {
83+
logger.debug("[RelayRealtimeStreams][streamResponse] Stream already locked, using fallback", {
84+
streamId,
85+
runId,
86+
});
87+
88+
return this.options.fallbackResponder.streamResponse(
89+
request,
90+
runId,
91+
streamId,
92+
environment,
93+
signal
94+
);
95+
}
96+
97+
record.locked = true;
7998
record.lastAccessed = Date.now();
8099

81100
logger.debug("[RelayRealtimeStreams][streamResponse] Streaming from ephemeral record", {
@@ -106,7 +125,7 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
106125
"Content-Type": "text/event-stream",
107126
"Cache-Control": "no-cache",
108127
Connection: "keep-alive",
109-
"x-relay-realtime-streams": "true",
128+
"x-trigger-relay-realtime-streams": "true",
110129
},
111130
});
112131
}
@@ -157,6 +176,7 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
157176
createdAt: Date.now(),
158177
lastAccessed: Date.now(),
159178
finalized: false,
179+
locked: false,
160180
};
161181
this._buffers.set(bufferKey, record);
162182
} else {
@@ -167,12 +187,21 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
167187

168188
private cleanup() {
169189
const now = Date.now();
190+
191+
logger.debug("[RelayRealtimeStreams][cleanup] Cleaning up old buffers", {
192+
bufferCount: this._buffers.size,
193+
});
194+
170195
for (const [key, record] of this._buffers.entries()) {
171196
// If last accessed is older than ttl, clean up
172197
if (now - record.lastAccessed > this.options.ttl) {
173198
this.deleteBuffer(key);
174199
}
175200
}
201+
202+
logger.debug("[RelayRealtimeStreams][cleanup] Cleaned up old buffers", {
203+
bufferCount: this._buffers.size,
204+
});
176205
}
177206

178207
private deleteBuffer(bufferKey: string) {
@@ -216,6 +245,7 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
216245
function initializeRelayRealtimeStreams() {
217246
return new RelayRealtimeStreams({
218247
ttl: 1000 * 60 * 5, // 5 minutes
248+
cleanupInterval: 1000 * 60, // 1 minute
219249
fallbackIngestor: v1RealtimeStreams,
220250
fallbackResponder: v1RealtimeStreams,
221251
});

0 commit comments

Comments
 (0)