diff --git a/README.md b/README.md index fe4caa3f..65f2336d 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,8 @@ - [Running Your Server](#running-your-server) - [stdio](#stdio) - [HTTP with SSE](#http-with-sse) + - [HTTP on Edge](#http-on-edge) + - [Cloudflare Worker](#cloudflare-worker-with-durable-object) - [Testing and Debugging](#testing-and-debugging) - [Examples](#examples) - [Echo Server](#echo-server) @@ -239,6 +241,97 @@ app.post("/messages", async (req, res) => { app.listen(3001); ``` +### HTTP on Edge + +For Edge based environments (Vercel, Cloudflare Workers) you can use the SSEEdgeTransport +which returns a SSE response and accepts normal HTTP requests. + +For example using Hono inside of a Cloudflare Worker: + +```typescript +import { Hono } from 'hono'; +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { SSEEdgeTransport } from "@modelcontextprotocol/sdk/server/sseEdge.js"; + +const server = new McpServer({ + name: "example-server", + version: "1.0.0" +}); + +// ... set up server resources, tools, and prompts ... + +const app = new Hono(); +app.get('/sse', async (c) => { + const uuid = crypto.randomUUID(); + const transport = new SSEEdgeTransport('/messages', uuid); + return transport.sseResponse +}); + +app.post('/messages', async (c) => { + // Note: to support multiple simultaneous connections, these messages will + // need to be routed to a specific matching transport. (This logic isn't + // implemented here, for simplicity.) + await transport.handlePostMessage(req, res); +}) + +export default { + fetch: app.fetch, +} satisfies ExportedHandler; +``` + +### Cloudflare Worker with Durable Object + +This makes it possible to keep track of MCP Servers with Durable Objects relatively easily. + +```ts +import { McpServer, ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { DurableObject } from 'cloudflare:workers'; +import { SSEEdgeTransport } from "@modelcontextprotocol/sdk/server/sseEdge.js"; +import { Hono } from 'hono'; + +export class McpObject extends DurableObject { + private transport?: SSEEdgeTransport; + private server: McpServer; + + constructor(ctx: DurableObjectState, env: any) { + super(ctx, env); + this.server = createMcpServer(); + } + + override async fetch(request: Request) { + const url = new URL(request.url); + // Create the transport if it doesn't exist + if (!this.transport) { + this.transport = new SSEEdgeTransport('/message', this.ctx.id.toString()); + } + + if (request.method === 'GET' && url.pathname.endsWith('/sse')) { + await this.server.connect(this.transport); + return this.transport.sseResponse; + } + + if (request.method === 'POST' && url.pathname.endsWith('/message')) { + return this.transport.handlePostMessage(request); + } + + return new Response('Not found', { status: 404 }); + } +} + +//---------- Define worker +const app = new Hono(); +app.all('*', async (c) => { + const sessionId = c.req.query('sessionId'); + const object = c.env.MCP_OBJECT.get( + sessionId ? c.env.MCP_OBJECT.idFromString(sessionId) : c.env.MCP_OBJECT.newUniqueId(), + ); + return object.fetch(c.req.raw); +}); + +export default { + fetch: app.fetch, +} satisfies ExportedHandler; + ### Testing and Debugging To test your server, you can use the [MCP Inspector](https://github.com/modelcontextprotocol/inspector). See its README for more information. diff --git a/src/server/sseEdge.ts b/src/server/sseEdge.ts new file mode 100644 index 00000000..0fb352e7 --- /dev/null +++ b/src/server/sseEdge.ts @@ -0,0 +1,133 @@ +import { Transport } from '../shared/transport.js'; +import { JSONRPCMessage, JSONRPCMessageSchema } from '../types.js'; + +const MAXIMUM_MESSAGE_SIZE = 4 * 1024 * 1024; // 4MB + +/** + * This transport is compatible with Cloudflare Workers and other edge environments + */ +export class SSEEdgeTransport implements Transport { + private controller: ReadableStreamDefaultController | null = null; + readonly stream: ReadableStream; + private closed = false; + + onclose?: () => void; + onerror?: (error: Error) => void; + onmessage?: (message: JSONRPCMessage) => void; + + /** + * Creates a new EdgeSSETransport, which will direct the MPC client to POST messages to messageUrl + */ + constructor( + private messageUrl: string, + readonly sessionId: string, + ) { + // Create a readable stream for SSE + this.stream = new ReadableStream({ + start: (controller) => { + this.controller = controller; + }, + cancel: () => { + this.closed = true; + this.onclose?.(); + }, + }); + } + + async start(): Promise { + if (this.closed) { + throw new Error( + 'SSE transport already closed! If using Server class, note that connect() calls start() automatically.', + ); + } + + // Make sure the controller exists + if (!this.controller) { + throw new Error('Stream controller not initialized'); + } + + // Send the endpoint event + const endpointMessage = `event: endpoint\ndata: ${encodeURI(this.messageUrl)}?sessionId=${this.sessionId}\n\n`; + this.controller.enqueue(new TextEncoder().encode(endpointMessage)); + } + + get sseResponse(): Response { + // Ensure the stream is properly initialized + if (!this.stream) { + throw new Error('Stream not initialized'); + } + + // Return a response with the SSE stream + return new Response(this.stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }); + } + + /** + * Handles incoming Requests + */ + async handlePostMessage(req: Request): Promise { + if (this.closed || !this.controller) { + const message = 'SSE connection not established'; + return new Response(message, { status: 500 }); + } + + try { + const contentType = req.headers.get('content-type') || ''; + if (!contentType.includes('application/json')) { + throw new Error(`Unsupported content-type: ${contentType}`); + } + + // Check if the request body is too large + const contentLength = parseInt(req.headers.get('content-length') || '0', 10); + if (contentLength > MAXIMUM_MESSAGE_SIZE) { + throw new Error(`Request body too large: ${contentLength} bytes`); + } + + // Clone the request before reading the body to avoid stream issues + const body = await req.json(); + await this.handleMessage(body); + return new Response('Accepted', { status: 202 }); + } catch (error) { + this.onerror?.(error as Error); + return new Response(String(error), { status: 400 }); + } + } + + /** + * Handle a client message, regardless of how it arrived. This can be used to inform the server of messages that arrive via a means different than HTTP POST. + */ + async handleMessage(message: unknown): Promise { + let parsedMessage: JSONRPCMessage; + try { + parsedMessage = JSONRPCMessageSchema.parse(message); + } catch (error) { + this.onerror?.(error as Error); + throw error; + } + + this.onmessage?.(parsedMessage); + } + + async close(): Promise { + if (!this.closed && this.controller) { + this.controller.close(); + this.stream.cancel(); + this.closed = true; + this.onclose?.(); + } + } + + async send(message: JSONRPCMessage): Promise { + if (this.closed || !this.controller) { + throw new Error('Not connected'); + } + + const messageText = `event: message\ndata: ${JSON.stringify(message)}\n\n`; + this.controller.enqueue(new TextEncoder().encode(messageText)); + } +}