
Fix #201: Move incoming message stream from BaseSession to ServerSession #325

Merged
merged 3 commits into main from fix/201
Mar 24, 2025

Conversation

dsp-ant
Member

@dsp-ant dsp-ant commented Mar 19, 2025

This PR fixes a nasty bug in the way we handle incoming messages, which currently leads to hanging connections when the client sends notifications.

The observed issue is as follows:

  • A user creates a ClientSession and connects to a server.
  • Server sends messages such as logging events.
  • Upon the next request from the client, the client's await hangs.

When I looked into this, I suspected that this must be a channel issue. We use in-memory channels to handle messages, and they have historically caused issues. In particular, we use channels with a max_buffer_size of 0, effectively making them unbuffered. This is one part of the codebase, if not the only one, that can block as observed.

So I looked into the two places where we use channels: once for the read and write streams from the sse/stdio implementations to the ClientSession, and then within the shared BaseSession class to handle messages separately in the server and client sessions.

In client/stdio.py, I first changed the lines:

    read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
    write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

to

    read_stream_writer, read_stream = anyio.create_memory_object_stream(math.inf)
    write_stream, write_stream_reader = anyio.create_memory_object_stream(math.inf)

and tested again. This fixed the bug! However, we are now clearly in unbounded memory stream territory, which we cannot accept, since the buffers would grow linearly in memory. I tested a bit more and boiled it down to

   read_stream_writer, read_stream = anyio.create_memory_object_stream(math.inf)

fixing the issue. So we know something is funky in the read stream.

I looked into @sheffler's issue #201 and PR #202 and confirmed that removing in src/mcp/shared/session.py

                            await self._incoming_message_stream_writer.send(
                                notification
                            )

will fix the problem as well. So what does that tell us? It tells us that await self._incoming_message_stream_writer.send(...) is blocking, which in turn tells us that nobody ever reads from self._incoming_message_stream_reader. So where is self._incoming_message_stream_reader used? In BaseSession.incoming_messages.

Now we have a theory:

  1. incoming_messages is used to read incoming messages on the server.
  2. But it's in the BaseSession, so it means the client also offers it.
  3. Most clients don't read client.incoming_messages.
  4. If clients don't read client.incoming_messages, there is never a waiting receiver, so a send on the unbuffered stream blocks forever.

Now that we have the culprit, we can figure out what to do. In my mind, the problem happens because we can't guarantee a consumer of incoming messages on the client, even though some implementations might want one. On the server side, by contrast, incoming_messages is always consumed, since ServerSession is used inside Server.

Okay so the goal was then to:

  • Make consumption of incoming messages optional for clients.
  • Make consumption of incoming messages required for servers.

So on the server side incoming_messages is fine, but the client needs a way to react if the developer is interested in doing so. This means the approach of providing it via incoming_messages doesn't work. After thinking about ways to automatically drain the stream, I came up with what I think is the best approach:

  1. The client offers a callback for all messages. This fits into the client interface using callbacks already.
  2. The server continues to expose incoming_messages.
  3. BaseSession offers an abstraction that lets ClientSession and ServerSession observe events from the _receive_loop but decide for themselves what to do with them.

Fixes GitHub issue #201 by moving the incoming message stream and related methods from BaseSession to ServerSession where they are actually needed. This change follows the principle of having functionality only where it's required.

GitHub-Issue:#201

🤖 Generated with [Claude Code](https://claude.ai/code)
@dsp-ant dsp-ant requested review from Kludex, jerome3o-anthropic and jspahrsummers and removed request for Kludex and jerome3o-anthropic March 19, 2025 22:11
This change adds a message_handler callback to ClientSession to allow for direct handling of incoming messages instead of requiring an async iterator. The change simplifies the client code by removing the need for a separate receive loop task.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
@dsp-ant dsp-ant mentioned this pull request Mar 20, 2025
@sheffler
Contributor

Good explanation, and it makes sense.

This is the kind of subtle error that can be illustrated in an end-to-end test. Without a clear specification of whether the client reads incoming_messages, an implementation is free to do what it wants. In this case, the implementation I showed didn't read them, and there was the deadlock.

I think the callback pattern will work nicely.

@j-z10

j-z10 commented Mar 21, 2025

We've also encountered this issue and had to create a background task to consume session.incoming_messages on the client side. We hope this can be merged quickly.

@dsp-ant dsp-ant requested a review from Kludex March 21, 2025 12:51
Member

@Kludex Kludex left a comment


What's the difference between _logging_callback and _message_handler? Seems those concepts are similar?

@Kludex
Member

Kludex commented Mar 22, 2025

None of the references provided included a minimal reproducible example (MRE), so here it is:

import asyncio

from mcp.server.fastmcp import Context, FastMCP
from mcp.shared.memory import create_connected_server_and_client_session

app = FastMCP()


@app.tool(name="echo")
async def echo(message: str, context: Context) -> str:
    await context.info(f"echo: {message}")  # << HERE
    return message


async def main():
    server = app._mcp_server  # type: ignore
    async with create_connected_server_and_client_session(server=server) as client:
        await client.initialize()

        result = await client.call_tool("echo", {"message": "Hello, world!"})
        print(result.model_dump())


if __name__ == "__main__":
    asyncio.run(main())

@dsp-ant
Member Author

dsp-ant commented Mar 24, 2025

What's the difference between _logging_callback and _message_handler? Seems those concepts are similar?

One is specific to logging; the other handles all messages, in case you want to handle custom messages or just intercept everything.
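To illustrate the distinction, a hypothetical dispatch routine (the names and routing here are assumptions, not the SDK's code): the message handler sees every incoming message, while the logging callback fires only for `notifications/message`:

```python
from typing import Any, Awaitable, Callable

LoggingCallback = Callable[[dict], Awaitable[None]]  # sees only log notifications
MessageHandler = Callable[[Any], Awaitable[None]]    # sees every incoming message


async def dispatch(
    message: dict,
    logging_callback: LoggingCallback,
    message_handler: MessageHandler,
) -> None:
    # The message handler always runs; the logging callback runs only
    # when the incoming message is a logging notification.
    await message_handler(message)
    if message.get("method") == "notifications/message":
        await logging_callback(message)
```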

@dsp-ant dsp-ant merged commit 568cbd1 into main Mar 24, 2025
6 checks passed
@dsp-ant dsp-ant deleted the fix/201 branch March 24, 2025 14:14
@tanhaipeng

Very good, hoping for a new release soon.

@jkawamoto

After commit 568cbd1, my end-to-end test #294 is getting stuck:
https://github.com/modelcontextprotocol/python-sdk/actions/runs/14043734979.

(It was working correctly with commit 9ae4df8:
https://github.com/modelcontextprotocol/python-sdk/actions/runs/14001749703)

I suspect something might be missing in commit 568cbd1.

#361 fixes my hanging issue so far.
