Skip to content

Support server returning only JSON on requests #299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 71 additions & 8 deletions src/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ describe("StreamableHTTPClientTransport", () => {
(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "mcp-session-id": "test-session-id" }),
headers: new Headers({ "content-type": "text/event-stream", "mcp-session-id": "test-session-id" }),
});

await transport.send(message);
Expand Down Expand Up @@ -164,7 +164,7 @@ describe("StreamableHTTPClientTransport", () => {
// We expect the 405 error to be caught and handled gracefully
// This should not throw an error that breaks the transport
await transport.start();
await expect(transport.openSseStream()).rejects.toThrow('Failed to open SSE stream: Method Not Allowed');
await expect(transport.openSseStream()).rejects.toThrow("Failed to open SSE stream: Method Not Allowed");

// Check that GET was attempted
expect(global.fetch).toHaveBeenCalledWith(
Expand Down Expand Up @@ -192,7 +192,7 @@ describe("StreamableHTTPClientTransport", () => {
const stream = new ReadableStream({
start(controller) {
// Send a server notification via SSE
const event = 'event: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n';
const event = "event: message\ndata: {\"jsonrpc\": \"2.0\", \"method\": \"serverNotification\", \"params\": {}}\n\n";
controller.enqueue(encoder.encode(event));
}
});
Expand Down Expand Up @@ -237,7 +237,7 @@ describe("StreamableHTTPClientTransport", () => {

(global.fetch as jest.Mock)
.mockResolvedValueOnce({
ok: true,
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream" }),
body: makeStream("request1")
Expand All @@ -263,13 +263,13 @@ describe("StreamableHTTPClientTransport", () => {

// Both streams should have delivered their messages
expect(messageSpy).toHaveBeenCalledTimes(2);

// Verify received messages without assuming specific order
expect(messageSpy.mock.calls.some(call => {
const msg = call[0];
return msg.id === "request1" && msg.result?.id === "request1";
})).toBe(true);

expect(messageSpy.mock.calls.some(call => {
const msg = call[0];
return msg.id === "request2" && msg.result?.id === "request2";
Expand All @@ -281,7 +281,7 @@ describe("StreamableHTTPClientTransport", () => {
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
const event = 'id: event-123\nevent: message\ndata: {"jsonrpc": "2.0", "method": "serverNotification", "params": {}}\n\n';
const event = "id: event-123\nevent: message\ndata: {\"jsonrpc\": \"2.0\", \"method\": \"serverNotification\", \"params\": {}}\n\n";
controller.enqueue(encoder.encode(event));
controller.close();
}
Expand Down Expand Up @@ -313,4 +313,67 @@ describe("StreamableHTTPClientTransport", () => {
const lastCall = calls[calls.length - 1];
expect(lastCall[1].headers.get("last-event-id")).toBe("event-123");
});
});

it("should throw error when invalid content-type is received", async () => {
const message: JSONRPCMessage = {
jsonrpc: "2.0",
method: "test",
params: {},
id: "test-id"
};

const stream = new ReadableStream({
start(controller) {
controller.enqueue("invalid text response");
controller.close();
}
});

const errorSpy = jest.fn();
transport.onerror = errorSpy;

(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/plain" }),
body: stream
});

await transport.start();
await expect(transport.send(message)).rejects.toThrow("Unexpected content type: text/plain");
expect(errorSpy).toHaveBeenCalled();
});


it("should always send specified custom headers", async () => {
const requestInit = {
headers: {
"X-Custom-Header": "CustomValue"
}
};
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
requestInit: requestInit
});

let actualReqInit: RequestInit = {};

((global.fetch as jest.Mock)).mockImplementation(
async (_url, reqInit) => {
actualReqInit = reqInit;
return new Response(null, { status: 200, headers: { "content-type": "text/event-stream" } });
}
);

await transport.start();

await transport.openSseStream();
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue");

requestInit.headers["X-Custom-Header"] = "SecondCustomValue";

await transport.send({ jsonrpc: "2.0", method: "test", params: {} } as JSONRPCMessage);
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("SecondCustomValue");

expect(global.fetch).toHaveBeenCalledTimes(2);
});
});
99 changes: 43 additions & 56 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Transport } from "../shared/transport.js";
import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
import { EventSourceParserStream } from 'eventsource-parser/stream';
import { EventSourceParserStream } from "eventsource-parser/stream";

export class StreamableHTTPError extends Error {
constructor(
public readonly code: number | undefined,
Expand All @@ -17,16 +18,16 @@ export class StreamableHTTPError extends Error {
export type StreamableHTTPClientTransportOptions = {
/**
* An OAuth client provider to use for authentication.
*
*
* When an `authProvider` is specified and the connection is started:
* 1. The connection is attempted with any existing access token from the `authProvider`.
* 2. If the access token has expired, the `authProvider` is used to refresh the token.
* 3. If token refresh fails or no access token exists, and auth is required, `OAuthClientProvider.redirectToAuthorization` is called, and an `UnauthorizedError` will be thrown from `connect`/`start`.
*
*
* After the user has finished authorizing via their user agent, and is redirected back to the MCP client application, call `StreamableHTTPClientTransport.finishAuth` with the authorization code before retrying the connection.
*
*
* If an `authProvider` is not provided, and auth is required, an `UnauthorizedError` will be thrown.
*
*
* `UnauthorizedError` might also be thrown when sending any message over the transport, indicating that the session has expired, and needs to be re-authed and reconnected.
*/
authProvider?: OAuthClientProvider;
Expand Down Expand Up @@ -83,7 +84,7 @@ export class StreamableHTTPClientTransport implements Transport {
return await this._startOrAuthStandaloneSSE();
}

private async _commonHeaders(): Promise<HeadersInit> {
private async _commonHeaders(): Promise<Headers> {
const headers: HeadersInit = {};
if (this._authProvider) {
const tokens = await this._authProvider.tokens();
Expand All @@ -96,24 +97,25 @@ export class StreamableHTTPClientTransport implements Transport {
headers["mcp-session-id"] = this._sessionId;
}

return headers;
return new Headers(
{ ...headers, ...this._requestInit?.headers }
);
}

private async _startOrAuthStandaloneSSE(): Promise<void> {
try {
// Try to open an initial SSE stream with GET to listen for server messages
// This is optional according to the spec - server may not support it
const commonHeaders = await this._commonHeaders();
const headers = new Headers(commonHeaders);
headers.set('Accept', 'text/event-stream');
const headers = await this._commonHeaders();
headers.set("Accept", "text/event-stream");

// Include Last-Event-ID header for resumable streams
if (this._lastEventId) {
headers.set('last-event-id', this._lastEventId);
headers.set("last-event-id", this._lastEventId);
}

const response = await fetch(this._url, {
method: 'GET',
method: "GET",
headers,
signal: this._abortController?.signal,
});
Expand All @@ -124,12 +126,10 @@ export class StreamableHTTPClientTransport implements Transport {
return await this._authThenStart();
}

const error = new StreamableHTTPError(
throw new StreamableHTTPError(
response.status,
`Failed to open SSE stream: ${response.statusText}`,
);
this.onerror?.(error);
throw error;
}

// Successful connection, handle the SSE stream as a standalone listener
Expand All @@ -144,42 +144,32 @@ export class StreamableHTTPClientTransport implements Transport {
if (!stream) {
return;
}
// Create a pipeline: binary stream -> text decoder -> SSE parser
const eventStream = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream());

const reader = eventStream.getReader();
const processStream = async () => {
try {
while (true) {
const { done, value: event } = await reader.read();
if (done) {
break;
}

// Update last event ID if provided
if (event.id) {
this._lastEventId = event.id;
}

// Handle message events (default event type is undefined per docs)
// or explicit 'message' event type
if (!event.event || event.event === 'message') {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
}
// Create a pipeline: binary stream -> text decoder -> SSE parser
const eventStream = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream());

for await (const event of eventStream) {
// Update last event ID if provided
if (event.id) {
this._lastEventId = event.id;
}
// Handle message events (default event type is undefined per docs)
// or explicit 'message' event type
if (!event.event || event.event === "message") {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
}
}
} catch (error) {
this.onerror?.(error as Error);
}
};

processStream();
processStream().catch(err => this.onerror?.(err));
}

async start() {
Expand Down Expand Up @@ -215,8 +205,7 @@ export class StreamableHTTPClientTransport implements Transport {

async send(message: JSONRPCMessage | JSONRPCMessage[]): Promise<void> {
try {
const commonHeaders = await this._commonHeaders();
const headers = new Headers({ ...commonHeaders, ...this._requestInit?.headers });
const headers = await this._commonHeaders();
headers.set("content-type", "application/json");
headers.set("accept", "application/json, text/event-stream");

Expand Down Expand Up @@ -261,20 +250,13 @@ export class StreamableHTTPClientTransport implements Transport {
// Get original message(s) for detecting request IDs
const messages = Array.isArray(message) ? message : [message];

// Extract IDs from request messages for tracking responses
const requestIds = messages.filter(msg => 'method' in msg && 'id' in msg)
.map(msg => 'id' in msg ? msg.id : undefined)
.filter(id => id !== undefined);

// If we have request IDs and an SSE response, create a unique stream ID
const hasRequests = requestIds.length > 0;
const hasRequests = messages.filter(msg => "method" in msg && "id" in msg && msg.id !== undefined).length > 0;

// Check the response type
const contentType = response.headers.get("content-type");

if (hasRequests) {
if (contentType?.includes("text/event-stream")) {
// For streaming responses, create a unique stream ID based on request IDs
this._handleSseStream(response.body);
} else if (contentType?.includes("application/json")) {
// For non-streaming servers, we might get direct JSON responses
Expand All @@ -286,6 +268,11 @@ export class StreamableHTTPClientTransport implements Transport {
for (const msg of responseMessages) {
this.onmessage?.(msg);
}
} else {
throw new StreamableHTTPError(
-1,
`Unexpected content type: ${contentType}`,
);
}
}
} catch (error) {
Expand All @@ -296,7 +283,7 @@ export class StreamableHTTPClientTransport implements Transport {

/**
* Opens SSE stream to receive messages from the server.
*
*
* This allows the server to push messages to the client without requiring the client
* to first send a request via HTTP POST. Some servers may not support this feature.
* If authentication is required but fails, this method will throw an UnauthorizedError.
Expand All @@ -309,4 +296,4 @@ export class StreamableHTTPClientTransport implements Transport {
}
await this._startOrAuthStandaloneSSE();
}
}
}
28 changes: 28 additions & 0 deletions src/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,34 @@ This directory contains example implementations of MCP clients and servers using

Multi node with stete management example will be added soon after we add support.

### Server with JSON response mode (`server/jsonResponseStreamableHttp.ts`)

A simple MCP server that uses the Streamable HTTP transport with JSON response mode enabled, implemented with Express. The server provides a simple `greet` tool that returns a greeting for a name.

#### Running the server

```bash
npx tsx src/examples/server/jsonResponseStreamableHttp.ts
```

The server will start on port 3000. You can test the initialization and tool calling:

```bash
# Initialize the server and get the session ID from headers
SESSION_ID=$(curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work for me - I get:

❯ curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" \
  -d '{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":"1"}' \
  -i http://localhost:3000/mcp 2>&1
HTTP/1.1 406 Not Acceptable
X-Powered-By: Express
Date: Thu, 10 Apr 2025 10:50:08 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Transfer-Encoding: chunked

{"jsonrpc":"2.0","error":{"code":-32000,"message":"Not Acceptable: Client must accept both application/json and text/event-stream"},"id":null}%   

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This worked with the current implementation:

SESSION_ID=$(curl -X POST \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "jsonrpc": "2.0",
    "method": "initialize",
    "params": {
      "capabilities": {},
      "protocolVersion": "2025-03-26", 
      "clientInfo": {
        "name": "test",
        "version": "1.0.0"
      }
    },
    "id": "1"
  }' \
  -i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r')
echo "Session ID: $SESSION_ID"

-d '{"jsonrpc":"2.0","method":"initialize","params":{"capabilities":{}},"id":"1"}' \
-i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r')
echo "Session ID: $SESSION_ID"

# Call the greet tool using the saved session ID
curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" \
-H "mcp-session-id: $SESSION_ID" \
-d '{"jsonrpc":"2.0","method":"mcp.call_tool","params":{"name":"greet","arguments":{"name":"World"}},"id":"2"}' \
http://localhost:3000/mcp
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mcp.call_tool isn't correct, also I needed to have the Accept: text/event-stream header here too


curl -X POST \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  -H "Accept: text/event-stream" \
  -H "mcp-session-id: $SESSION_ID" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tools/call",
    "params": {
      "name": "greet",
      "arguments": {
        "name": "World"
      }
    },
    "id": "2"
  }' \
  http://localhost:3000/mcp

```

Note that in this example, we're using plain JSON response mode by setting `Accept: application/json` header.

### Server (`server/simpleStreamableHttp.ts`)

A simple MCP server that uses the Streamable HTTP transport, implemented with Express. The server provides:
Expand Down
Loading