Skip to content

Server implementation of Streamable HTTP transport #266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Apr 7, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
@@ -66,6 +66,9 @@ test("should initialize with matching protocol version", async () => {
protocolVersion: LATEST_PROTOCOL_VERSION,
}),
}),
expect.objectContaining({
relatedRequestId: undefined,
}),
);

// Should have the instructions returned
66 changes: 63 additions & 3 deletions src/server/mcp.test.ts
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import {
ListPromptsResultSchema,
GetPromptResultSchema,
CompleteResultSchema,
LoggingMessageNotificationSchema,
} from "../types.js";
import { ResourceTemplate } from "./mcp.js";
import { completable } from "./completable.js";
@@ -85,6 +86,8 @@ describe("ResourceTemplate", () => {
const abortController = new AbortController();
const result = await template.listCallback?.({
signal: abortController.signal,
sendRequest: () => { throw new Error("Not implemented") },
sendNotification: () => { throw new Error("Not implemented") }
});
expect(result?.resources).toHaveLength(1);
expect(list).toHaveBeenCalled();
@@ -318,7 +321,7 @@ describe("tool()", () => {

// This should succeed
mcpServer.tool("tool1", () => ({ content: [] }));

// This should also succeed and not throw about request handlers
mcpServer.tool("tool2", () => ({ content: [] }));
});
@@ -376,6 +379,63 @@ describe("tool()", () => {
expect(receivedSessionId).toBe("test-session-123");
});

test("should provide sendNotification within tool call", async () => {
const mcpServer = new McpServer(
{
name: "test server",
version: "1.0",
},
{ capabilities: { logging: {} } },
);

const client = new Client(
{
name: "test client",
version: "1.0",
},
{
capabilities: {
tools: {},
},
},
);

let receivedLogMessage: string | undefined;
const loggingMessage = "hello here is log message 1";

client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => {
receivedLogMessage = notification.params.data as string;
});

mcpServer.tool("test-tool", async ({ sendNotification }) => {
await sendNotification({ method: "notifications/message", params: { level: "debug", data: loggingMessage } });
return {
content: [
{
type: "text",
text: "Test response",
},
],
};
});

const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair();
await Promise.all([
client.connect(clientTransport),
mcpServer.server.connect(serverTransport),
]);
await client.request(
{
method: "tools/call",
params: {
name: "test-tool",
},
},
CallToolResultSchema,
);
expect(receivedLogMessage).toBe(loggingMessage);
});

test("should allow client to call server tools", async () => {
const mcpServer = new McpServer({
name: "test server",
@@ -815,7 +875,7 @@ describe("resource()", () => {
},
],
}));

// This should also succeed and not throw about request handlers
mcpServer.resource("resource2", "test://resource2", async () => ({
contents: [
@@ -1321,7 +1381,7 @@ describe("prompt()", () => {
},
],
}));

// This should also succeed and not throw about request handlers
mcpServer.prompt("prompt2", async () => ({
messages: [
16 changes: 9 additions & 7 deletions src/server/mcp.ts
Original file line number Diff line number Diff line change
@@ -37,6 +37,8 @@ import {
PromptArgument,
GetPromptResult,
ReadResourceResult,
ServerRequest,
ServerNotification,
} from "../types.js";
import { Completable, CompletableDef } from "./completable.js";
import { UriTemplate, Variables } from "../shared/uriTemplate.js";
@@ -694,9 +696,9 @@ export type ToolCallback<Args extends undefined | ZodRawShape = undefined> =
Args extends ZodRawShape
? (
args: z.objectOutputType<Args, ZodTypeAny>,
extra: RequestHandlerExtra,
extra: RequestHandlerExtra<ServerRequest, ServerNotification>,
) => CallToolResult | Promise<CallToolResult>
: (extra: RequestHandlerExtra) => CallToolResult | Promise<CallToolResult>;
: (extra: RequestHandlerExtra<ServerRequest, ServerNotification>) => CallToolResult | Promise<CallToolResult>;

type RegisteredTool = {
description?: string;
@@ -717,15 +719,15 @@ export type ResourceMetadata = Omit<Resource, "uri" | "name">;
* Callback to list all resources matching a given template.
*/
export type ListResourcesCallback = (
extra: RequestHandlerExtra,
extra: RequestHandlerExtra<ServerRequest, ServerNotification>,
) => ListResourcesResult | Promise<ListResourcesResult>;

/**
* Callback to read a resource at a given URI.
*/
export type ReadResourceCallback = (
uri: URL,
extra: RequestHandlerExtra,
extra: RequestHandlerExtra<ServerRequest, ServerNotification>,
) => ReadResourceResult | Promise<ReadResourceResult>;

type RegisteredResource = {
@@ -740,7 +742,7 @@ type RegisteredResource = {
export type ReadResourceTemplateCallback = (
uri: URL,
variables: Variables,
extra: RequestHandlerExtra,
extra: RequestHandlerExtra<ServerRequest, ServerNotification>,
) => ReadResourceResult | Promise<ReadResourceResult>;

type RegisteredResourceTemplate = {
@@ -760,9 +762,9 @@ export type PromptCallback<
> = Args extends PromptArgsRawShape
? (
args: z.objectOutputType<Args, ZodTypeAny>,
extra: RequestHandlerExtra,
extra: RequestHandlerExtra<ServerRequest, ServerNotification>,
) => GetPromptResult | Promise<GetPromptResult>
: (extra: RequestHandlerExtra) => GetPromptResult | Promise<GetPromptResult>;
: (extra: RequestHandlerExtra<ServerRequest, ServerNotification>) => GetPromptResult | Promise<GetPromptResult>;

type RegisteredPrompt = {
description?: string;
1,224 changes: 1,224 additions & 0 deletions src/server/streamableHttp.test.ts

Large diffs are not rendered by default.

397 changes: 397 additions & 0 deletions src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,397 @@
import { IncomingMessage, ServerResponse } from "node:http";
import { Transport } from "../shared/transport.js";
import { JSONRPCMessage, JSONRPCMessageSchema, RequestId } from "../types.js";
import getRawBody from "raw-body";
import contentType from "content-type";

const MAXIMUM_MESSAGE_SIZE = "4mb";

/**
* Configuration options for StreamableHTTPServerTransport
*/
export interface StreamableHTTPServerTransportOptions {
/**
* Function that generates a session ID for the transport.
* The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash)
*
* Return undefined to disable session management.
*/
sessionIdGenerator: () => string | undefined;



}

/**
* Server transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification.
* It supports both SSE streaming and direct HTTP responses.
*
* Usage example:
*
* ```typescript
* // Stateful mode - server sets the session ID
* const statefulTransport = new StreamableHTTPServerTransport({
* sessionId: randomUUID(),
* });
*
* // Stateless mode - explicitly set session ID to undefined
* const statelessTransport = new StreamableHTTPServerTransport({
* sessionId: undefined,
* });
*
* // Using with pre-parsed request body
* app.post('/mcp', (req, res) => {
* transport.handleRequest(req, res, req.body);
* });
* ```
*
* In stateful mode:
* - Session ID is generated and included in response headers
* - Session ID is always included in initialization responses
* - Requests with invalid session IDs are rejected with 404 Not Found
* - Non-initialization requests without a session ID are rejected with 400 Bad Request
* - State is maintained in-memory (connections, message history)
*
* In stateless mode:
* - Session ID is only included in initialization responses
* - No session validation is performed
*/
export class StreamableHTTPServerTransport implements Transport {
// when sessionId is not set (undefined), it means the transport is in stateless mode
private sessionIdGenerator: () => string | undefined;
private _started: boolean = false;
private _sseResponseMapping: Map<RequestId, ServerResponse> = new Map();
private _initialized: boolean = false;

sessionId?: string | undefined;
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

constructor(options: StreamableHTTPServerTransportOptions) {
this.sessionIdGenerator = options.sessionIdGenerator;
}

/**
* Starts the transport. This is required by the Transport interface but is a no-op
* for the Streamable HTTP transport as connections are managed per-request.
*/
async start(): Promise<void> {
if (this._started) {
throw new Error("Transport already started");
}
this._started = true;
}

/**
* Handles an incoming HTTP request, whether GET or POST
*/
async handleRequest(req: IncomingMessage, res: ServerResponse, parsedBody?: unknown): Promise<void> {
if (req.method === "POST") {
await this.handlePostRequest(req, res, parsedBody);
} else if (req.method === "DELETE") {
await this.handleDeleteRequest(req, res);
} else {
await this.handleUnsupportedRequest(res);
}
}

/**
* Handles unsupported requests (GET, PUT, PATCH, etc.)
* For now we support only POST and DELETE requests. Support for GET for SSE connections will be added later.
*/
private async handleUnsupportedRequest(res: ServerResponse): Promise<void> {
res.writeHead(405, {
"Allow": "POST, DELETE"
}).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Method not allowed."
},
id: null
}));
}

/**
* Handles POST requests containing JSON-RPC messages
*/
private async handlePostRequest(req: IncomingMessage, res: ServerResponse, parsedBody?: unknown): Promise<void> {
try {
// Validate the Accept header
const acceptHeader = req.headers.accept;
// The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types.
if (!acceptHeader?.includes("application/json") || !acceptHeader.includes("text/event-stream")) {
res.writeHead(406).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Not Acceptable: Client must accept both application/json and text/event-stream"
},
id: null
}));
return;
}

const ct = req.headers["content-type"];
if (!ct || !ct.includes("application/json")) {
res.writeHead(415).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Unsupported Media Type: Content-Type must be application/json"
},
id: null
}));
return;
}

let rawMessage;
if (parsedBody !== undefined) {
rawMessage = parsedBody;
} else {
const parsedCt = contentType.parse(ct);
const body = await getRawBody(req, {
limit: MAXIMUM_MESSAGE_SIZE,
encoding: parsedCt.parameters.charset ?? "utf-8",
});
rawMessage = JSON.parse(body.toString());
}

let messages: JSONRPCMessage[];

// handle batch and single messages
if (Array.isArray(rawMessage)) {
messages = rawMessage.map(msg => JSONRPCMessageSchema.parse(msg));
} else {
messages = [JSONRPCMessageSchema.parse(rawMessage)];
}

// Check if this is an initialization request
// https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/lifecycle/
const isInitializationRequest = messages.some(
msg => 'method' in msg && msg.method === 'initialize'
);
if (isInitializationRequest) {
if (this._initialized) {
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32600,
message: "Invalid Request: Server already initialized"
},
id: null
}));
return;
}
if (messages.length > 1) {
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32600,
message: "Invalid Request: Only one initialization request is allowed"
},
id: null
}));
return;
}
this.sessionId = this.sessionIdGenerator();
Copy link
Contributor

