Skip to content

StreamableHTTP - listening for messages from the server trough GET established SSE #304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 11, 2025
11 changes: 5 additions & 6 deletions src/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ describe("StreamableHTTPClientTransport", () => {
// We expect the 405 error to be caught and handled gracefully
// This should not throw an error that breaks the transport
await transport.start();
await expect(transport.openSseStream()).rejects.toThrow("Failed to open SSE stream: Method Not Allowed");

await expect(transport["_startOrAuthStandaloneSSE"]()).resolves.not.toThrow("Failed to open SSE stream: Method Not Allowed");
// Check that GET was attempted
expect(global.fetch).toHaveBeenCalledWith(
expect.anything(),
Expand Down Expand Up @@ -209,7 +208,7 @@ describe("StreamableHTTPClientTransport", () => {
transport.onmessage = messageSpy;

await transport.start();
await transport.openSseStream();
await transport["_startOrAuthStandaloneSSE"]();

// Give time for the SSE event to be processed
await new Promise(resolve => setTimeout(resolve, 50));
Expand Down Expand Up @@ -295,7 +294,7 @@ describe("StreamableHTTPClientTransport", () => {
});

await transport.start();
await transport.openSseStream();
await transport["_startOrAuthStandaloneSSE"]();
await new Promise(resolve => setTimeout(resolve, 50));

// Now simulate attempting to reconnect
Expand All @@ -306,7 +305,7 @@ describe("StreamableHTTPClientTransport", () => {
body: null
});

await transport.openSseStream();
await transport["_startOrAuthStandaloneSSE"]();

// Check that Last-Event-ID was included
const calls = (global.fetch as jest.Mock).mock.calls;
Expand Down Expand Up @@ -366,7 +365,7 @@ describe("StreamableHTTPClientTransport", () => {

await transport.start();

await transport.openSseStream();
await transport["_startOrAuthStandaloneSSE"]();
expect((actualReqInit.headers as Headers).get("x-custom-header")).toBe("CustomValue");

requestInit.headers["X-Custom-Header"] = "SecondCustomValue";
Expand Down
31 changes: 13 additions & 18 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Transport } from "../shared/transport.js";
import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
import { isJSONRPCNotification, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
import { EventSourceParserStream } from "eventsource-parser/stream";

Expand Down Expand Up @@ -126,12 +126,17 @@ export class StreamableHTTPClientTransport implements Transport {
return await this._authThenStart();
}

// 405 indicates that the server does not offer an SSE stream at GET endpoint
// This is an expected case that should not trigger an error
if (response.status === 405) {
return;
}

throw new StreamableHTTPError(
response.status,
`Failed to open SSE stream: ${response.statusText}`,
);
}

// Successful connection, handle the SSE stream as a standalone listener
this._handleSseStream(response.body);
} catch (error) {
Expand Down Expand Up @@ -244,6 +249,12 @@ export class StreamableHTTPClientTransport implements Transport {

// If the response is 202 Accepted, there's no body to process
if (response.status === 202) {
// if the accepted notification is initialized, we start the SSE stream
// if it's supported by the server
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
// We don't need to handle 405 here anymore as it's handled in _startOrAuthStandaloneSSE
this._startOrAuthStandaloneSSE().catch(err => this.onerror?.(err));
}
return;
}

Expand Down Expand Up @@ -280,20 +291,4 @@ export class StreamableHTTPClientTransport implements Transport {
throw error;
}
}

/**
* Opens SSE stream to receive messages from the server.
*
* This allows the server to push messages to the client without requiring the client
* to first send a request via HTTP POST. Some servers may not support this feature.
* If authentication is required but fails, this method will throw an UnauthorizedError.
*/
async openSseStream(): Promise<void> {
if (!this._abortController) {
throw new Error(
"StreamableHTTPClientTransport not started! Call connect() before openSseStream().",
);
}
await this._startOrAuthStandaloneSSE();
}
}
147 changes: 77 additions & 70 deletions src/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,81 +2,82 @@

This directory contains example implementations of MCP clients and servers using the TypeScript SDK.

## Table of Contents

- [Streamable HTTP Servers - Single Node Deployment](#streamable-http---single-node-deployment-with-basic-session-state-management)
- [Simple Server with Streamable HTTP](#simple-server-with-streamable-http-transport-serversimplestreamablehttpts)
- [Server Supporting SSE via GET](#server-supporting-with-sse-via-get-serverstandalonessewithgetstreamablehttpts)
- [Server with JSON Response Mode](#server-with-json-response-mode-serverjsonresponsestreamablehttpts)
- [Client Example - Streamable HTTP](#client-clientsimplestreamablehttpts)
- [Useful bash commands for testing](#useful-commands-for-testing)

## Streamable HTTP - single node deployment with basic session state management

Multi node with stete management example will be added soon after we add support.
Multi node with state management example will be added soon after we add support.

### Server with JSON response mode (`server/jsonResponseStreamableHttp.ts`)

A simple MCP server that uses the Streamable HTTP transport with JSON response mode enabled, implemented with Express. The server provides a simple `greet` tool that returns a greeting for a name.
### Simple server with Streamable HTTP transport (`server/simpleStreamableHttp.ts`)

A simple MCP server that uses the Streamable HTTP transport, implemented with Express. The server provides:

- A simple `greet` tool that returns a greeting for a name
- A `greeting-template` prompt that generates a greeting template
- A static `greeting-resource` resource

#### Running the server

```bash
npx tsx src/examples/server/jsonResponseStreamableHttp.ts
npx tsx src/examples/server/simpleStreamableHttp.ts
```

The server will start on port 3000. You can test the initialization and tool calling:
The server will start on port 3000. You can test the initialization and tool listing:

```bash
# Initialize the server and get the session ID from headers
SESSION_ID=$(curl -X POST \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Accept: text/event-stream" \
-d '{
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"capabilities": {},
"protocolVersion": "2025-03-26",
"clientInfo": {
"name": "test",
"version": "1.0.0"
}
},
"id": "1"
}' \
-i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r')
echo "Session ID: $SESSION_ID"
### Server supporting SSE via GET (`server/standaloneSseWithGetStreamableHttp.ts`)

# Call the greet tool using the saved session ID
curl -X POST \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Accept: text/event-stream" \
-H "mcp-session-id: $SESSION_ID" \
-d '{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "greet",
"arguments": {
"name": "World"
}
},
"id": "2"
}' \
http://localhost:3000/mcp
An MCP server that demonstrates how to support SSE notifications via GET requests using the Streamable HTTP transport with Express. The server dynamically adds resources at regular intervals and supports notifications for resource list changes (server notifications are available through the standalone SSE connection established by GET request).

#### Running the server

```bash
npx tsx src/examples/server/standaloneSseWithGetStreamableHttp.ts
```

Note that in this example, we're using plain JSON response mode by setting `Accept: application/json` header.
The server will start on port 3000 and automatically create new resources every 5 seconds.

### Server (`server/simpleStreamableHttp.ts`)
### Server with JSON response mode (`server/jsonResponseStreamableHttp.ts`)

A simple MCP server that uses the Streamable HTTP transport, implemented with Express. The server provides:
A simple MCP server that uses the Streamable HTTP transport with JSON response mode enabled, implemented with Express. The server provides a simple `greet` tool that returns a greeting for a name.

- A simple `greet` tool that returns a greeting for a name
- A `greeting-template` prompt that generates a greeting template
- A static `greeting-resource` resource
_NOTE: This demonstrates a server that does not use SSE at all. Note that this limits its support for MCP features; for example, it cannot provide logging and progress notifications for tool execution._

#### Running the server

```bash
npx tsx src/examples/server/simpleStreamableHttp.ts
npx tsx src/examples/server/jsonResponseStreamableHttp.ts
```

The server will start on port 3000. You can test the initialization and tool listing:

### Client (`client/simpleStreamableHttp.ts`)

A client that connects to the server, initializes it, and demonstrates how to:

- List available tools and call the `greet` tool
- List available prompts and get the `greeting-template` prompt
- List available resources

#### Running the client

```bash
npx tsx src/examples/client/simpleStreamableHttp.ts
```

Make sure the server is running before starting the client.


### Useful commands for testing

#### Initialize
Streamable HTTP transport requires to do the initialization first.

```bash
# First initialize the server and save the session ID to a variable
Expand All @@ -100,31 +101,37 @@ SESSION_ID=$(curl -X POST \
-i http://localhost:3000/mcp 2>&1 | grep -i "mcp-session-id" | cut -d' ' -f2 | tr -d '\r')
echo "Session ID: $SESSION_ID

```
Once a session is established, we can send POST requests:

#### List tools
```bash
# Then list tools using the saved session ID
curl -X POST -H "Content-Type: application/json" -H "Accept: application/json, text/event-stream" \
-H "mcp-session-id: $SESSION_ID" \
-d '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":"2"}' \
http://localhost:3000/mcp
```

### Client (`client/simpleStreamableHttp.ts`)

A client that connects to the server, initializes it, and demonstrates how to:

- List available tools and call the `greet` tool
- List available prompts and get the `greeting-template` prompt
- List available resources

#### Running the client
#### Call tool

```bash
npx tsx src/examples/client/simpleStreamableHttp.ts
# Call the greet tool using the saved session ID
curl -X POST \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Accept: text/event-stream" \
-H "mcp-session-id: $SESSION_ID" \
-d '{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "greet",
"arguments": {
"name": "World"
}
},
"id": "2"
}' \
http://localhost:3000/mcp
```

Make sure the server is running before starting the client.

## Notes

- These examples demonstrate the basic usage of the Streamable HTTP transport
- The server manages sessions between the calls
- The client handles both direct HTTP responses and SSE streaming responses
Loading