Skip to content

Commit 6be96bd

Browse files
Made message handling concurrent
1 parent c8618de commit 6be96bd

File tree

1 file changed

+34
-21
lines changed

1 file changed

+34
-21
lines changed

Diff for: src/mcp/server/lowlevel/server.py

+34-21
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ async def main():
7070
from collections.abc import Awaitable, Callable
7171
from typing import Any, Sequence
7272

73+
import anyio
7374
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
7475
from pydantic import AnyUrl
7576

@@ -434,6 +435,29 @@ async def handler(req: types.CompleteRequest):
434435

435436
return decorator
436437

438+
async def _handle_message(
439+
self,
440+
message: RequestResponder[types.ClientRequest, types.ServerResult]
441+
| types.ClientNotification
442+
| Exception,
443+
session: ServerSession,
444+
raise_exceptions: bool = False,
445+
):
446+
with warnings.catch_warnings(record=True) as w:
447+
match message:
448+
case (
449+
RequestResponder(request=types.ClientRequest(root=req)) as responder
450+
):
451+
with responder:
452+
await self._handle_request(
453+
message, req, session, raise_exceptions
454+
)
455+
case types.ClientNotification(root=notify):
456+
await self._handle_notification(notify)
457+
458+
for warning in w:
459+
logger.info(f"Warning: {warning.category.__name__}: {warning.message}")
460+
437461
async def run(
438462
self,
439463
read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception],
@@ -445,30 +469,19 @@ async def run(
445469
# in-process servers.
446470
raise_exceptions: bool = False,
447471
):
448-
with warnings.catch_warnings(record=True) as w:
449-
async with ServerSession(
450-
read_stream, write_stream, initialization_options
451-
) as session:
472+
async with ServerSession(
473+
read_stream, write_stream, initialization_options
474+
) as session:
475+
async with anyio.create_task_group() as tg:
452476
async for message in session.incoming_messages:
453477
logger.debug(f"Received message: {message}")
454478

455-
match message:
456-
case (
457-
RequestResponder(
458-
request=types.ClientRequest(root=req)
459-
) as responder
460-
):
461-
with responder:
462-
await self._handle_request(
463-
message, req, session, raise_exceptions
464-
)
465-
case types.ClientNotification(root=notify):
466-
await self._handle_notification(notify)
467-
468-
for warning in w:
469-
logger.info(
470-
f"Warning: {warning.category.__name__}: {warning.message}"
471-
)
479+
tg.start_soon(
480+
self._handle_message,
481+
message,
482+
session,
483+
raise_exceptions,
484+
)
472485

473486
async def _handle_request(
474487
self,

0 commit comments

Comments
 (0)