@beaulac beaulac Apr 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will throw if this.sessionIdGenerator is undefined - should it default to a () => undefined stub if it isn't passed in the constructor args?
(or could just change to this.sessionId = this.sessionIdGenerator?.();)

this._initialized = true;
const headers: Record<string, string> = {};

if (this.sessionId !== undefined) {
headers["mcp-session-id"] = this.sessionId;
}

// Process initialization messages before responding
for (const message of messages) {
this.onmessage?.(message);
}

res.writeHead(200, headers).end();
return;
}
// If an Mcp-Session-Id is returned by the server during initialization,
// clients using the Streamable HTTP transport MUST include it
// in the Mcp-Session-Id header on all of their subsequent HTTP requests.
if (!isInitializationRequest && !this.validateSession(req, res)) {
return;
}


// check if it contains requests
const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg);
const hasOnlyNotificationsOrResponses = messages.every(msg =>
('method' in msg && !('id' in msg)) || ('result' in msg || 'error' in msg));
Comment on lines +223 to +225

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these ever both going to be false? (Maybe all errors?) If so we will fail to return a response at all unless I'm missing something.

If they are never both going to be false do you need both?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If they are never both going to be false do you need both?

Yes agreed, hasOnlyNotificationsOrResponses could probably be inlined into !hasRequests.
There should be different validation on the messages based on the spec since

