-
Notifications
You must be signed in to change notification settings - Fork 616
/
Copy pathstreamableHttp.ts
294 lines (252 loc) · 9.75 KB
/
streamableHttp.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import { Transport } from "../shared/transport.js";
import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
import { EventSourceParserStream } from "eventsource-parser/stream";
/**
 * Error type raised by the Streamable HTTP client transport.
 *
 * The message handed to the constructor is always prefixed with
 * `"Streamable HTTP error: "` before it reaches the base `Error`.
 */
export class StreamableHTTPError extends Error {
  /**
   * Numeric code describing the failure. Within this file it is either the
   * HTTP status of the failed response or `-1` for an unexpected content type.
   */
  public readonly code: number | undefined;

  /**
   * @param code - Numeric code to expose on the error, if any.
   * @param message - Human-readable detail appended after the fixed prefix.
   */
  constructor(code: number | undefined, message: string | undefined) {
    super("Streamable HTTP error: " + message);
    this.code = code;
  }
}
/**
 * Configuration options for the `StreamableHTTPClientTransport`.
 *
 * Every field is optional, and the transport's constructor accepts the options object itself as optional.
 */
export type StreamableHTTPClientTransportOptions = {
/**
 * An OAuth client provider to use for authentication.
 *
 * When an `authProvider` is specified and the connection is started:
 * 1. The connection is attempted with any existing access token from the `authProvider`.
 * 2. If the access token has expired, the `authProvider` is used to refresh the token.
 * 3. If token refresh fails or no access token exists, and auth is required, `OAuthClientProvider.redirectToAuthorization` is called, and an `UnauthorizedError` will be thrown from `connect`/`start`.
 *
 * After the user has finished authorizing via their user agent, and is redirected back to the MCP client application, call `StreamableHTTPClientTransport.finishAuth` with the authorization code before retrying the connection.
 *
 * If an `authProvider` is not provided, and auth is required, an `UnauthorizedError` will be thrown.
 *
 * `UnauthorizedError` might also be thrown when sending any message over the transport, indicating that the session has expired, and needs to be re-authed and reconnected.
 *
 * NOTE(review): in this file `start()` only creates the abort controller; the 401 handling visible here runs when a message is sent and when the standalone GET stream is opened. Confirm that the `connect`/`start` wording above still matches how callers observe the error.
 */
authProvider?: OAuthClientProvider;
/**
 * Customizes HTTP requests to the server.
 *
 * The `headers` given here are merged into the headers of the requests the transport issues.
 * The remaining fields are spread into the `fetch` options of the POST requests made by `send`;
 * on those requests the transport always supplies `method`, `headers`, `body` and `signal` itself,
 * so values given here for those four fields are replaced.
 */
requestInit?: RequestInit;
};
/**
 * Client transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification.
 * It will connect to a server using HTTP POST for sending messages and HTTP GET with Server-Sent Events
 * for receiving messages.
 *
 * Lifecycle as implemented here:
 * - `start()` only prepares the `AbortController`; no network request is made.
 * - `send()` POSTs each message. The reply is delivered to `onmessage` either from a JSON body or
 *   from an SSE stream, depending on the response content type.
 * - Once the server accepts the `notifications/initialized` notification with `202`, a standalone
 *   GET SSE stream is attempted in the background; a `405` reply means the server does not offer one.
 * - `close()` aborts in-flight requests and invokes `onclose`.
 *
 * Every failure is reported through `onerror` in addition to being thrown where a caller can catch it.
 */
export class StreamableHTTPClientTransport implements Transport {
  /** Created by `start()`; its signal is attached to every request so `close()` can abort them. */
  private _abortController?: AbortController;
  /** Endpoint used for both POST and GET requests. */
  private _url: URL;
  /** Caller-supplied fetch customizations (see `StreamableHTTPClientTransportOptions.requestInit`). */
  private _requestInit?: RequestInit;
  /** Optional OAuth provider used to obtain bearer tokens and to react to `401` responses. */
  private _authProvider?: OAuthClientProvider;
  /** Value of the `mcp-session-id` response header, echoed back on subsequent requests. */
  private _sessionId?: string;
  /** Id of the most recent SSE event, sent as `last-event-id` when the GET stream is opened. */
  private _lastEventId?: string;

  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;

  /**
   * @param url - Server endpoint for the Streamable HTTP transport.
   * @param opts - Optional auth provider and fetch customizations.
   */
  constructor(url: URL, opts?: StreamableHTTPClientTransportOptions) {
    this._url = url;
    this._requestInit = opts?.requestInit;
    this._authProvider = opts?.authProvider;
  }

