Skip to content

StreamableHttp - Server transport with state management #553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions examples/servers/simple-streamablehttp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# MCP Simple StreamableHttp Server Example

A simple MCP server example demonstrating the StreamableHttp transport, which enables HTTP-based communication with MCP servers using streaming.

## Features

- Uses the StreamableHTTP transport for server-client communication
- Task management with anyio task groups
- Ability to send multiple notifications over time to the client
- Proper resource cleanup and lifespan management

## Usage

Start the server on the default or custom port:

```bash

# Using custom port
uv run mcp-simple-streamablehttp --port 3000

# Custom logging level
uv run mcp-simple-streamablehttp --log-level DEBUG

# Enable JSON responses instead of SSE streams
uv run mcp-simple-streamablehttp --json-response
```

The server exposes a tool named "start-notification-stream" that accepts three arguments:

- `interval`: Time between notifications in seconds (e.g., 1.0)
- `count`: Number of notifications to send (e.g., 5)
- `caller`: Identifier string for the caller

## Client

You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use (Inspector)[https://github.com/modelcontextprotocol/inspector]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .server import main

if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import contextlib
import logging
from http import HTTPStatus
from uuid import uuid4

import anyio
import click
import mcp.types as types
from mcp.server.lowlevel import Server
from mcp.server.streamableHttp import (
MCP_SESSION_ID_HEADER,
StreamableHTTPServerTransport,
)
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Mount

# Configure logging
logger = logging.getLogger(__name__)

# Global task group that will be initialized in the lifespan
task_group = None


@contextlib.asynccontextmanager
async def lifespan(app):
"""Application lifespan context manager for managing task group."""
global task_group

async with anyio.create_task_group() as tg:
task_group = tg
logger.info("Application started, task group initialized!")
try:
yield
finally:
logger.info("Application shutting down, cleaning up resources...")
if task_group:
tg.cancel_scope.cancel()
task_group = None
logger.info("Resources cleaned up successfully.")


@click.command()
@click.option("--port", default=3000, help="Port to listen on for HTTP")
@click.option(
"--log-level",
default="INFO",
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
)
@click.option(
"--json-response",
is_flag=True,
default=False,
help="Enable JSON responses instead of SSE streams",
)
def main(
port: int,
log_level: str,
json_response: bool,
) -> int:
# Configure logging
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

app = Server("mcp-streamable-http-demo")

@app.call_tool()
async def call_tool(
name: str, arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
ctx = app.request_context
interval = arguments.get("interval", 1.0)
count = arguments.get("count", 5)
caller = arguments.get("caller", "unknown")

# Send the specified number of notifications with the given interval
for i in range(count):
await ctx.session.send_log_message(
level="info",
data=f"Notification {i+1}/{count} from caller: {caller}",
logger="notification_stream",
related_request_id=ctx.request_id,
)
if i < count - 1: # Don't wait after the last notification
await anyio.sleep(interval)

return [
types.TextContent(
type="text",
text=(
f"Sent {count} notifications with {interval}s interval"
f" for caller: {caller}"
),
)
]

@app.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="start-notification-stream",
description=(
"Sends a stream of notifications with configurable count"
" and interval"
),
inputSchema={
"type": "object",
"required": ["interval", "count", "caller"],
"properties": {
"interval": {
"type": "number",
"description": "Interval between notifications in seconds",
},
"count": {
"type": "number",
"description": "Number of notifications to send",
},
"caller": {
"type": "string",
"description": (
"Identifier of the caller to include in notifications"
),
},
},
},
)
]

# We need to store the server instances between requests
server_instances = {}
# Lock to prevent race conditions when creating new sessions
session_creation_lock = anyio.Lock()

# ASGI handler for streamable HTTP connections
async def handle_streamable_http(scope, receive, send):
request = Request(scope, receive)
request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER)
if (
request_mcp_session_id is not None
and request_mcp_session_id in server_instances
):
transport = server_instances[request_mcp_session_id]
logger.debug("Session already exists, handling request directly")
await transport.handle_request(scope, receive, send)
elif request_mcp_session_id is None:
# try to establish new session
logger.debug("Creating new transport")
# Use lock to prevent race conditions when creating new sessions
async with session_creation_lock:
new_session_id = uuid4().hex
http_transport = StreamableHTTPServerTransport(
mcp_session_id=new_session_id,
is_json_response_enabled=json_response,
)
server_instances[http_transport.mcp_session_id] = http_transport
async with http_transport.connect() as streams:
read_stream, write_stream = streams

async def run_server():
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
)

if not task_group:
raise RuntimeError("Task group is not initialized")

task_group.start_soon(run_server)

# Handle the HTTP request and return the response
await http_transport.handle_request(scope, receive, send)
else:
response = Response(
"Bad Request: No valid session ID provided",
status_code=HTTPStatus.BAD_REQUEST,
)
await response(scope, receive, send)

# Create an ASGI application using the transport
starlette_app = Starlette(
debug=True,
routes=[
Mount("/mcp", app=handle_streamable_http),
],
lifespan=lifespan,
)

import uvicorn

uvicorn.run(starlette_app, host="0.0.0.0", port=port)

return 0
36 changes: 36 additions & 0 deletions examples/servers/simple-streamablehttp/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[project]
name = "mcp-simple-streamablehttp"
version = "0.1.0"
description = "A simple MCP server exposing a StreamableHttp transport for testing"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Anthropic, PBC." }]
keywords = ["mcp", "llm", "automation", "web", "fetch", "http", "streamable"]
license = { text = "MIT" }
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp", "starlette", "uvicorn"]

[project.scripts]
mcp-simple-streamablehttp = "mcp_simple_streamablehttp.server:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["mcp_simple_streamablehttp"]

[tool.pyright]
include = ["mcp_simple_streamablehttp"]
venvPath = "."
venv = ".venv"

[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []

[tool.ruff]
line-length = 88
target-version = "py310"

[tool.uv]
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]
18 changes: 14 additions & 4 deletions src/mcp/server/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ async def _received_notification(
)

async def send_log_message(
self, level: types.LoggingLevel, data: Any, logger: str | None = None
self,
level: types.LoggingLevel,
data: Any,
logger: str | None = None,
related_request_id: types.RequestId | None = None,
) -> None:
"""Send a log message notification."""
await self.send_notification(
Expand All @@ -192,7 +196,8 @@ async def send_log_message(
logger=logger,
),
)
)
),
related_request_id,
)

async def send_resource_updated(self, uri: AnyUrl) -> None:
Expand Down Expand Up @@ -261,7 +266,11 @@ async def send_ping(self) -> types.EmptyResult:
)

async def send_progress_notification(
self, progress_token: str | int, progress: float, total: float | None = None
self,
progress_token: str | int,
progress: float,
total: float | None = None,
related_request_id: str | None = None,
) -> None:
"""Send a progress notification."""
await self.send_notification(
Expand All @@ -274,7 +283,8 @@ async def send_progress_notification(
total=total,
),
)
)
),
related_request_id,
)

async def send_resource_list_changed(self) -> None:
Expand Down
Loading
Loading