Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: StreamableHTTPServerTransport implement #230

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

gylove1994
Copy link

Motivation and Context

To support streamable-http in mcp-2025-03-26

How Has This Been Tested?

The StreamableHTTPServerTransport has been extensively tested with the following scenarios:

  1. Session Management
  • Session ID generation and validation
  • Session ID inclusion in response headers
  • Rejection of invalid session IDs
  1. Request Handling
  • GET requests with and without proper Accept headers
  • POST requests for JSON-RPC messages
  • Handling of batch messages
  • DELETE requests for session closure
  • Validation of Content-Type headers
  1. Message Replay
  • Message replay functionality using Last-Event-ID
  • Proper message sequencing and delivery
  1. Message Targeting
  • Correct routing of response messages to requesting connections
  • Verification that responses aren't broadcast to all connections
  1. Error Handling
  • Invalid JSON data processing
  • Invalid JSON-RPC message handling
  • Proper error response formatting

All tests use mock HTTP requests and responses to simulate the StreamableHTTP specification.

Breaking Changes

None

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

Related issues: #220

@wjw99830
Copy link

Is it proper to maintain id in transport layer? It seems like a part of an application layer.

@gylove1994
Copy link
Author

Session Management is part of the specification. See https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/transports/#session-management

Copy link
Contributor

@cliffhall cliffhall left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use lowercase header names.

