Skip to content

Commit 1867400

Browse files
committed
remove the _activeStreams map
1 parent 368d666 commit 1867400

File tree

1 file changed

+7
-23
lines changed

1 file changed

+7
-23
lines changed

src/client/streamableHttp.ts

+7-23
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ export type StreamableHTTPClientTransportOptions = {
4343
* for receiving messages.
4444
*/
4545
export class StreamableHTTPClientTransport implements Transport {
46-
private _activeStreams: Map<string, ReadableStreamDefaultReader<EventSourceMessage>> = new Map();
4746
private _abortController?: AbortController;
4847
private _url: URL;
4948
private _requestInit?: RequestInit;
@@ -134,33 +133,28 @@ export class StreamableHTTPClientTransport implements Transport {
134133
}
135134

136135
// Successful connection, handle the SSE stream as a standalone listener
137-
const streamId = `standalone-sse-${Date.now()}`;
138-
this._handleSseStream(response.body, streamId);
136+
this._handleSseStream(response.body);
139137
} catch (error) {
140138
this.onerror?.(error as Error);
141139
throw error;
142140
}
143141
}
144142

145-
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, streamId: string): void {
143+
private _handleSseStream(stream: ReadableStream<Uint8Array> | null): void {
146144
if (!stream) {
147145
return;
148146
}
149-
150147
// Create a pipeline: binary stream -> text decoder -> SSE parser
151148
const eventStream = stream
152149
.pipeThrough(new TextDecoderStream())
153150
.pipeThrough(new EventSourceParserStream());
154151

155152
const reader = eventStream.getReader();
156-
this._activeStreams.set(streamId, reader);
157-
158153
const processStream = async () => {
159154
try {
160155
while (true) {
161156
const { done, value: event } = await reader.read();
162157
if (done) {
163-
this._activeStreams.delete(streamId);
164158
break;
165159
}
166160

@@ -181,7 +175,6 @@ export class StreamableHTTPClientTransport implements Transport {
181175
}
182176
}
183177
} catch (error) {
184-
this._activeStreams.delete(streamId);
185178
this.onerror?.(error as Error);
186179
}
187180
};
@@ -190,7 +183,7 @@ export class StreamableHTTPClientTransport implements Transport {
190183
}
191184

192185
async start() {
193-
if (this._activeStreams.size > 0) {
186+
if (this._abortController) {
194187
throw new Error(
195188
"StreamableHTTPClientTransport already started! If using Client class, note that connect() calls start() automatically.",
196189
);
@@ -214,16 +207,6 @@ export class StreamableHTTPClientTransport implements Transport {
214207
}
215208

216209
async close(): Promise<void> {
217-
// Close all active streams
218-
for (const reader of this._activeStreams.values()) {
219-
try {
220-
reader.cancel();
221-
} catch (error) {
222-
this.onerror?.(error as Error);
223-
}
224-
}
225-
this._activeStreams.clear();
226-
227210
// Abort any pending requests
228211
this._abortController?.abort();
229212

@@ -292,8 +275,7 @@ export class StreamableHTTPClientTransport implements Transport {
292275
if (hasRequests) {
293276
if (contentType?.includes("text/event-stream")) {
294277
// For streaming responses, create a unique stream ID based on request IDs
295-
const streamId = `req-${requestIds.join('-')}-${Date.now()}`;
296-
this._handleSseStream(response.body, streamId);
278+
this._handleSseStream(response.body);
297279
} else if (contentType?.includes("application/json")) {
298280
// For non-streaming servers, we might get direct JSON responses
299281
const data = await response.json();
@@ -321,7 +303,9 @@ export class StreamableHTTPClientTransport implements Transport {
321303
*/
322304
async openSseStream(): Promise<void> {
323305
if (!this._abortController) {
324-
this._abortController = new AbortController();
306+
throw new Error(
307+
"StreamableHTTPClientTransport not started! Call connect() before openSseStream().",
308+
);
325309
}
326310
await this._startOrAuthStandaloneSSE();
327311
}

0 commit comments

Comments
 (0)