Skip to content

Commit 28594fb

Browse files
committed
Optimize nested streams
1 parent 1bee8d4 commit 28594fb

File tree

1 file changed

+8
-7
lines changed

1 file changed

+8
-7
lines changed

mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpSession.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import reactor.core.Disposable;
1818
import reactor.core.publisher.Mono;
1919
import reactor.core.publisher.MonoSink;
20+
import reactor.core.scheduler.Schedulers;
2021

2122
/**
2223
* Default implementation of the MCP (Model Context Protocol) session that manages
@@ -135,13 +136,13 @@ public DefaultMcpSession(Duration requestTimeout, McpTransport transport,
135136
}
136137
else if (message instanceof McpSchema.JSONRPCRequest request) {
137138
logger.debug("Received request: {}", request);
138-
handleIncomingRequest(request).subscribe(response -> transport.sendMessage(response).subscribe(),
139-
error -> {
140-
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(),
141-
null, new McpSchema.JSONRPCResponse.JSONRPCError(
142-
McpSchema.ErrorCodes.INTERNAL_ERROR, error.getMessage(), null));
143-
transport.sendMessage(errorResponse).subscribe();
144-
});
139+
handleIncomingRequest(request).flatMap(transport::sendMessage).onErrorResume(error -> {
140+
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null,
141+
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
142+
error.getMessage(), null));
143+
return transport.sendMessage(errorResponse);
144+
}).subscribe();
145+
145146
}
146147
else if (message instanceof McpSchema.JSONRPCNotification notification) {
147148
logger.debug("Received notification: {}", notification);

0 commit comments

Comments
 (0)