diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java b/mcp/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java index 6a7d2903..a8b980e9 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java @@ -93,7 +93,9 @@ public StdioServerTransportProvider(ObjectMapper objectMapper, InputStream input @Override public void setSessionFactory(McpServerSession.Factory sessionFactory) { // Create a single session for the stdio connection - this.session = sessionFactory.create(new StdioMcpSessionTransport()); + var transport = new StdioMcpSessionTransport(); + this.session = sessionFactory.create(transport); + transport.initProcessing(); } @Override @@ -142,10 +144,6 @@ public StdioMcpSessionTransport() { "stdio-inbound"); this.outboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "stdio-outbound"); - - handleIncomingMessages(); - startInboundProcessing(); - startOutboundProcessing(); } @Override @@ -181,6 +179,12 @@ public void close() { logger.debug("Session transport closed"); } + private void initProcessing() { + handleIncomingMessages(); + startInboundProcessing(); + startOutboundProcessing(); + } + private void handleIncomingMessages() { this.inboundSink.asFlux().flatMap(message -> session.handle(message)).doOnTerminate(() -> { // The outbound processing will dispose its scheduler upon completion