From 8ceb495252553d907edd314c2066544645602be8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Wed, 5 Mar 2025 12:03:58 +0100
Subject: [PATCH 01/20] Refactor server side to handle multiple clients
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
WARN: This is still work in progress and does not compile.
Breaking changes:
* McpAsyncServer
* getClientCapabilities deprecated + throws
* getClientInfo deprecated + throws
* listRoots deprecated + throws
* createMessage deprecated + throws
* McpTransport
* connect deprecated - should only belong to McpClientTransport
* ServerMcpTransport
* connect default implementation that throws
The major change is the introduction of ServerMcpSession for per-client
communication. The user should be exposed to a limited abstraction that
hides the session called ServerMcpExchange which currently exposes
sampling and roots.
Signed-off-by: Dariusz Jędrzejczyk
---
.../transport/WebFluxSseServerTransport.java | 211 +++++++---------
.../server/McpAsyncServer.java | 128 +++++-----
.../spec/ClientMcpTransport.java | 6 +
.../spec/DefaultMcpSession.java | 1 +
.../spec/McpTransport.java | 5 +-
.../spec/ServerMcpExchange.java | 75 ++++++
.../spec/ServerMcpSession.java | 225 ++++++++++++++++++
.../spec/ServerMcpTransport.java | 14 ++
8 files changed, 472 insertions(+), 193 deletions(-)
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpExchange.java
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpSession.java
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java
index bed7293e..09db3ba7 100644
--- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java
+++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java
@@ -1,21 +1,22 @@
package io.modelcontextprotocol.server.transport;
import java.io.IOException;
-import java.time.Duration;
-import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.ServerMcpSession;
import io.modelcontextprotocol.spec.ServerMcpTransport;
import io.modelcontextprotocol.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
@@ -88,18 +89,22 @@ public class WebFluxSseServerTransport implements ServerMcpTransport {
private final RouterFunction> routerFunction;
+ private ServerMcpSession.InitHandler initHandler;
+
+ private Map> requestHandlers;
+
+ private Map notificationHandlers;
+
/**
* Map of active client sessions, keyed by session ID.
*/
- private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
/**
* Flag indicating if the transport is shutting down.
*/
private volatile boolean isClosing = false;
- private Function, Mono> connectHandler;
-
/**
* Constructs a new WebFlux SSE server transport instance.
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
@@ -137,21 +142,13 @@ public WebFluxSseServerTransport(ObjectMapper objectMapper, String messageEndpoi
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
}
- /**
- * Configures the message handler for this transport. In the WebFlux SSE
- * implementation, this method stores the handler for processing incoming messages but
- * doesn't establish any connections since the server accepts connections rather than
- * initiating them.
- * @param handler A function that processes incoming JSON-RPC messages and returns
- * responses. This handler will be called for each message received through the
- * message endpoint.
- * @return An empty Mono since the server doesn't initiate connections
- */
@Override
- public Mono connect(Function, Mono> handler) {
- this.connectHandler = handler;
- // Server-side transport doesn't initiate connections
- return Mono.empty().then();
+ public void registerHandlers(ServerMcpSession.InitHandler initHandler,
+ Map> requestHandlers,
+ Map notificationHandlers) {
+ this.initHandler = initHandler;
+ this.requestHandlers = requestHandlers;
+ this.notificationHandlers = notificationHandlers;
}
/**
@@ -178,36 +175,14 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) {
return Mono.empty();
}
- return Mono.create(sink -> {
- try {// @formatter:off
- String jsonText = objectMapper.writeValueAsString(message);
- ServerSentEvent
+ * @deprecated This is only relevant for client-side transports and will be removed
+ * from this interface.
*/
+ @Deprecated
Mono connect(Function, Mono> handler);
/**
@@ -69,7 +72,7 @@ default void close() {
Mono closeGracefully();
/**
- * Sends a message to the server asynchronously.
+ * Sends a message to the peer asynchronously.
*
*
* This method handles the transmission of messages to the server in an asynchronous
diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpExchange.java b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpExchange.java
new file mode 100644
index 00000000..4facdb1c
--- /dev/null
+++ b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpExchange.java
@@ -0,0 +1,75 @@
+package io.modelcontextprotocol.spec;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import reactor.core.publisher.Mono;
+
+public class ServerMcpExchange {
+
+ private final ServerMcpSession session;
+
+ private final McpSchema.ClientCapabilities clientCapabilities;
+
+ private final McpSchema.Implementation clientInfo;
+
+ public ServerMcpExchange(ServerMcpSession session, McpSchema.ClientCapabilities clientCapabilities,
+ McpSchema.Implementation clientInfo) {
+ this.session = session;
+ this.clientCapabilities = clientCapabilities;
+ this.clientInfo = clientInfo;
+ }
+
+ private static final TypeReference CREATE_MESSAGE_RESULT_TYPE_REF = new TypeReference<>() {
+ };
+
+ /**
+ * Create a new message using the sampling capabilities of the client. The Model
+ * Context Protocol (MCP) provides a standardized way for servers to request LLM
+ * sampling (“completions” or “generations”) from language models via clients. This
+ * flow allows clients to maintain control over model access, selection, and
+ * permissions while enabling servers to leverage AI capabilities—with no server API
+ * keys necessary. Servers can request text or image-based interactions and optionally
+ * include context from MCP servers in their prompts.
+ * @param createMessageRequest The request to create a new message
+ * @return A Mono that completes when the message has been created
+ * @throws McpError if the client has not been initialized or does not support
+ * sampling capabilities
+ * @throws McpError if the client does not support the createMessage method
+ * @see McpSchema.CreateMessageRequest
+ * @see McpSchema.CreateMessageResult
+ * @see Sampling
+ * Specification
+ */
+ public Mono createMessage(McpSchema.CreateMessageRequest createMessageRequest) {
+ if (this.clientCapabilities == null) {
+ return Mono.error(new McpError("Client must be initialized. Call the initialize method first!"));
+ }
+ if (this.clientCapabilities.sampling() == null) {
+ return Mono.error(new McpError("Client must be configured with sampling capabilities"));
+ }
+ return this.session.sendRequest(McpSchema.METHOD_SAMPLING_CREATE_MESSAGE, createMessageRequest,
+ CREATE_MESSAGE_RESULT_TYPE_REF);
+ }
+
+ private static final TypeReference LIST_ROOTS_RESULT_TYPE_REF = new TypeReference<>() {
+ };
+
+ /**
+ * Retrieves the list of all roots provided by the client.
+ * @return A Mono that emits the list of roots result.
+ */
+ public Mono listRoots() {
+ return this.listRoots(null);
+ }
+
+ /**
+ * Retrieves a paginated list of roots provided by the server.
+ * @param cursor Optional pagination cursor from a previous list request
+ * @return A Mono that emits the list of roots result containing
+ */
+ public Mono listRoots(String cursor) {
+ return this.session.sendRequest(McpSchema.METHOD_ROOTS_LIST, new McpSchema.PaginatedRequest(cursor),
+ LIST_ROOTS_RESULT_TYPE_REF);
+ }
+
+}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpSession.java
new file mode 100644
index 00000000..1e910d2b
--- /dev/null
+++ b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpSession.java
@@ -0,0 +1,225 @@
+package io.modelcontextprotocol.spec;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+import reactor.core.publisher.Sinks;
+
+public abstract class ServerMcpSession implements McpSession {
+
+ private static final Logger logger = LoggerFactory.getLogger(ServerMcpSession.class);
+
+ private final ConcurrentHashMap> pendingResponses = new ConcurrentHashMap<>();
+
+ private final String sessionPrefix = UUID.randomUUID().toString().substring(0, 8);
+
+ private final AtomicLong requestCounter = new AtomicLong(0);
+
+ private final InitHandler initHandler;
+
+ private final Map> requestHandlers;
+
+ private final Map notificationHandlers;
+
+ // TODO: used only to unmarshall - could be extracted to another interface
+ private final McpTransport transport;
+
+ private final Sinks.One exchangeSink = Sinks.one();
+
+ volatile boolean isInitialized = false;
+
+ public ServerMcpSession(McpTransport transport, InitHandler initHandler,
+ Map> requestHandlers, Map notificationHandlers) {
+ this.transport = transport;
+ this.initHandler = initHandler;
+ this.requestHandlers = requestHandlers;
+ this.notificationHandlers = notificationHandlers;
+ }
+
+ public void init(McpSchema.ClientCapabilities clientCapabilities, McpSchema.Implementation clientInfo) {
+ exchangeSink.tryEmitValue(new ServerMcpExchange(this, clientCapabilities, clientInfo));
+ }
+
+ public Mono exchange() {
+ return exchangeSink.asMono();
+ }
+
+ protected abstract Mono sendMessage(McpSchema.JSONRPCMessage message);
+
+ private String generateRequestId() {
+ return this.sessionPrefix + "-" + this.requestCounter.getAndIncrement();
+ }
+
+ public Mono sendRequest(String method, Object requestParams, TypeReference typeRef) {
+ String requestId = this.generateRequestId();
+
+ return Mono.create(sink -> {
+ this.pendingResponses.put(requestId, sink);
+ McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
+ requestId, requestParams);
+ this.sendMessage(jsonrpcRequest).subscribe(v -> {
+ }, error -> {
+ this.pendingResponses.remove(requestId);
+ sink.error(error);
+ });
+ }).timeout(Duration.ofSeconds(10)).handle((jsonRpcResponse, sink) -> {
+ if (jsonRpcResponse.error() != null) {
+ sink.error(new McpError(jsonRpcResponse.error()));
+ }
+ else {
+ if (typeRef.getType().equals(Void.class)) {
+ sink.complete();
+ }
+ else {
+ sink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
+ }
+ }
+ });
+ }
+
+ @Override
+ public Mono sendNotification(String method, Map params) {
+ McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
+ method, params);
+ return this.sendMessage(jsonrpcNotification);
+ }
+
+ public Mono handle(McpSchema.JSONRPCMessage message) {
+ return Mono.defer(() -> {
+ // TODO handle errors for communication to without initialization happening
+ // first
+ if (message instanceof McpSchema.JSONRPCResponse response) {
+ logger.debug("Received Response: {}", response);
+ var sink = pendingResponses.remove(response.id());
+ if (sink == null) {
+ logger.warn("Unexpected response for unknown id {}", response.id());
+ }
+ else {
+ sink.success(response);
+ }
+ return Mono.empty();
+ }
+ else if (message instanceof McpSchema.JSONRPCRequest request) {
+ logger.debug("Received request: {}", request);
+ return handleIncomingRequest(request).onErrorResume(error -> {
+ var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null,
+ new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
+ error.getMessage(), null));
+ // TODO: Should the error go to SSE or back as POST return?
+ return this.sendMessage(errorResponse).then(Mono.empty());
+ }).flatMap(this::sendMessage);
+ }
+ else if (message instanceof McpSchema.JSONRPCNotification notification) {
+ // TODO handle errors for communication to without initialization
+ // happening first
+ logger.debug("Received notification: {}", notification);
+ // TODO: in case of error, should the POST request be signalled?
+ return handleIncomingNotification(notification)
+ .doOnError(error -> logger.error("Error handling notification: {}", error.getMessage()));
+ }
+ else {
+ logger.warn("Received unknown message type: {}", message);
+ return Mono.empty();
+ }
+ });
+ }
+
+ /**
+ * Handles an incoming JSON-RPC request by routing it to the appropriate handler.
+ * @param request The incoming JSON-RPC request
+ * @return A Mono containing the JSON-RPC response
+ */
+ private Mono handleIncomingRequest(McpSchema.JSONRPCRequest request) {
+ return Mono.defer(() -> {
+ Mono> resultMono;
+ if (McpSchema.METHOD_INITIALIZE.equals(request.method())) {
+ // TODO handle situation where already initialized!
+ resultMono = this.initHandler.handle(new ClientInitConsumer(), request.params())
+ .doOnNext(initResult -> this.isInitialized = true);
+ }
+ else {
+ // TODO handle errors for communication to without initialization
+ // happening first
+ var handler = this.requestHandlers.get(request.method());
+ if (handler == null) {
+ MethodNotFoundError error = getMethodNotFoundError(request.method());
+ return Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null,
+ new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.METHOD_NOT_FOUND,
+ error.message(), error.data())));
+ }
+
+ resultMono = this.exchangeSink.asMono().flatMap(exchange -> handler.handle(exchange, request.params()));
+ }
+ return resultMono
+ .map(result -> new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), result, null))
+ .onErrorResume(error -> Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(),
+ null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
+ error.getMessage(), null)))); // TODO: add error message
+ // through the data field
+ });
+ }
+
+ /**
+ * Handles an incoming JSON-RPC notification by routing it to the appropriate handler.
+ * @param notification The incoming JSON-RPC notification
+ * @return A Mono that completes when the notification is processed
+ */
+ private Mono handleIncomingNotification(McpSchema.JSONRPCNotification notification) {
+ return Mono.defer(() -> {
+ var handler = notificationHandlers.get(notification.method());
+ if (handler == null) {
+ logger.error("No handler registered for notification method: {}", notification.method());
+ return Mono.empty();
+ }
+ return handler.handle(this, notification.params());
+ });
+ }
+
+ record MethodNotFoundError(String method, String message, Object data) {
+ }
+
+ static MethodNotFoundError getMethodNotFoundError(String method) {
+ switch (method) {
+ case McpSchema.METHOD_ROOTS_LIST:
+ return new MethodNotFoundError(method, "Roots not supported",
+ Map.of("reason", "Client does not have roots capability"));
+ default:
+ return new MethodNotFoundError(method, "Method not found: " + method, null);
+ }
+ }
+
+ public class ClientInitConsumer {
+
+ public void init(McpSchema.ClientCapabilities clientCapabilities, McpSchema.Implementation clientInfo) {
+ ServerMcpSession.this.init(clientCapabilities, clientInfo);
+ }
+
+ }
+
+ public interface InitHandler {
+
+ Mono handle(ClientInitConsumer clientInitConsumer, Object params);
+
+ }
+
+ public interface NotificationHandler {
+
+ Mono handle(ServerMcpSession connection, Object params);
+
+ }
+
+ public interface RequestHandler {
+
+ Mono handle(ServerMcpExchange exchange, Object params);
+
+ }
+
+}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpTransport.java
index 13591432..0c2069f3 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpTransport.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpTransport.java
@@ -3,6 +3,11 @@
*/
package io.modelcontextprotocol.spec;
+import java.util.Map;
+import java.util.function.Function;
+
+import reactor.core.publisher.Mono;
+
/**
* Marker interface for the server-side MCP transport.
*
@@ -10,4 +15,13 @@
*/
public interface ServerMcpTransport extends McpTransport {
+ @Override
+ default Mono connect(Function, Mono> handler) {
+ throw new IllegalStateException("Server transport does not support connect method");
+ }
+
+ void registerHandlers(ServerMcpSession.InitHandler initHandler,
+ Map> requestHandlers,
+ Map notificationHandlers);
+
}
From 8316ad8b2755f452957f6e05b08d9dc85d098455 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Thu, 6 Mar 2025 14:13:25 +0100
Subject: [PATCH 02/20] Introduce Child server transport and Session Factory
---
.../transport/WebFluxSseServerTransport.java | 52 +++++++++++--------
.../server/McpAsyncServer.java | 9 ++--
.../spec/ServerMcpSession.java | 40 +++++++++-----
.../spec/ServerMcpTransport.java | 11 ++--
4 files changed, 68 insertions(+), 44 deletions(-)
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java
index 09db3ba7..65b02bba 100644
--- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java
+++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java
@@ -1,7 +1,6 @@
package io.modelcontextprotocol.server.transport;
import java.io.IOException;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -89,16 +88,24 @@ public class WebFluxSseServerTransport implements ServerMcpTransport {
private final RouterFunction> routerFunction;
- private ServerMcpSession.InitHandler initHandler;
-
- private Map> requestHandlers;
-
- private Map notificationHandlers;
+ private ServerMcpSession.Factory sessionFactory;
/**
* Map of active client sessions, keyed by session ID.
*/
- private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
+
+ // FIXME: This is a bit clumsy. The McpAsyncServer handles global notifications
+ // using the transport and we need access to child transports for each session to
+ // use the sendMessage method. Ideally, the particular transport would be an
+ // abstraction of a specialized session that can handle only notifications and we
+ // could delegate to all child sessions without directly going through the transport.
+ // The conversion from a notification to message happens both in McpAsyncServer
+ // and in ServerMcpSession and it would be beneficial to have a unified interface
+ // for both. An MCP server implementation can use both McpServerExchange and
+ // Mcp(Sync|Async)Server to send notifications so the capability needs to lie in
+ // both places.
+ private final ConcurrentHashMap sessionTransports = new ConcurrentHashMap<>();
/**
* Flag indicating if the transport is shutting down.
@@ -143,12 +150,8 @@ public WebFluxSseServerTransport(ObjectMapper objectMapper, String messageEndpoi
}
@Override
- public void registerHandlers(ServerMcpSession.InitHandler initHandler,
- Map> requestHandlers,
- Map notificationHandlers) {
- this.initHandler = initHandler;
- this.requestHandlers = requestHandlers;
- this.notificationHandlers = notificationHandlers;
+ public void setSessionFactory(ServerMcpSession.Factory sessionFactory) {
+ this.sessionFactory = sessionFactory;
}
/**
@@ -177,7 +180,7 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) {
logger.debug("Attempting to broadcast message to {} active sessions", sessions.size());
- return Flux.fromStream(sessions.values().stream())
+ return Flux.fromStream(sessionTransports.values().stream())
.flatMap(session -> session.sendMessage(message)
.doOnError(e -> logger.error("Failed to " + "send message to session {}: {}", session.sessionId,
e.getMessage()))
@@ -218,7 +221,7 @@ public T unmarshalFrom(Object data, TypeReference typeRef) {
public Mono closeGracefully() {
return Flux.fromIterable(sessions.values())
.doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
- .doOnNext(WebFluxMcpSession::close)
+ .flatMap(ServerMcpSession::closeGracefully)
.then();
}
@@ -264,9 +267,11 @@ private Mono handleSseConnection(ServerRequest request) {
.body(Flux.>create(sink -> {
String sessionId = UUID.randomUUID().toString();
logger.debug("Creating new SSE connection for session: {}", sessionId);
- WebFluxMcpSession session = new WebFluxMcpSession(sessionId, sink, initHandler, requestHandlers,
- notificationHandlers);
- sessions.put(sessionId, session);
+ WebFluxMcpSessionTransport
+ sessionTransport = new WebFluxMcpSessionTransport(sessionId, sink);
+
+ sessions.put(sessionId, sessionFactory.create(sessionTransport));
+ sessionTransports.put(sessionId, sessionTransport);
// Send initial endpoint event
logger.debug("Sending initial endpoint event to session: {}", sessionId);
@@ -323,15 +328,13 @@ private Mono handleMessage(ServerRequest request) {
});
}
- private class WebFluxMcpSession extends ServerMcpSession {
+ private class WebFluxMcpSessionTransport implements ServerMcpTransport.Child {
final String sessionId;
private final FluxSink> sink;
- public WebFluxMcpSession(String sessionId, FluxSink> sink, InitHandler initHandler,
- Map> requestHandlers, Map notificationHandlers) {
- super(WebFluxSseServerTransport.this, initHandler, requestHandlers, notificationHandlers);
+ public WebFluxMcpSessionTransport(String sessionId, FluxSink> sink) {
this.sessionId = sessionId;
this.sink = sink;
}
@@ -358,6 +361,11 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) {
}).then();
}
+ @Override
+ public T unmarshalFrom(Object data, TypeReference typeRef) {
+ return WebFluxSseServerTransport.this.unmarshalFrom(data, typeRef);
+ }
+
@Override
public Mono closeGracefully() {
return Mono.fromRunnable(sink::complete);
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
index 63b19156..4072e504 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
@@ -157,18 +157,17 @@ public class McpAsyncServer {
asyncRootsListChangedNotificationHandler(rootsChangeConsumers));
this.transport = mcpTransport;
- mcpTransport.registerHandlers(this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers);
+ mcpTransport.setSessionFactory(transport -> new ServerMcpSession(transport,
+ this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
}
// ---------------------------------------
// Lifecycle Management
// ---------------------------------------
private Mono asyncInitializeRequestHandler(
- ServerMcpSession.ClientInitConsumer initConsumer, Object params) {
+ ServerMcpSession.ClientInitConsumer initConsumer, McpSchema.InitializeRequest initializeRequest) {
return Mono.defer(() -> {
- McpSchema.InitializeRequest initializeRequest = transport.unmarshalFrom(params,
- new TypeReference() {
- });
+
initConsumer.init(initializeRequest.capabilities(), initializeRequest.clientInfo());
diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpSession.java
index 1e910d2b..8265343a 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpSession.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpSession.java
@@ -13,7 +13,7 @@
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
-public abstract class ServerMcpSession implements McpSession {
+public class ServerMcpSession implements McpSession {
private static final Logger logger = LoggerFactory.getLogger(ServerMcpSession.class);
@@ -48,12 +48,6 @@ public void init(McpSchema.ClientCapabilities clientCapabilities, McpSchema.Impl
exchangeSink.tryEmitValue(new ServerMcpExchange(this, clientCapabilities, clientInfo));
}
- public Mono exchange() {
- return exchangeSink.asMono();
- }
-
- protected abstract Mono sendMessage(McpSchema.JSONRPCMessage message);
-
private String generateRequestId() {
return this.sessionPrefix + "-" + this.requestCounter.getAndIncrement();
}
@@ -65,7 +59,7 @@ public Mono sendRequest(String method, Object requestParams, TypeReferenc
this.pendingResponses.put(requestId, sink);
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
requestId, requestParams);
- this.sendMessage(jsonrpcRequest).subscribe(v -> {
+ this.transport.sendMessage(jsonrpcRequest).subscribe(v -> {
}, error -> {
this.pendingResponses.remove(requestId);
sink.error(error);
@@ -89,7 +83,7 @@ public Mono sendRequest(String method, Object requestParams, TypeReferenc
public Mono sendNotification(String method, Map params) {
McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
method, params);
- return this.sendMessage(jsonrpcNotification);
+ return this.transport.sendMessage(jsonrpcNotification);
}
public Mono handle(McpSchema.JSONRPCMessage message) {
@@ -114,8 +108,8 @@ else if (message instanceof McpSchema.JSONRPCRequest request) {
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
error.getMessage(), null));
// TODO: Should the error go to SSE or back as POST return?
- return this.sendMessage(errorResponse).then(Mono.empty());
- }).flatMap(this::sendMessage);
+ return this.transport.sendMessage(errorResponse).then(Mono.empty());
+ }).flatMap(this.transport::sendMessage);
}
else if (message instanceof McpSchema.JSONRPCNotification notification) {
// TODO handle errors for communication to without initialization
@@ -142,7 +136,11 @@ private Mono handleIncomingRequest(McpSchema.JSONRPCR
Mono> resultMono;
if (McpSchema.METHOD_INITIALIZE.equals(request.method())) {
// TODO handle situation where already initialized!
- resultMono = this.initHandler.handle(new ClientInitConsumer(), request.params())
+ McpSchema.InitializeRequest initializeRequest =
+ transport.unmarshalFrom(request.params(),
+ new TypeReference() {
+ });
+ resultMono = this.initHandler.handle(new ClientInitConsumer(), initializeRequest)
.doOnNext(initResult -> this.isInitialized = true);
}
else {
@@ -196,6 +194,16 @@ static MethodNotFoundError getMethodNotFoundError(String method) {
}
}
+ @Override
+ public Mono closeGracefully() {
+ return this.transport.closeGracefully();
+ }
+
+ @Override
+ public void close() {
+ this.transport.close();
+ }
+
public class ClientInitConsumer {
public void init(McpSchema.ClientCapabilities clientCapabilities, McpSchema.Implementation clientInfo) {
@@ -206,7 +214,8 @@ public void init(McpSchema.ClientCapabilities clientCapabilities, McpSchema.Impl
public interface InitHandler {
- Mono handle(ClientInitConsumer clientInitConsumer, Object params);
+ Mono handle(ClientInitConsumer clientInitConsumer,
+ McpSchema.InitializeRequest initializeRequest);
}
@@ -222,4 +231,9 @@ public interface RequestHandler {
}
+ @FunctionalInterface
+ public interface Factory {
+ ServerMcpSession create(ServerMcpTransport.Child sessionTransport);
+ }
+
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpTransport.java
index 0c2069f3..6c3442d1 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpTransport.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/spec/ServerMcpTransport.java
@@ -3,7 +3,6 @@
*/
package io.modelcontextprotocol.spec;
-import java.util.Map;
import java.util.function.Function;
import reactor.core.publisher.Mono;
@@ -20,8 +19,12 @@ default Mono connect(Function, Mono> requestHandlers,
- Map notificationHandlers);
+ void setSessionFactory(ServerMcpSession.Factory sessionFactory);
+ interface Child extends McpTransport {
+ @Override
+ default Mono connect(Function, Mono> handler) {
+ throw new IllegalStateException("Server transport does not support connect method");
+ }
+ }
}
From 0823591456e6c7210e4c3fc23045499a009fb9b0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?=
Date: Wed, 12 Mar 2025 16:45:05 +0100
Subject: [PATCH 03/20] Aim for smoother transition to new APIs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Dariusz Jędrzejczyk
---
...=> WebFluxSseServerTransportProvider.java} | 107 +-
.../WebFluxSseIntegrationTests.java | 6 +-
.../server/WebFluxSseMcpAsyncServerTests.java | 6 +-
.../server/WebFluxSseMcpSyncServerTests.java | 8 +-
.../server/McpAsyncServer.java | 1629 +++++++++++++----
.../server/McpServer.java | 45 +-
.../server/McpSyncServer.java | 5 +
.../spec/ClientMcpTransport.java | 4 +-
.../spec/McpClientTransport.java | 12 +
.../spec/McpServerTransport.java | 5 +
.../spec/McpServerTransportProvider.java | 32 +
.../spec/McpTransport.java | 6 +-
.../spec/ServerMcpExchange.java | 4 +
.../spec/ServerMcpSession.java | 88 +-
.../spec/ServerMcpTransport.java | 19 +-
15 files changed, 1463 insertions(+), 513 deletions(-)
rename mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/{WebFluxSseServerTransport.java => WebFluxSseServerTransportProvider.java} (80%)
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/McpClientTransport.java
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/McpServerTransport.java
create mode 100644 mcp/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProvider.java
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java
similarity index 80%
rename from mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java
rename to mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java
index 65b02bba..732616dc 100644
--- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransport.java
+++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java
@@ -1,13 +1,15 @@
package io.modelcontextprotocol.server.transport;
import java.io.IOException;
-import java.util.UUID;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpServerTransport;
+import io.modelcontextprotocol.spec.McpServerTransportProvider;
import io.modelcontextprotocol.spec.ServerMcpSession;
import io.modelcontextprotocol.spec.ServerMcpTransport;
import io.modelcontextprotocol.util.Assert;
@@ -61,9 +63,10 @@
* @see ServerMcpTransport
* @see ServerSentEvent
*/
-public class WebFluxSseServerTransport implements ServerMcpTransport {
+public class WebFluxSseServerTransportProvider implements McpServerTransportProvider {
- private static final Logger logger = LoggerFactory.getLogger(WebFluxSseServerTransport.class);
+ private static final Logger logger = LoggerFactory.getLogger(
+ WebFluxSseServerTransportProvider.class);
/**
* Event type for JSON-RPC messages sent through the SSE connection.
@@ -95,18 +98,6 @@ public class WebFluxSseServerTransport implements ServerMcpTransport {
*/
private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
- // FIXME: This is a bit clumsy. The McpAsyncServer handles global notifications
- // using the transport and we need access to child transports for each session to
- // use the sendMessage method. Ideally, the particular transport would be an
- // abstraction of a specialized session that can handle only notifications and we
- // could delegate to all child sessions without directly going through the transport.
- // The conversion from a notification to message happens both in McpAsyncServer
- // and in ServerMcpSession and it would be beneficial to have a unified interface
- // for both. An MCP server implementation can use both McpServerExchange and
- // Mcp(Sync|Async)Server to send notifications so the capability needs to lie in
- // both places.
- private final ConcurrentHashMap sessionTransports = new ConcurrentHashMap<>();
-
/**
* Flag indicating if the transport is shutting down.
*/
@@ -121,7 +112,7 @@ public class WebFluxSseServerTransport implements ServerMcpTransport {
* setup. Must not be null.
* @throws IllegalArgumentException if either parameter is null
*/
- public WebFluxSseServerTransport(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
+ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
@@ -145,7 +136,7 @@ public WebFluxSseServerTransport(ObjectMapper objectMapper, String messageEndpoi
* setup. Must not be null.
* @throws IllegalArgumentException if either parameter is null
*/
- public WebFluxSseServerTransport(ObjectMapper objectMapper, String messageEndpoint) {
+ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
}
@@ -167,12 +158,13 @@ public void setSessionFactory(ServerMcpSession.Factory sessionFactory) {
*
Attempts to send the event to all active sessions
*
Tracks and reports any delivery failures
*
- * @param message The JSON-RPC message to broadcast
+ * @param method The JSON-RPC method to send to clients
+ * @param params The method parameters to send to clients
* @return A Mono that completes when the message has been sent to all sessions, or
* errors if any session fails to receive the message
*/
@Override
- public Mono sendMessage(McpSchema.JSONRPCMessage message) {
+ public Mono notifyClients(String method, Map params) {
if (sessions.isEmpty()) {
logger.debug("No active sessions to broadcast message to");
return Mono.empty();
@@ -180,29 +172,15 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) {
logger.debug("Attempting to broadcast message to {} active sessions", sessions.size());
- return Flux.fromStream(sessionTransports.values().stream())
- .flatMap(session -> session.sendMessage(message)
- .doOnError(e -> logger.error("Failed to " + "send message to session {}: {}", session.sessionId,
+ return Flux.fromStream(sessions.values().stream())
+ .flatMap(session -> session.sendNotification(method, params)
+ .doOnError(e -> logger.error("Failed to " + "send message to session " +
+ "{}: {}", session.getId(),
e.getMessage()))
.onErrorComplete())
.then();
}
- /**
- * Converts data from one type to another using the configured ObjectMapper. This
- * method is primarily used for converting between different representations of
- * JSON-RPC message data.
- * @param The target type to convert to
- * @param data The source data to convert
- * @param typeRef Type reference describing the target type
- * @return The converted data
- * @throws IllegalArgumentException if the conversion fails
- */
- @Override
- public T unmarshalFrom(Object data, TypeReference typeRef) {
- return this.objectMapper.convertValue(data, typeRef);
- }
-
/**
* Initiates a graceful shutdown of the transport. This method ensures all active
* sessions are properly closed and cleaned up.
@@ -265,13 +243,13 @@ private Mono handleSseConnection(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.>create(sink -> {
- String sessionId = UUID.randomUUID().toString();
- logger.debug("Creating new SSE connection for session: {}", sessionId);
- WebFluxMcpSessionTransport
- sessionTransport = new WebFluxMcpSessionTransport(sessionId, sink);
+ WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);
+
+ ServerMcpSession session = sessionFactory.create(sessionTransport);
+ String sessionId = session.getId();
- sessions.put(sessionId, sessionFactory.create(sessionTransport));
- sessionTransports.put(sessionId, sessionTransport);
+ logger.debug("Created new SSE connection for session: {}", sessionId);
+ sessions.put(sessionId, session);
// Send initial endpoint event
logger.debug("Sending initial endpoint event to session: {}", sessionId);
@@ -328,14 +306,47 @@ private Mono handleMessage(ServerRequest request) {
});
}
- private class WebFluxMcpSessionTransport implements ServerMcpTransport.Child {
+ /*
+ Current:
+
+ framework layer:
+ var transport = new WebFluxSseServerTransport(objectMapper, "/mcp", "/sse");
+ McpServer.async(ServerMcpTransport transport)
+
+ client connects ->
+ WebFluxSseServerTransport creates a:
+ - var sessionTransport = WebFluxMcpSessionTransport
+ - ServerMcpSession(sessionId, sessionTransport)
+
+ WebFluxSseServerTransport IS_A ServerMcpTransport IS_A McpTransport
+ WebFluxMcpSessionTransport IS_A ServerMcpSessionTransport IS_A McpTransport
+
+ McpTransport contains connect() which should be removed
+ ClientMcpTransport should have connect()
+ ServerMcpTransport should have setSessionFactory()
+
+ Possible Future:
+ var transportProvider = new WebFluxSseServerTransport(objectMapper, "/mcp", "/sse");
+ WebFluxSseServerTransport IS_A ServerMcpTransportProvider ?
+ ServerMcpTransportProvider creates ServerMcpTransport
+
+ // disadvantage - too much breaks, e.g.
+ McpServer.async(ServerMcpTransportProvider transportProvider)
+
+ // advantage
+
+ ClientMcpTransport and ServerMcpTransport BOTH represent 1:1 relationship
+
+
+
+
+ */
- final String sessionId;
+ private class WebFluxMcpSessionTransport implements McpServerTransport {
private final FluxSink> sink;
- public WebFluxMcpSessionTransport(String sessionId, FluxSink> sink) {
- this.sessionId = sessionId;
+ public WebFluxMcpSessionTransport(FluxSink> sink) {
this.sink = sink;
}
@@ -363,7 +374,7 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) {
@Override
public T unmarshalFrom(Object data, TypeReference typeRef) {
- return WebFluxSseServerTransport.this.unmarshalFrom(data, typeRef);
+ return objectMapper.convertValue(data, typeRef);
}
@Override
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
index 4cd24c62..3df80db8 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java
@@ -16,7 +16,7 @@
import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServerFeatures;
-import io.modelcontextprotocol.server.transport.WebFluxSseServerTransport;
+import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.CallToolResult;
@@ -55,14 +55,14 @@ public class WebFluxSseIntegrationTests {
private DisposableServer httpServer;
- private WebFluxSseServerTransport mcpServerTransport;
+ private WebFluxSseServerTransportProvider mcpServerTransport;
ConcurrentHashMap clientBulders = new ConcurrentHashMap<>();
@BeforeEach
public void before() {
- this.mcpServerTransport = new WebFluxSseServerTransport(new ObjectMapper(), MESSAGE_ENDPOINT);
+ this.mcpServerTransport = new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
HttpHandler httpHandler = RouterFunctions.toHttpHandler(mcpServerTransport.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpAsyncServerTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpAsyncServerTests.java
index 1ed0d99b..34f4b689 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpAsyncServerTests.java
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpAsyncServerTests.java
@@ -5,7 +5,7 @@
package io.modelcontextprotocol.server;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.modelcontextprotocol.server.transport.WebFluxSseServerTransport;
+import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
import io.modelcontextprotocol.spec.ServerMcpTransport;
import org.junit.jupiter.api.Timeout;
import reactor.netty.DisposableServer;
@@ -16,7 +16,7 @@
import org.springframework.web.reactive.function.server.RouterFunctions;
/**
- * Tests for {@link McpAsyncServer} using {@link WebFluxSseServerTransport}.
+ * Tests for {@link McpAsyncServer} using {@link WebFluxSseServerTransportProvider}.
*
* @author Christian Tzolov
*/
@@ -31,7 +31,7 @@ class WebFluxSseMcpAsyncServerTests extends AbstractMcpAsyncServerTests {
@Override
protected ServerMcpTransport createMcpTransport() {
- var transport = new WebFluxSseServerTransport(new ObjectMapper(), MESSAGE_ENDPOINT);
+ var transport = new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
HttpHandler httpHandler = RouterFunctions.toHttpHandler(transport.getRouterFunction());
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpSyncServerTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpSyncServerTests.java
index 4db00dd4..2cf1087d 100644
--- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpSyncServerTests.java
+++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpSyncServerTests.java
@@ -5,7 +5,7 @@
package io.modelcontextprotocol.server;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.modelcontextprotocol.server.transport.WebFluxSseServerTransport;
+import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
import io.modelcontextprotocol.spec.ServerMcpTransport;
import org.junit.jupiter.api.Timeout;
import reactor.netty.DisposableServer;
@@ -16,7 +16,7 @@
import org.springframework.web.reactive.function.server.RouterFunctions;
/**
- * Tests for {@link McpSyncServer} using {@link WebFluxSseServerTransport}.
+ * Tests for {@link McpSyncServer} using {@link WebFluxSseServerTransportProvider}.
*
* @author Christian Tzolov
*/
@@ -29,11 +29,11 @@ class WebFluxSseMcpSyncServerTests extends AbstractMcpSyncServerTests {
private DisposableServer httpServer;
- private WebFluxSseServerTransport transport;
+ private WebFluxSseServerTransportProvider transport;
@Override
protected ServerMcpTransport createMcpTransport() {
- transport = new WebFluxSseServerTransport(new ObjectMapper(), MESSAGE_ENDPOINT);
+ transport = new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
return transport;
}
diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
index 4072e504..d565cb9e 100644
--- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
+++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
@@ -4,25 +4,29 @@
package io.modelcontextprotocol.server;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.DefaultMcpSession;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpServerTransportProvider;
import io.modelcontextprotocol.spec.ServerMcpSession;
-import io.modelcontextprotocol.spec.ServerMcpTransport;
import io.modelcontextprotocol.spec.McpSchema.CallToolResult;
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpSchema.Tool;
+import io.modelcontextprotocol.spec.ServerMcpTransport;
import io.modelcontextprotocol.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,125 +78,31 @@ public class McpAsyncServer {
private static final Logger logger = LoggerFactory.getLogger(McpAsyncServer.class);
- private final ServerMcpTransport transport;
-
- private final McpSchema.ServerCapabilities serverCapabilities;
-
- private final McpSchema.Implementation serverInfo;
-
- /**
- * Thread-safe list of tool handlers that can be modified at runtime.
- */
- private final CopyOnWriteArrayList tools = new CopyOnWriteArrayList<>();
-
- private final CopyOnWriteArrayList resourceTemplates = new CopyOnWriteArrayList<>();
+ private final McpAsyncServer delegate;
- private final ConcurrentHashMap resources = new ConcurrentHashMap<>();
-
- private final ConcurrentHashMap prompts = new ConcurrentHashMap<>();
-
- private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;
-
- /**
- * Supported protocol versions.
- */
- private List protocolVersions = List.of(McpSchema.LATEST_PROTOCOL_VERSION);
+ McpAsyncServer() {
+ this.delegate = null;
+ }
/**
* Create a new McpAsyncServer with the given transport and capabilities.
* @param mcpTransport The transport layer implementation for MCP communication.
* @param features The MCP server supported features.
*/
+ @Deprecated
McpAsyncServer(ServerMcpTransport mcpTransport, McpServerFeatures.Async features) {
- this.serverInfo = features.serverInfo();
- this.serverCapabilities = features.serverCapabilities();
- this.tools.addAll(features.tools());
- this.resources.putAll(features.resources());
- this.resourceTemplates.addAll(features.resourceTemplates());
- this.prompts.putAll(features.prompts());
-
- Map> requestHandlers = new HashMap<>();
-
- // Initialize request handlers for standard MCP methods
-
- // Ping MUST respond with an empty data, but not NULL response.
- requestHandlers.put(McpSchema.METHOD_PING, (exchange, params) -> Mono.just(""));
-
- // Add tools API handlers if the tool capability is enabled
- if (this.serverCapabilities.tools() != null) {
- requestHandlers.put(McpSchema.METHOD_TOOLS_LIST, toolsListRequestHandler());
- requestHandlers.put(McpSchema.METHOD_TOOLS_CALL, toolsCallRequestHandler());
- }
-
- // Add resources API handlers if provided
- if (this.serverCapabilities.resources() != null) {
- requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
- requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
- requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());
- }
-
- // Add prompts API handlers if provider exists
- if (this.serverCapabilities.prompts() != null) {
- requestHandlers.put(McpSchema.METHOD_PROMPT_LIST, promptsListRequestHandler());
- requestHandlers.put(McpSchema.METHOD_PROMPT_GET, promptsGetRequestHandler());
- }
-
- // Add logging API handlers if the logging capability is enabled
- if (this.serverCapabilities.logging() != null) {
- requestHandlers.put(McpSchema.METHOD_LOGGING_SET_LEVEL, setLoggerRequestHandler());
- }
-
- Map notificationHandlers = new HashMap<>();
-
- notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());
-
- List, Mono>> rootsChangeConsumers = features.rootsChangeConsumers();
-
- if (Utils.isEmpty(rootsChangeConsumers)) {
- rootsChangeConsumers = List.of((roots) -> Mono.fromRunnable(() -> logger
- .warn("Roots list changed notification, but no consumers provided. Roots list changed: {}", roots)));
- }
-
- notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED,
- asyncRootsListChangedNotificationHandler(rootsChangeConsumers));
-
- this.transport = mcpTransport;
- mcpTransport.setSessionFactory(transport -> new ServerMcpSession(transport,
- this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
+ this.delegate = new LegacyAsyncServer(mcpTransport, features);
}
- // ---------------------------------------
- // Lifecycle Management
- // ---------------------------------------
- private Mono asyncInitializeRequestHandler(
- ServerMcpSession.ClientInitConsumer initConsumer, McpSchema.InitializeRequest initializeRequest) {
- return Mono.defer(() -> {
-
-
- initConsumer.init(initializeRequest.capabilities(), initializeRequest.clientInfo());
-
- logger.info("Client initialize request - Protocol: {}, Capabilities: {}, Info: {}",
- initializeRequest.protocolVersion(), initializeRequest.capabilities(),
- initializeRequest.clientInfo());
-
- // The server MUST respond with the highest protocol version it supports if
- // it does not support the requested (e.g. Client) version.
- String serverProtocolVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);
-
- if (this.protocolVersions.contains(initializeRequest.protocolVersion())) {
- // If the server supports the requested protocol version, it MUST respond
- // with the same version.
- serverProtocolVersion = initializeRequest.protocolVersion();
- }
- else {
- logger.warn(
- "Client requested unsupported protocol version: {}, so the server will sugggest the {} version instead",
- initializeRequest.protocolVersion(), serverProtocolVersion);
- }
-
- return Mono.just(new McpSchema.InitializeResult(serverProtocolVersion, this.serverCapabilities,
- this.serverInfo, null));
- });
+ /**
+ * Create a new McpAsyncServer with the given transport and capabilities.
+ * @param mcpTransportProvider The transport layer implementation for MCP communication.
+ * @param features The MCP server supported features.
+ */
+ McpAsyncServer(McpServerTransportProvider mcpTransportProvider,
+ ObjectMapper objectMapper,
+ McpServerFeatures.Async features) {
+ this.delegate = new AsyncServerImpl(mcpTransportProvider, objectMapper, features);
}
/**
@@ -200,7 +110,7 @@ private Mono asyncInitializeRequestHandler(
* @return The server capabilities
*/
public McpSchema.ServerCapabilities getServerCapabilities() {
- return this.serverCapabilities;
+ return this.delegate.getServerCapabilities();
}
/**
@@ -208,25 +118,27 @@ public McpSchema.ServerCapabilities getServerCapabilities() {
* @return The server implementation details
*/
public McpSchema.Implementation getServerInfo() {
- return this.serverInfo;
+ return this.delegate.getServerInfo();
}
/**
* Get the client capabilities that define the supported features and functionality.
* @return The client capabilities
+ * @deprecated This will be removed in 0.9.0
*/
@Deprecated
public ClientCapabilities getClientCapabilities() {
- throw new IllegalStateException("This method is deprecated and should not be called");
+ return this.delegate.getClientCapabilities();
}
/**
* Get the client implementation information.
* @return The client implementation details
+ * @deprecated This will be removed in 0.9.0
*/
@Deprecated
public McpSchema.Implementation getClientInfo() {
- throw new IllegalStateException("This method is deprecated and should not be called");
+ return this.delegate.getClientInfo();
}
/**
@@ -234,47 +146,34 @@ public McpSchema.Implementation getClientInfo() {
* @return A Mono that completes when the server has been closed
*/
public Mono closeGracefully() {
- return this.transport.closeGracefully();
+ return this.delegate.closeGracefully();
}
/**
* Close the server immediately.
*/
public void close() {
- this.transport.close();
+ this.delegate.close();
}
- private static final TypeReference LIST_ROOTS_RESULT_TYPE_REF = new TypeReference<>() {
- };
-
/**
* Retrieves the list of all roots provided by the client.
* @return A Mono that emits the list of roots result.
*/
+ @Deprecated
public Mono listRoots() {
- return this.listRoots(null);
+ return this.delegate.listRoots(null);
}
/**
* Retrieves a paginated list of roots provided by the server.
* @param cursor Optional pagination cursor from a previous list request
* @return A Mono that emits the list of roots result containing
+ * @deprecated This will be removed in 0.9.0
*/
@Deprecated
public Mono listRoots(String cursor) {
- return Mono.error(new RuntimeException("Not implemented"));
- }
-
- private ServerMcpSession.NotificationHandler asyncRootsListChangedNotificationHandler(
- List, Mono>> rootsChangeConsumers) {
- return (exchange,
- params) -> listRoots().flatMap(listRootsResult -> Flux.fromIterable(rootsChangeConsumers)
- .flatMap(consumer -> consumer.apply(listRootsResult.roots()))
- .onErrorResume(error -> {
- logger.error("Error handling roots list change notification", error);
- return Mono.empty();
- })
- .then());
+ return this.delegate.listRoots(cursor);
}
// ---------------------------------------
@@ -287,34 +186,7 @@ private ServerMcpSession.NotificationHandler asyncRootsListChangedNotificationHa
* @return Mono that completes when clients have been notified of the change
*/
public Mono addTool(McpServerFeatures.AsyncToolRegistration toolRegistration) {
- if (toolRegistration == null) {
- return Mono.error(new McpError("Tool registration must not be null"));
- }
- if (toolRegistration.tool() == null) {
- return Mono.error(new McpError("Tool must not be null"));
- }
- if (toolRegistration.call() == null) {
- return Mono.error(new McpError("Tool call handler must not be null"));
- }
- if (this.serverCapabilities.tools() == null) {
- return Mono.error(new McpError("Server must be configured with tool capabilities"));
- }
-
- return Mono.defer(() -> {
- // Check for duplicate tool names
- if (this.tools.stream().anyMatch(th -> th.tool().name().equals(toolRegistration.tool().name()))) {
- return Mono
- .error(new McpError("Tool with name '" + toolRegistration.tool().name() + "' already exists"));
- }
-
- this.tools.add(toolRegistration);
- logger.debug("Added tool handler: {}", toolRegistration.tool().name());
-
- if (this.serverCapabilities.tools().listChanged()) {
- return notifyToolsListChanged();
- }
- return Mono.empty();
- });
+ return this.delegate.addTool(toolRegistration);
}
/**
@@ -323,24 +195,7 @@ public Mono addTool(McpServerFeatures.AsyncToolRegistration toolRegistrati
* @return Mono that completes when clients have been notified of the change
*/
public Mono removeTool(String toolName) {
- if (toolName == null) {
- return Mono.error(new McpError("Tool name must not be null"));
- }
- if (this.serverCapabilities.tools() == null) {
- return Mono.error(new McpError("Server must be configured with tool capabilities"));
- }
-
- return Mono.defer(() -> {
- boolean removed = this.tools.removeIf(toolRegistration -> toolRegistration.tool().name().equals(toolName));
- if (removed) {
- logger.debug("Removed tool handler: {}", toolName);
- if (this.serverCapabilities.tools().listChanged()) {
- return notifyToolsListChanged();
- }
- return Mono.empty();
- }
- return Mono.error(new McpError("Tool with name '" + toolName + "' not found"));
- });
+ return this.delegate.removeTool(toolName);
}
/**
@@ -348,36 +203,7 @@ public Mono removeTool(String toolName) {
* @return A Mono that completes when all clients have been notified
*/
public Mono notifyToolsListChanged() {
- McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
- McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED, null);
- return this.transport.sendMessage(jsonrpcNotification);
- }
-
- private ServerMcpSession.RequestHandler toolsListRequestHandler() {
- return (exchange, params) -> {
- List tools = this.tools.stream().map(McpServerFeatures.AsyncToolRegistration::tool).toList();
-
- return Mono.just(new McpSchema.ListToolsResult(tools, null));
- };
- }
-
- private ServerMcpSession.RequestHandler toolsCallRequestHandler() {
- return (exchange, params) -> {
- McpSchema.CallToolRequest callToolRequest = transport.unmarshalFrom(params,
- new TypeReference() {
- });
-
- Optional toolRegistration = this.tools.stream()
- .filter(tr -> callToolRequest.name().equals(tr.tool().name()))
- .findAny();
-
- if (toolRegistration.isEmpty()) {
- return Mono.error(new McpError("Tool not found: " + callToolRequest.name()));
- }
-
- return toolRegistration.map(tool -> tool.call().apply(callToolRequest.arguments()))
- .orElse(Mono.error(new McpError("Tool not found: " + callToolRequest.name())));
- };
+ return this.delegate.notifyToolsListChanged();
}
// ---------------------------------------
@@ -390,25 +216,7 @@ private ServerMcpSession.RequestHandler toolsCallRequestHandler(
* @return Mono that completes when clients have been notified of the change
*/
public Mono addResource(McpServerFeatures.AsyncResourceRegistration resourceHandler) {
- if (resourceHandler == null || resourceHandler.resource() == null) {
- return Mono.error(new McpError("Resource must not be null"));
- }
-
- if (this.serverCapabilities.resources() == null) {
- return Mono.error(new McpError("Server must be configured with resource capabilities"));
- }
-
- return Mono.defer(() -> {
- if (this.resources.putIfAbsent(resourceHandler.resource().uri(), resourceHandler) != null) {
- return Mono
- .error(new McpError("Resource with URI '" + resourceHandler.resource().uri() + "' already exists"));
- }
- logger.debug("Added resource handler: {}", resourceHandler.resource().uri());
- if (this.serverCapabilities.resources().listChanged()) {
- return notifyResourcesListChanged();
- }
- return Mono.empty();
- });
+ return this.delegate.addResource(resourceHandler);
}
/**
@@ -417,24 +225,7 @@ public Mono addResource(McpServerFeatures.AsyncResourceRegistration resour
* @return Mono that completes when clients have been notified of the change
*/
public Mono removeResource(String resourceUri) {
- if (resourceUri == null) {
- return Mono.error(new McpError("Resource URI must not be null"));
- }
- if (this.serverCapabilities.resources() == null) {
- return Mono.error(new McpError("Server must be configured with resource capabilities"));
- }
-
- return Mono.defer(() -> {
- McpServerFeatures.AsyncResourceRegistration removed = this.resources.remove(resourceUri);
- if (removed != null) {
- logger.debug("Removed resource handler: {}", resourceUri);
- if (this.serverCapabilities.resources().listChanged()) {
- return notifyResourcesListChanged();
- }
- return Mono.empty();
- }
- return Mono.error(new McpError("Resource with URI '" + resourceUri + "' not found"));
- });
+ return this.delegate.removeResource(resourceUri);
}
/**
@@ -442,38 +233,7 @@ public Mono removeResource(String resourceUri) {
* @return A Mono that completes when all clients have been notified
*/
public Mono notifyResourcesListChanged() {
- McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
- McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, null);
- return this.transport.sendMessage(jsonrpcNotification);
- }
-
- private ServerMcpSession.RequestHandler resourcesListRequestHandler() {
- return (exchange, params) -> {
- var resourceList = this.resources.values()
- .stream()
- .map(McpServerFeatures.AsyncResourceRegistration::resource)
- .toList();
- return Mono.just(new McpSchema.ListResourcesResult(resourceList, null));
- };
- }
-
- private ServerMcpSession.RequestHandler resourceTemplateListRequestHandler() {
- return (exchange, params) -> Mono.just(new McpSchema.ListResourceTemplatesResult(this.resourceTemplates, null));
-
- }
-
- private ServerMcpSession.RequestHandler resourcesReadRequestHandler() {
- return (exchange, params) -> {
- McpSchema.ReadResourceRequest resourceRequest = transport.unmarshalFrom(params,
- new TypeReference() {
- });
- var resourceUri = resourceRequest.uri();
- McpServerFeatures.AsyncResourceRegistration registration = this.resources.get(resourceUri);
- if (registration != null) {
- return registration.readHandler().apply(resourceRequest);
- }
- return Mono.error(new McpError("Resource not found: " + resourceUri));
- };
+ return this.delegate.notifyResourcesListChanged();
}
// ---------------------------------------
@@ -486,31 +246,7 @@ private ServerMcpSession.RequestHandler resourcesR
* @return Mono that completes when clients have been notified of the change
*/
public Mono addPrompt(McpServerFeatures.AsyncPromptRegistration promptRegistration) {
- if (promptRegistration == null) {
- return Mono.error(new McpError("Prompt registration must not be null"));
- }
- if (this.serverCapabilities.prompts() == null) {
- return Mono.error(new McpError("Server must be configured with prompt capabilities"));
- }
-
- return Mono.defer(() -> {
- McpServerFeatures.AsyncPromptRegistration registration = this.prompts
- .putIfAbsent(promptRegistration.prompt().name(), promptRegistration);
- if (registration != null) {
- return Mono.error(
- new McpError("Prompt with name '" + promptRegistration.prompt().name() + "' already exists"));
- }
-
- logger.debug("Added prompt handler: {}", promptRegistration.prompt().name());
-
- // Servers that declared the listChanged capability SHOULD send a
- // notification,
- // when the list of available prompts changes
- if (this.serverCapabilities.prompts().listChanged()) {
- return notifyPromptsListChanged();
- }
- return Mono.empty();
- });
+ return this.delegate.addPrompt(promptRegistration);
}
/**
@@ -519,27 +255,7 @@ public Mono addPrompt(McpServerFeatures.AsyncPromptRegistration promptRegi
* @return Mono that completes when clients have been notified of the change
*/
public Mono removePrompt(String promptName) {
- if (promptName == null) {
- return Mono.error(new McpError("Prompt name must not be null"));
- }
- if (this.serverCapabilities.prompts() == null) {
- return Mono.error(new McpError("Server must be configured with prompt capabilities"));
- }
-
- return Mono.defer(() -> {
- McpServerFeatures.AsyncPromptRegistration removed = this.prompts.remove(promptName);
-
- if (removed != null) {
- logger.debug("Removed prompt handler: {}", promptName);
- // Servers that declared the listChanged capability SHOULD send a
- // notification, when the list of available prompts changes
- if (this.serverCapabilities.prompts().listChanged()) {
- return this.notifyPromptsListChanged();
- }
- return Mono.empty();
- }
- return Mono.error(new McpError("Prompt with name '" + promptName + "' not found"));
- });
+ return this.delegate.removePrompt(promptName);
}
/**
@@ -547,41 +263,7 @@ public Mono removePrompt(String promptName) {
* @return A Mono that completes when all clients have been notified
*/
public Mono notifyPromptsListChanged() {
- McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
- McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED, null);
- return this.transport.sendMessage(jsonrpcNotification);
- }
-
- private ServerMcpSession.RequestHandler promptsListRequestHandler() {
- return (exchange, params) -> {
- // TODO: Implement pagination
- // McpSchema.PaginatedRequest request = transport.unmarshalFrom(params,
- // new TypeReference() {
- // });
-
- var promptList = this.prompts.values()
- .stream()
- .map(McpServerFeatures.AsyncPromptRegistration::prompt)
- .toList();
-
- return Mono.just(new McpSchema.ListPromptsResult(promptList, null));
- };
- }
-
- private ServerMcpSession.RequestHandler promptsGetRequestHandler() {
- return (exchange, params) -> {
- McpSchema.GetPromptRequest promptRequest = transport.unmarshalFrom(params,
- new TypeReference() {
- });
-
- // Implement prompt retrieval logic here
- McpServerFeatures.AsyncPromptRegistration registration = this.prompts.get(promptRequest.name());
- if (registration == null) {
- return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
- }
-
- return registration.promptHandler().apply(promptRequest);
- };
+ return this.delegate.notifyPromptsListChanged();
}
// ---------------------------------------
@@ -595,43 +277,12 @@ private ServerMcpSession.RequestHandler promptsGetReq
* @return A Mono that completes when the notification has been sent
*/
public Mono loggingNotification(LoggingMessageNotification loggingMessageNotification) {
-
- if (loggingMessageNotification == null) {
- return Mono.error(new McpError("Logging message must not be null"));
- }
-
- Map params = this.transport.unmarshalFrom(loggingMessageNotification,
- new TypeReference