  /**
   * Runs the auth flow and, if it succeeds, makes one more attempt to open the standalone SSE stream.
   *
   * Failures of the auth step are reported through `onerror` here. Failures of the retried GET are
   * reported by `_startOrAuthStandaloneSSE` itself, so each failure reaches `onerror` exactly once.
   *
   * @throws UnauthorizedError when no auth provider is configured or `auth` does not return `"AUTHORIZED"`.
   */
  private async _authThenStart(): Promise<void> {
    try {
      if (!this._authProvider) {
        throw new UnauthorizedError("No auth provider");
      }

      const result: AuthResult = await auth(this._authProvider, { serverUrl: this._url });
      if (result !== "AUTHORIZED") {
        throw new UnauthorizedError();
      }
    } catch (error) {
      this.onerror?.(error as Error);
      throw error;
    }

    // Deliberately outside the try block: the callee reports its own failures.
    // `false` forbids another auth round so a server that keeps answering 401 cannot loop forever.
    return this._startOrAuthStandaloneSSE(false);
  }

  /**
   * Builds the headers shared by every request: the bearer token (when the auth provider has
   * tokens), the session id (once known) and the caller's `requestInit.headers`.
   *
   * The caller's headers are normalized through the `Headers` constructor, so a plain object,
   * an array of `[name, value]` pairs and a `Headers` instance are all honoured. They are applied
   * last and therefore replace a transport-generated header of the same name.
   */
  private async _commonHeaders(): Promise<Headers> {
    const headers = new Headers();

    if (this._authProvider) {
      const tokens = await this._authProvider.tokens();
      if (tokens) {
        headers.set("Authorization", `Bearer ${tokens.access_token}`);
      }
    }

    if (this._sessionId) {
      headers.set("mcp-session-id", this._sessionId);
    }

    new Headers(this._requestInit?.headers).forEach((value, name) => {
      headers.set(name, value);
    });

    return headers;
  }

  /**
   * Tries to open the standalone SSE stream with a GET request.
   *
   * - `405` is treated as "not supported by this server" and resolves without an error.
   * - `401` with an auth provider triggers one auth round followed by one retry.
   * - Any other non-OK status raises a `StreamableHTTPError` carrying the HTTP status.
   *
   * Every failure is reported through `onerror` exactly once and then rethrown.
   *
   * @param allowAuthRetry - Whether a `401` may start the auth flow. The retry issued after a
   *   successful auth passes `false` so that a second `401` fails instead of recursing.
   * @throws UnauthorizedError when the server still answers `401` after a successful auth round.
   */
  private async _startOrAuthStandaloneSSE(allowAuthRetry = true): Promise<void> {
    try {
      // Try to open an initial SSE stream with GET to listen for server messages
      // This is optional according to the spec - server may not support it
      const headers = await this._commonHeaders();
      headers.set("Accept", "text/event-stream");

      // Include Last-Event-ID header for resumable streams
      if (this._lastEventId) {
        headers.set("last-event-id", this._lastEventId);
      }

      const response = await fetch(this._url, {
        method: "GET",
        headers,
        signal: this._abortController?.signal,
      });

      if (!response.ok) {
        if (response.status === 401 && this._authProvider) {
          if (!allowAuthRetry) {
            throw new UnauthorizedError("Server returned 401 after successful authentication");
          }
          // Returned without `await` so this try block is left first; `_authThenStart`
          // and the retried call report their own failures, avoiding a duplicate `onerror`.
          return this._authThenStart();
        }

        // 405 indicates that the server does not offer an SSE stream at GET endpoint
        // This is an expected case that should not trigger an error
        if (response.status === 405) {
          return;
        }

        throw new StreamableHTTPError(
          response.status,
          `Failed to open SSE stream: ${response.statusText}`,
        );
      }

      // Successful connection, handle the SSE stream as a standalone listener
      this._handleSseStream(response.body);
    } catch (error) {
      this.onerror?.(error as Error);
      throw error;
    }
  }

  /**
   * Consumes an SSE response body in the background and forwards each JSON-RPC message to `onmessage`.
   *
   * The method returns immediately; stream-level failures are reported through `onerror`.
   * A single malformed event is reported through `onerror` and does not stop the stream.
   *
   * @param stream - Response body to read; `null` is ignored.
   */
  private _handleSseStream(stream: ReadableStream<Uint8Array> | null): void {
    if (!stream) {
      return;
    }

    const processStream = async (): Promise<void> => {
      // Create a pipeline: binary stream -> text decoder -> SSE parser.
      // An explicit reader is used instead of `for await` because async iteration of
      // `ReadableStream` is not implemented by every runtime.
      const reader = stream
        .pipeThrough(new TextDecoderStream())
        .pipeThrough(new EventSourceParserStream())
        .getReader();

      try {
        for (;;) {
          const { done, value: event } = await reader.read();
          if (done) {
            break;
          }

          // Update last event ID if provided
          if (event.id) {
            this._lastEventId = event.id;
          }

          // Handle message events (default event type is undefined per docs)
          // or explicit 'message' event type
          if (!event.event || event.event === "message") {
            try {
              const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
              this.onmessage?.(message);
            } catch (error) {
              this.onerror?.(error as Error);
            }
          }
        }
      } finally {
        reader.releaseLock();
      }
    };

    processStream().catch(err => this.onerror?.(err));
  }

