-
Notifications
You must be signed in to change notification settings - Fork 716
Client implementation of Streamable HTTP transport #290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please refactor to use the eventsource package (which we already depend upon)? A lot of this code will be able to be deleted, and I expect that, e.g., resumability will become easier.
src/client/streamableHttp.ts
Outdated
if (response.status === 405 || response.status === 404) { | ||
// Server doesn't support GET for SSE, which is allowed by the spec | ||
// We'll rely on SSE responses to POST requests for communication | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we permitting 404 here? It might be OK, just curious for the rationale.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
intentional as we still need to add support for GET and Auth and the order it should work. Right now it will be rejected for clients that provide session id as it's pre-initialization. Will be addressing it in the follow up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was refactored, the implementation was wrong for GET, we need to have a separate method, unlike SSE implementation
src/client/streamableHttp.ts
Outdated
// If we have a session ID, send a DELETE request to explicitly terminate the session | ||
if (this._sessionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to automatically do this. What if the client process is restarting, for example? We should definitely offer a way to do this, but leave it up to the user when to trigger.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, that's right! I'll remove automatic handling and add to follow ups to implemenet a way for a user to do it
src/client/streamableHttp.ts
Outdated
// Server might respond with 405 if it doesn't support explicit session termination | ||
// We don't throw an error in that case |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might also see a 404 if the session ID has already been deleted on the server side (because the server could clear it at any time).
this.onclose?.(); | ||
} | ||
|
||
async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's leave the batching to another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, it's already there, I'd just leave it, the full batch support agree, will be a separate PR
src/client/streamableHttp.ts
Outdated
} | ||
|
||
// Successful connection, handle the SSE stream as a standalone listener | ||
const streamId = `initial-${Date.now()}`; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need these stream IDs? I can't find any spot where we locate a stream by its ID.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we use it in the clean up to close all of the sessions
src/client/streamableHttp.ts
Outdated
// Close all active streams | ||
for (const reader of this._activeStreams.values()) { | ||
try { | ||
reader.cancel(); | ||
} catch (error) { | ||
this.onerror?.(error as Error); | ||
} | ||
} | ||
this._activeStreams.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be easier to use a single AbortController
for the whole object, then pass its AbortSignal
into all the fetch/SSE calls.
// Extract IDs from request messages for tracking responses | ||
const requestIds = messages.filter(msg => 'method' in msg && 'id' in msg) | ||
.map(msg => 'id' in msg ? msg.id : undefined) | ||
.filter(id => id !== undefined); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may want to close the SSE stream after receiving responses to all these request IDs.
Also, we may want to figure out how to automatically close the stream if all the requests are cancelled (by the client).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are doing closing on the server side and as we are reading the stream response it's done when the stream reader is done:
const { done, value: event } = await reader.read();
if (done) {
this._activeStreams.delete(streamId);
break;
}
Also, we may want to figure out how to automatically close the stream if all the requests are cancelled (by the client).
Adding to the follow ups
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assumes a well-behaving server, which might not always be the case (e.g., the server might not be using an official SDK, or one of the SDKs may behave differently).
Basically, the client should not assume that the server will close the connection on its own.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point! adding to follow ups
src/client/streamableHttp.ts
Outdated
} | ||
} | ||
|
||
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, streamId: string): void { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NB:
import { EventSourceParserStream } from 'eventsource-parser/stream';
would take care of a lot of this parsing logic and eventsource-parser
is already a transitive dependency of existing eventsource
dependency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@beaulac this is great, thank you!
I tried to use eventsource in my own attempts at this, but |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/client/streamableHttp.ts
Outdated
@@ -121,32 +119,76 @@ export class StreamableHTTPClientTransport implements Transport { | |||
signal: this._abortController?.signal, | |||
}); | |||
|
|||
if (response.status === 405 || response.status === 404) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still need to handle 405s here, no?
How to implement session management without broker like redis in a distributed scenario |
Fix Reconnect handler
Introduces support for streamable http transport on client. Server was implemented in #266
TO DO
Follow ups