The body of the POST request MUST be one of the following:
A single JSON-RPC request, notification, or response
An array batching one or more requests and/or notifications
An array batching one or more responses

is not correctly enforced as-is.


If so we will fail to return a response at all

Yes - this is to satisfy:

If the server accepts the input, the server MUST return HTTP status code 202 Accepted with no body.

(should actually probably be reworded to mention "accepts the input when there are no requests" or something, CTTOI)

and accords with JSON-RPC batch spec

A Response object SHOULD exist for each Request object
...
there SHOULD NOT be any Response objects for notifications
...
If there are no Response objects contained within the Response array as it is to be sent to the client, the server MUST NOT return an empty Array and should return nothing at all.

so res.writeHead(202).end(); below intentionally responds immediately with no body, the server can process the messages without blocking the ServerResponse.

This should be clarified with some additional code comments, imo.


if (hasOnlyNotificationsOrResponses) {
// if it only contains notifications or responses, return 202
res.writeHead(202).end();

// handle each message
for (const message of messages) {
this.onmessage?.(message);
}
} else if (hasRequests) {
const headers: Record<string, string> = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};

// After initialization, always include the session ID if we have one
if (this.sessionId !== undefined) {
headers["mcp-session-id"] = this.sessionId;
}

res.writeHead(200, headers);

// Store the response for this request to send messages back through this connection
// We need to track by request ID to maintain the connection
for (const message of messages) {
if ('method' in message && 'id' in message) {
this._sseResponseMapping.set(message.id, res);
}
}