expect(mockResponse.writeHead).toHaveBeenCalledWith(
200,
expect.objectContaining({
"Mcp-Session-Id": transport.sessionId,
Copy link
Contributor

@cliffhall cliffhall Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest using lower case for the header names throughout. In this file, I see a mix of "Mcp-Session-Id" and "mcp-session-id".

Unfortunately the spec says "Mcp-Session-Id" and we could send this, expecting clients to force to lower case. Since we are testing in a case sensitive way some test results which pass are questionable.

Regarding the case-sensitivity of header names:

Screenshot 2025-03-31 at 4 53 07 PM

*/
async handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
// validate the session ID
const sessionId = req.headers["mcp-session-id"];
Copy link
Contributor

@cliffhall cliffhall Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a mix of upper and lower case for 'Mcp-Session-Id' in this file as well. I believe this is problematic and we should be using lowercase throughout.

Our tests are acting as the client, which must ignore the case but aren't. We can eliminate any potential issues by using all lower case in the server and the tests.

@gylove1994 gylove1994 requested a review from cliffhall April 1, 2025 03:05
Comment on lines 59 to 75
it("should include session ID in response headers", async () => {
const req = createMockRequest({
method: "GET",
headers: {
accept: "text/event-stream"
},
});

await transport.handleRequest(req, mockResponse);

expect(mockResponse.writeHead).toHaveBeenCalledWith(
200,
expect.objectContaining({
"mcp-session-id": transport.sessionId,
})
);
});

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I came here hoping to see how this will be configured for stateless operation. Is that not intended to be part of this PR?

If it is in scope for this PR, it would be helpful if the various options for state management as described in the spec were demonstrated in the tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to add a option in constructor to support stateless like:

Usage example:

// Stateful mode (default) - with session management
const statefulTransport = new StreamableHTTPServerTransport("/mcp");

// Stateless mode - without session management
const statelessTransport = new StreamableHTTPServerTransport("/mcp", {
  enableSessionManagement: false
});

In stateful mode:

  • Session ID is generated and included in response headers
  • Requests with invalid session IDs are rejected with 404 Not Found
  • Non-initialization requests without a session ID are rejected with 400 Bad Request
  • State is maintained in-memory (connections, message history)

In stateless mode:

  • No session ID is included in response headers
  • No session validation is performed

Comment on lines 77 to 91
it("should reject invalid session ID", async () => {
const req = createMockRequest({
method: "GET",
headers: {
"mcp-session-id": "invalid-session-id",
},
});

await transport.handleRequest(req, mockResponse);

expect(mockResponse.writeHead).toHaveBeenCalledWith(404);
// check if the error response is a valid JSON-RPC error format
expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"jsonrpc":"2.0"'));
expect(mockResponse.end).toHaveBeenCalledWith(expect.stringContaining('"error"'));
});

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regardless of whether statelessness is in scope or not, I assume this implementation will follow this advice? If so, I think it would be good to have a test case for it:

  • If an Mcp-Session-Id is returned by the server during initialization, clients using the Streamable HTTP transport MUST include it in the Mcp-Session-Id header on all of their subsequent HTTP requests.
    • Servers that require a session ID SHOULD respond to requests without an Mcp-Session-Id header (other than initialization) with HTTP 400 Bad Request.

}

const parsedCt = contentType.parse(ct);
const body = await getRawBody(req, {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we consider accepting parsed request body here just like: https://github.com/modelcontextprotocol/typescript-sdk/blob/main/src/server/sse.ts#L68

In some cases the MCP Server lives in another application server, and that application server may have global body parsers in place.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it should be able work like:

app.post('/mcp', (req, res) => {
   transport.handleRequest(req, res, req.body);
});

@gylove1994 gylove1994 requested a review from wangshijun April 2, 2025 01:06
Comment on lines 178 to 182
const headers: Record<string, string> = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly there isn't any ability to customize the headers returned from these responses. Our use-case requires setting the "Access-Control-Allow-Origin" CORS header. This might make for another good option to add.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I add a new option customHeaders and it should be able to work for your case:

/**
 * Configuration options for StreamableHTTPServerTransport
 */
export interface StreamableHTTPServerTransportOptions {
  /**
   * Whether to enable session management through mcp-session-id headers
   * When set to false, the transport operates in stateless mode without session validation
   * @default true
   */
  enableSessionManagement?: boolean;

  /**
   * Custom headers to be included in all responses
   * These headers will be added to both SSE and regular HTTP responses
   */
  customHeaders?: Record<string, string>;
}
const transport = new StreamableHTTPServerTransport("/mcp", {
  customHeaders: {
    "X-Custom-Header": "custom-value",
    "Access-Control-Allow-Origin": "*",
    // Or something you want to add to header
  }
});

Comment on lines +349 to +368
// use direct JSON response
const headers: Record<string, string> = {
...this._customHeaders,
"Content-Type": "application/json",
};

// Only include session ID header if session management is enabled
// Always include session ID for initialization requests
if (this._enableSessionManagement || isInitializationRequest) {
headers["mcp-session-id"] = this._sessionId;
}

res.writeHead(200, headers);

// handle each message
for (const message of messages) {
this.onmessage?.(message);
}

res.end();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case how do the responses get back to the user if there are no SSE connections? I could be misreading something, but it looks like it would throw Error("No active connections");

Do we need this case? The spec seems to indicate that the client has to accept either:

If the input contains any number of JSON-RPC requests, the server MUST either return Content-Type: text/event-stream, to initiate an SSE stream, or Content-Type: application/json, to return one JSON object. The client MUST support both these cases.
https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/transports/#sending-messages-to-the-server

Copy link

@beaulac beaulac Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been playing with this, trying to use the stateless JSON (non-SSE) approach and ran into this exact problem.

Since the response needs to be a single JSON array, it seems the transport would need to record how many JsonRPC request-type were sent and build up an array to respond with, since the response stream should not be .end()ed until the full array is built + sent.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the response needs to be a single JSON array

I'm not sure if the spec allows an array of JSON here. Emphasis mine:

the server MUST either return Content-Type: text/event-stream, to initiate an SSE stream, or Content-Type: application/json, to return one JSON object.

I think the answer is probably to always return a SSE stream, even if there is just one incoming request. There is only one response, but the server might want to return progress notifications, and that is not possible with a simple JSON response.

I think this exists as an option in the spec so that it can degrade into essentially RPC req / res at the most simple if a service can't support SSE for some reason.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. The current send() method has issues with properly directing data to specific clients and lacks JSON
response support. I'm currently working on refactoring this part of the implementation.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to return one JSON object

I would interpret "one JSON object" constraint as satisfied by a JSON array, especially since the spec mentions and links to JSON-RPC batching in the non-SSE case. Batched JSON-RPC responses are grouped in an array, as per JSON-RPC spec (emphasis mine):

To send several Request objects at the same time, the Client MAY send an Array filled with Request objects.
The Server should respond with an Array containing the corresponding Response objects, after all of the batch Request objects have been processed.

Otherwise, it wouldn't be possible for a server to properly respond to batch requests without switching to SSE, which seems like an unintended limitation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary issue I'm facing is how to pair requests with the "send" method while follow the Transport interface. Currently, there seems to be no appropriate way to specify which client should be sent to. If this can be resolved, returning one JSON response would quickly solve too.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would interpret "one JSON object" constraint as satisfied by a JSON array, especially since the spec mentions and links to JSON-RPC batching in the non-SSE case.

I missed this, and think you are correct. Thank you for the pointer!

Comment on lines +60 to +79
* In stateful mode:
* - Session ID is generated and included in response headers
* - Session ID is always included in initialization responses
* - Requests with invalid session IDs are rejected with 404 Not Found
* - Non-initialization requests without a session ID are rejected with 400 Bad Request
* - State is maintained in-memory (connections, message history)
*
* In stateless mode:
* - Session ID is only included in initialization responses
* - No session validation is performed
*/
export class StreamableHTTPServerTransport implements Transport {
private _connections: Map<string, StreamConnection> = new Map();
private _sessionId: string;
private _messageHistory: Map<string, {
message: JSONRPCMessage;
connectionId?: string; // record which connection the message should be sent to
}> = new Map();
private _started: boolean = false;
private _requestConnections: Map<string, string> = new Map(); // request ID to connection ID mapping
Copy link

@jmorrell-cloudflare jmorrell-cloudflare Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By storing these in-memory we are restricting this transport to work only with single-node deployments. We would need to store this information in an external data store to enable it to scale beyond one instance in stateful mode. We might be able to make this customizable by implementing interfaces / adapters for Postgres / Redis / etc.

If we are okay with that single node restriction, I think it needs to be documented as this will make it untenable for many organizations (though still quite useful for anyone comfortable with a single node).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a wise decision. Since the changes are far more extensive than I initially anticipated, I'm currently refactoring the implementation, which might take a bit longer than expected.

@cliffhall cliffhall added enhancement New feature or request waiting on submitter Waiting for the submitter to provide more info labels Apr 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request waiting on submitter Waiting for the submitter to provide more info
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants