From 9140dcb2527b211cc159b6916b5d23670d09df6e Mon Sep 17 00:00:00 2001 From: Christian Tzolov <christian.tzolov@broadcom.com> Date: Wed, 26 Mar 2025 11:52:47 +0100 Subject: [PATCH] refactor(server): Fi StdioServerTransportProvider initialization flow Extract message processing initialization from StdioMcpSessionTransport constructor into a separate initProcessing() method. Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com> --- .../transport/StdioServerTransportProvider.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java b/mcp/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java index 6a7d2903..a8b980e9 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java @@ -93,7 +93,9 @@ public StdioServerTransportProvider(ObjectMapper objectMapper, InputStream input @Override public void setSessionFactory(McpServerSession.Factory sessionFactory) { // Create a single session for the stdio connection - this.session = sessionFactory.create(new StdioMcpSessionTransport()); + var transport = new StdioMcpSessionTransport(); + this.session = sessionFactory.create(transport); + transport.initProcessing(); } @Override @@ -142,10 +144,6 @@ public StdioMcpSessionTransport() { "stdio-inbound"); this.outboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "stdio-outbound"); - - handleIncomingMessages(); - startInboundProcessing(); - startOutboundProcessing(); } @Override @@ -181,6 +179,12 @@ public void close() { logger.debug("Session transport closed"); } + private void initProcessing() { + handleIncomingMessages(); + startInboundProcessing(); + startOutboundProcessing(); + } + private void handleIncomingMessages() { this.inboundSink.asFlux().flatMap(message -> session.handle(message)).doOnTerminate(() -> { // The outbound processing will dispose its scheduler upon completion