// handle each message
for (const message of messages) {
this.onmessage?.(message);
}
// The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses
// This will be handled by the send() method when responses are ready
}
} catch (error) {
// return JSON-RPC formatted error
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32700,
message: "Parse error",
data: String(error)
},
id: null
}));
this.onerror?.(error as Error);
}
}

/**
* Handles DELETE requests to terminate sessions
*/
private async handleDeleteRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
if (!this.validateSession(req, res)) {
return;
}
await this.close();
res.writeHead(200).end();
}

/**
* Validates session ID for non-initialization requests
* Returns true if the session is valid, false otherwise
*/
private validateSession(req: IncomingMessage, res: ServerResponse): boolean {
if (!this._initialized) {
// If the server has not been initialized yet, reject all requests
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: Server not initialized"
},
id: null
}));
return false;
}
if (this.sessionId === undefined) {
// If the session ID is not set, the session management is disabled
// and we don't need to validate the session ID
return true;
}
const sessionId = req.headers["mcp-session-id"];

if (!sessionId) {
// Non-initialization requests without a session ID should return 400 Bad Request
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: Mcp-Session-Id header is required"
},
id: null
}));
return false;
} else if (Array.isArray(sessionId)) {
res.writeHead(400).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: Mcp-Session-Id header must be a single value"
},
id: null
}));
return false;
}
else if (sessionId !== this.sessionId) {
// Reject requests with invalid session ID with 404 Not Found
res.writeHead(404).end(JSON.stringify({
jsonrpc: "2.0",
error: {
code: -32001,
message: "Session not found"
},
id: null
}));
return false;
}

return true;
}


async close(): Promise<void> {
// Close all SSE connections
this._sseResponseMapping.forEach((response) => {
response.end();
});
this._sseResponseMapping.clear();
this.onclose?.();
}

async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
const relatedRequestId = options?.relatedRequestId;
// SSE connections are established per POST request, for now we don't support it through the GET
// this will be changed when we implement the GET SSE connection
if (relatedRequestId === undefined) {
throw new Error("relatedRequestId is required for Streamable HTTP transport");
}

const sseResponse = this._sseResponseMapping.get(relatedRequestId);
if (!sseResponse) {
throw new Error(`No SSE connection established for request ID: ${String(relatedRequestId)}`);
}

// Send the message as an SSE event
sseResponse.write(
`event: message\ndata: ${JSON.stringify(message)}\n\n`,
);

// If this is a response message with the same ID as the request, we can check
// if we need to close the stream after sending the response
if ('result' in message || 'error' in message) {
if (message.id === relatedRequestId) {
// This is a response to the original request, we can close the stream
// after sending all related responses
this._sseResponseMapping.delete(relatedRequestId);

// Only close the connection if it's not needed by other requests
const canCloseConnection = ![...this._sseResponseMapping.entries()].some(([id, res]) => res === sseResponse && id !== relatedRequestId);
if (canCloseConnection) {
sseResponse.end();
}
}
}
}

}
80 changes: 59 additions & 21 deletions src/shared/protocol.ts
Original file line number Diff line number Diff line change
@@ -78,22 +78,52 @@ export type RequestOptions = {
* If not specified, there is no maximum total timeout.
*/
maxTotalTimeout?: number;

/**
* May be used to indicate to the transport which incoming request to associate this outgoing request with.
*/
relatedRequestId?: RequestId;
};

/**
* Extra data given to request handlers.
* Options that can be given per notification.
*/
export type RequestHandlerExtra = {
export type NotificationOptions = {
/**
* An abort signal used to communicate if the request was cancelled from the sender's side.
* May be used to indicate to the transport which incoming request to associate this outgoing notification with.
*/
signal: AbortSignal;
relatedRequestId?: RequestId;
}

/**
* The session ID from the transport, if available.
*/
sessionId?: string;
};
/**
* Extra data given to request handlers.
*/
export type RequestHandlerExtra<SendRequestT extends Request,
SendNotificationT extends Notification> = {
/**
* An abort signal used to communicate if the request was cancelled from the sender's side.
*/
signal: AbortSignal;

/**
* The session ID from the transport, if available.
*/
sessionId?: string;

/**
* Sends a notification that relates to the current request being handled.
*
* This is used by certain transports to correctly associate related messages.
*/
sendNotification: (notification: SendNotificationT) => Promise<void>;

/**
* Sends a request that relates to the current request being handled.
*
* This is used by certain transports to correctly associate related messages.
*/
sendRequest: <U extends ZodType<object>>(request: SendRequestT, resultSchema: U, options?: RequestOptions) => Promise<z.infer<U>>;
};

