Skip to content

Commit 1c4a896

Browse files
committed
test: add tests for server lifespan support
Adds comprehensive tests for lifespan functionality: - Tests for both low-level Server and FastMCP classes - Coverage for startup, shutdown, and context access - Verifies context passing to request handlers 🤖 Generated with Claude CLI. Co-Authored-By: Claude <[email protected]>
1 parent 2d8a1eb commit 1c4a896

File tree

2 files changed

+211
-1
lines changed

2 files changed

+211
-1
lines changed

tests/issues/test_176_progress_token.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ async def test_progress_token_zero_first_call():
2020
mock_meta.progressToken = 0 # This is the key test case - token is 0
2121

2222
request_context = RequestContext(
23-
request_id="test-request", session=mock_session, meta=mock_meta
23+
request_id="test-request",
24+
session=mock_session,
25+
meta=mock_meta,
26+
lifespan_context=None,
2427
)
2528

2629
# Create context with our mocks

tests/server/test_lifespan.py

+207
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
"""Tests for lifespan functionality in both low-level and FastMCP servers."""
2+
3+
from contextlib import asynccontextmanager
4+
from typing import AsyncIterator
5+
6+
import anyio
7+
import pytest
8+
from pydantic import TypeAdapter
9+
10+
from mcp.server.fastmcp import Context, FastMCP
11+
from mcp.server.lowlevel.server import NotificationOptions, Server
12+
from mcp.server.models import InitializationOptions
13+
from mcp.types import (
14+
ClientCapabilities,
15+
Implementation,
16+
InitializeRequestParams,
17+
JSONRPCMessage,
18+
JSONRPCNotification,
19+
JSONRPCRequest,
20+
)
21+
22+
23+
@pytest.mark.anyio
24+
async def test_lowlevel_server_lifespan():
25+
"""Test that lifespan works in low-level server."""
26+
27+
@asynccontextmanager
28+
async def test_lifespan(server: Server) -> AsyncIterator[dict]:
29+
"""Test lifespan context that tracks startup/shutdown."""
30+
context = {"started": False, "shutdown": False}
31+
try:
32+
context["started"] = True
33+
yield context
34+
finally:
35+
context["shutdown"] = True
36+
37+
server = Server("test", lifespan=test_lifespan)
38+
39+
# Create memory streams for testing
40+
send_stream1, receive_stream1 = anyio.create_memory_object_stream(100)
41+
send_stream2, receive_stream2 = anyio.create_memory_object_stream(100)
42+
43+
# Create a tool that accesses lifespan context
44+
@server.call_tool()
45+
async def check_lifespan(name: str, arguments: dict) -> list:
46+
ctx = server.request_context
47+
assert isinstance(ctx.lifespan_context, dict)
48+
assert ctx.lifespan_context["started"]
49+
assert not ctx.lifespan_context["shutdown"]
50+
return [{"type": "text", "text": "true"}]
51+
52+
# Run server in background task
53+
async with anyio.create_task_group() as tg:
54+
55+
async def run_server():
56+
await server.run(
57+
receive_stream1,
58+
send_stream2,
59+
InitializationOptions(
60+
server_name="test",
61+
server_version="0.1.0",
62+
capabilities=server.get_capabilities(
63+
notification_options=NotificationOptions(),
64+
experimental_capabilities={},
65+
),
66+
),
67+
raise_exceptions=True,
68+
)
69+
70+
tg.start_soon(run_server)
71+
72+
# Initialize the server
73+
params = InitializeRequestParams(
74+
protocolVersion="2024-11-05",
75+
capabilities=ClientCapabilities(),
76+
clientInfo=Implementation(name="test-client", version="0.1.0"),
77+
)
78+
await send_stream1.send(
79+
JSONRPCMessage(
80+
root=JSONRPCRequest(
81+
jsonrpc="2.0",
82+
id=1,
83+
method="initialize",
84+
params=TypeAdapter(InitializeRequestParams).dump_python(params),
85+
)
86+
)
87+
)
88+
response = await receive_stream2.receive()
89+
90+
# Send initialized notification
91+
await send_stream1.send(
92+
JSONRPCMessage(
93+
root=JSONRPCNotification(
94+
jsonrpc="2.0",
95+
method="notifications/initialized",
96+
)
97+
)
98+
)
99+
100+
# Call the tool to verify lifespan context
101+
await send_stream1.send(
102+
JSONRPCMessage(
103+
root=JSONRPCRequest(
104+
jsonrpc="2.0",
105+
id=2,
106+
method="tools/call",
107+
params={"name": "check_lifespan", "arguments": {}},
108+
)
109+
)
110+
)
111+
112+
# Get response and verify
113+
response = await receive_stream2.receive()
114+
assert response.root.result["content"][0]["text"] == "true"
115+
116+
# Cancel server task
117+
tg.cancel_scope.cancel()
118+
119+
120+
@pytest.mark.anyio
121+
async def test_fastmcp_server_lifespan():
122+
"""Test that lifespan works in FastMCP server."""
123+
124+
@asynccontextmanager
125+
async def test_lifespan(server: FastMCP) -> AsyncIterator[dict]:
126+
"""Test lifespan context that tracks startup/shutdown."""
127+
context = {"started": False, "shutdown": False}
128+
try:
129+
context["started"] = True
130+
yield context
131+
finally:
132+
context["shutdown"] = True
133+
134+
server = FastMCP("test", lifespan=test_lifespan)
135+
136+
# Create memory streams for testing
137+
send_stream1, receive_stream1 = anyio.create_memory_object_stream(100)
138+
send_stream2, receive_stream2 = anyio.create_memory_object_stream(100)
139+
140+
# Add a tool that checks lifespan context
141+
@server.tool()
142+
def check_lifespan(ctx: Context) -> bool:
143+
"""Tool that checks lifespan context."""
144+
assert isinstance(ctx.request_context.lifespan_context, dict)
145+
assert ctx.request_context.lifespan_context["started"]
146+
assert not ctx.request_context.lifespan_context["shutdown"]
147+
return True
148+
149+
# Run server in background task
150+
async with anyio.create_task_group() as tg:
151+
152+
async def run_server():
153+
await server._mcp_server.run(
154+
receive_stream1,
155+
send_stream2,
156+
server._mcp_server.create_initialization_options(),
157+
raise_exceptions=True,
158+
)
159+
160+
tg.start_soon(run_server)
161+
162+
# Initialize the server
163+
params = InitializeRequestParams(
164+
protocolVersion="2024-11-05",
165+
capabilities=ClientCapabilities(),
166+
clientInfo=Implementation(name="test-client", version="0.1.0"),
167+
)
168+
await send_stream1.send(
169+
JSONRPCMessage(
170+
root=JSONRPCRequest(
171+
jsonrpc="2.0",
172+
id=1,
173+
method="initialize",
174+
params=TypeAdapter(InitializeRequestParams).dump_python(params),
175+
)
176+
)
177+
)
178+
response = await receive_stream2.receive()
179+
180+
# Send initialized notification
181+
await send_stream1.send(
182+
JSONRPCMessage(
183+
root=JSONRPCNotification(
184+
jsonrpc="2.0",
185+
method="notifications/initialized",
186+
)
187+
)
188+
)
189+
190+
# Call the tool to verify lifespan context
191+
await send_stream1.send(
192+
JSONRPCMessage(
193+
root=JSONRPCRequest(
194+
jsonrpc="2.0",
195+
id=2,
196+
method="tools/call",
197+
params={"name": "check_lifespan", "arguments": {}},
198+
)
199+
)
200+
)
201+
202+
# Get response and verify
203+
response = await receive_stream2.receive()
204+
assert response.root.result["content"][0]["text"] == "true"
205+
206+
# Cancel server task
207+
tg.cancel_scope.cancel()

0 commit comments

Comments
 (0)