  /**
   * Prepares the transport for use by creating the `AbortController` whose signal is attached
   * to subsequent requests. No network request is made.
   *
   * @throws Error when the transport has already been started.
   */
  async start(): Promise<void> {
    if (this._abortController) {
      throw new Error(
        "StreamableHTTPClientTransport already started! If using Client class, note that connect() calls start() automatically.",
      );
    }

    this._abortController = new AbortController();
  }

  /**
   * Call this method after the user has finished authorizing via their user agent and is redirected back to the MCP client application. This will exchange the authorization code for an access token, enabling the next connection attempt to successfully auth.
   *
   * @param authorizationCode - Code received on the redirect back to the client application.
   * @throws UnauthorizedError when no auth provider is configured or the exchange does not end in `"AUTHORIZED"`.
   */
  async finishAuth(authorizationCode: string): Promise<void> {
    if (!this._authProvider) {
      throw new UnauthorizedError("No auth provider");
    }

    const result = await auth(this._authProvider, { serverUrl: this._url, authorizationCode });
    if (result !== "AUTHORIZED") {
      throw new UnauthorizedError("Failed to authorize");
    }
  }

  /** Aborts any pending requests and then invokes `onclose`. */
  async close(): Promise<void> {
    // Abort any pending requests
    this._abortController?.abort();

    this.onclose?.();
  }

  /**
   * POSTs a message (or batch of messages) to the server and dispatches whatever comes back.
   *
   * A failure is reported through `onerror` once and then rethrown.
   *
   * @param message - Single JSON-RPC message or an array of them.
   * @throws UnauthorizedError when auth is required and cannot be completed, or when the server
   *   still answers `401` after a successful auth round.
   * @throws StreamableHTTPError when a response to a request has an unexpected content type.
   * @throws Error for any other non-OK HTTP status.
   */
  async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise<void> {
    try {
      await this._postMessage(message, true);
    } catch (error) {
      this.onerror?.(error as Error);
      throw error;
    }
  }

  /**
   * Performs one POST for `send` and handles the response; it does no `onerror` reporting of its own.
   *
   * @param message - Payload passed through from `send`.
   * @param allowAuthRetry - Whether a `401` may start the auth flow and re-send. The re-send passes
   *   `false` so that a server which keeps answering `401` cannot cause unbounded recursion.
   */
  private async _postMessage(
    message: JSONRPCMessage | JSONRPCMessage[],
    allowAuthRetry: boolean,
  ): Promise<void> {
    const headers = await this._commonHeaders();
    headers.set("content-type", "application/json");
    headers.set("accept", "application/json, text/event-stream");

    const response = await fetch(this._url, {
      ...this._requestInit,
      method: "POST",
      headers,
      body: JSON.stringify(message),
      signal: this._abortController?.signal,
    });

    // Handle session ID received during initialization
    const sessionId = response.headers.get("mcp-session-id");
    if (sessionId) {
      this._sessionId = sessionId;
    }

    if (!response.ok) {
      if (response.status === 401 && this._authProvider) {
        if (!allowAuthRetry) {
          throw new UnauthorizedError("Server returned 401 after successful authentication");
        }

        const result = await auth(this._authProvider, { serverUrl: this._url });
        if (result !== "AUTHORIZED") {
          throw new UnauthorizedError();
        }

        return this._postMessage(message, false);
      }

      const text = await response.text().catch(() => null);
      throw new Error(
        `Error POSTing to endpoint (HTTP ${response.status}): ${text}`,
      );
    }

    // If the response is 202 Accepted, there's no body to process
    if (response.status === 202) {
      // if the accepted notification is initialized, we start the SSE stream
      // if it's supported by the server
      if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
        // Fire and forget. The callee already reports every failure through `onerror`,
        // so the rejection is only silenced here rather than reported a second time.
        this._startOrAuthStandaloneSSE().catch(() => undefined);
      }
      return;
    }

    // Get original message(s) for detecting request IDs
    const messages = Array.isArray(message) ? message : [message];
    const hasRequests = messages.some(msg => "method" in msg && "id" in msg && msg.id !== undefined);
    if (!hasRequests) {
      // Only requests produce a reply that has to be dispatched.
      return;
    }

    // Check the response type
    const contentType = response.headers.get("content-type");
    if (contentType?.includes("text/event-stream")) {
      this._handleSseStream(response.body);
    } else if (contentType?.includes("application/json")) {
      // For non-streaming servers, we might get direct JSON responses
      const data = await response.json();
      const responseMessages = Array.isArray(data)
        ? data.map(msg => JSONRPCMessageSchema.parse(msg))
        : [JSONRPCMessageSchema.parse(data)];

      for (const msg of responseMessages) {
        this.onmessage?.(msg);
      }
    } else {
      throw new StreamableHTTPError(
        -1,
        `Unexpected content type: ${contentType}`,
      );
    }
  }
}