/**
* Information about a request's timeout state
@@ -122,7 +152,7 @@ export abstract class Protocol<
string,
(
request: JSONRPCRequest,
extra: RequestHandlerExtra,
extra: RequestHandlerExtra<SendRequestT, SendNotificationT>,
) => Promise<SendResultT>
> = new Map();
private _requestHandlerAbortControllers: Map<RequestId, AbortController> =
@@ -316,9 +346,14 @@ export abstract class Protocol<
this._requestHandlerAbortControllers.set(request.id, abortController);

// Create extra object with both abort signal and sessionId from transport
const extra: RequestHandlerExtra = {
const extra: RequestHandlerExtra<SendRequestT, SendNotificationT> = {
signal: abortController.signal,
sessionId: this._transport?.sessionId,
sendNotification:
(notification) =>
this.notification(notification, { relatedRequestId: request.id }),
sendRequest: (r, resultSchema, options?) =>
this.request(r, resultSchema, { ...options, relatedRequestId: request.id })
};

// Starting with Promise.resolve() puts any synchronous errors into the monad as well.
@@ -364,7 +399,7 @@ export abstract class Protocol<
private _onprogress(notification: ProgressNotification): void {
const { progressToken, ...params } = notification.params;
const messageId = Number(progressToken);

const handler = this._progressHandlers.get(messageId);
if (!handler) {
this._onerror(new Error(`Received a progress notification for an unknown token: ${JSON.stringify(notification)}`));
@@ -373,7 +408,7 @@ export abstract class Protocol<

const responseHandler = this._responseHandlers.get(messageId);
const timeoutInfo = this._timeoutInfo.get(messageId);

if (timeoutInfo && responseHandler && timeoutInfo.resetTimeoutOnProgress) {
try {
this._resetTimeout(messageId);
@@ -460,6 +495,8 @@ export abstract class Protocol<
resultSchema: T,
options?: RequestOptions,
): Promise<z.infer<T>> {
const { relatedRequestId } = options ?? {};

return new Promise((resolve, reject) => {
if (!this._transport) {
reject(new Error("Not connected"));
@@ -500,7 +537,7 @@ export abstract class Protocol<
requestId: messageId,
reason: String(reason),
},
})
}, { relatedRequestId })
.catch((error) =>
this._onerror(new Error(`Failed to send cancellation: ${error}`)),
);
@@ -538,7 +575,7 @@ export abstract class Protocol<

this._setupTimeout(messageId, timeout, options?.maxTotalTimeout, timeoutHandler, options?.resetTimeoutOnProgress ?? false);

this._transport.send(jsonrpcRequest).catch((error) => {
this._transport.send(jsonrpcRequest, { relatedRequestId }).catch((error) => {
this._cleanupTimeout(messageId);
reject(error);
});
@@ -548,7 +585,7 @@ export abstract class Protocol<
/**
* Emits a notification, which is a one-way message that does not expect a response.
*/
async notification(notification: SendNotificationT): Promise<void> {
async notification(notification: SendNotificationT, options?: NotificationOptions): Promise<void> {
if (!this._transport) {
throw new Error("Not connected");
}
@@ -560,7 +597,7 @@ export abstract class Protocol<
jsonrpc: "2.0",
};

await this._transport.send(jsonrpcNotification);
await this._transport.send(jsonrpcNotification, options);
}

/**
@@ -576,14 +613,15 @@ export abstract class Protocol<
requestSchema: T,
handler: (
request: z.infer<T>,
extra: RequestHandlerExtra,
extra: RequestHandlerExtra<SendRequestT, SendNotificationT>,
) => SendResultT | Promise<SendResultT>,
): void {
const method = requestSchema.shape.method.value;
this.assertRequestHandlerCapability(method);
this._requestHandlers.set(method, (request, extra) =>
Promise.resolve(handler(requestSchema.parse(request), extra)),
);

this._requestHandlers.set(method, (request, extra) => {
return Promise.resolve(handler(requestSchema.parse(request), extra));
});
}

/**
6 changes: 4 additions & 2 deletions src/shared/transport.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JSONRPCMessage } from "../types.js";
import { JSONRPCMessage, RequestId } from "../types.js";

/**
* Describes the minimal contract for a MCP transport that a client or server can communicate over.
@@ -15,8 +15,10 @@ export interface Transport {

/**
* Sends a JSON-RPC message (request or response).
*
* If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with.
*/
send(message: JSONRPCMessage): Promise<void>;
send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void>;

/**
* Closes the connection.