@@ -62,8 +62,6 @@ public class McpClientSession implements McpSession {
62
62
/** Atomic counter for generating unique request IDs */
63
63
private final AtomicLong requestCounter = new AtomicLong (0 );
64
64
65
- private final Disposable connection ;
66
-
67
65
/**
68
66
* Functional interface for handling incoming JSON-RPC requests. Implementations
69
67
* should process the request parameters and return a response.
@@ -108,7 +106,7 @@ public interface NotificationHandler {
108
106
public McpClientSession (Duration requestTimeout , McpClientTransport transport ,
109
107
Map <String , RequestHandler <?>> requestHandlers , Map <String , NotificationHandler > notificationHandlers ) {
110
108
111
- Assert .notNull (requestTimeout , "The requstTimeout can not be null" );
109
+ Assert .notNull (requestTimeout , "The requestTimeout can not be null" );
112
110
Assert .notNull (transport , "The transport can not be null" );
113
111
Assert .notNull (requestHandlers , "The requestHandlers can not be null" );
114
112
Assert .notNull (notificationHandlers , "The notificationHandlers can not be null" );
@@ -123,33 +121,41 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
123
121
// Observation associated with the individual message - it can be used to
124
122
// create child Observation and emit it together with the message to the
125
123
// consumer
126
- this .connection = this .transport .connect (mono -> mono .doOnNext (message -> {
124
+ this .transport .connect (mono -> mono .doOnNext ((msg ) -> handle (msg ).subscribe ()));
125
+ }
126
+
127
+ public Mono <Void > handle (McpSchema .JSONRPCMessage message ) {
128
+ return Mono .defer (() -> {
127
129
if (message instanceof McpSchema .JSONRPCResponse response ) {
128
130
logger .debug ("Received Response: {}" , response );
129
131
var sink = pendingResponses .remove (response .id ());
130
132
if (sink == null ) {
131
- logger .warn ("Unexpected response for unkown id {}" , response .id ());
133
+ logger .warn ("Unexpected response for unknown id {}" , response .id ());
132
134
}
133
135
else {
134
136
sink .success (response );
135
137
}
138
+ return Mono .empty ();
136
139
}
137
140
else if (message instanceof McpSchema .JSONRPCRequest request ) {
138
141
logger .debug ("Received request: {}" , request );
139
- handleIncomingRequest (request ). flatMap ( transport :: sendMessage ).onErrorResume (error -> {
142
+ return handleIncomingRequest (request ).onErrorResume (error -> {
140
143
var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
141
144
new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
142
145
error .getMessage (), null ));
143
- return transport .sendMessage (errorResponse );
144
- }).subscribe ();
145
-
146
+ return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
147
+ }).flatMap (this .transport ::sendMessage );
146
148
}
147
149
else if (message instanceof McpSchema .JSONRPCNotification notification ) {
148
150
logger .debug ("Received notification: {}" , notification );
149
- handleIncomingNotification (notification ). subscribe ( null ,
150
- error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
151
+ return handleIncomingNotification (notification )
152
+ . doOnError ( error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
151
153
}
152
- })).subscribe ();
154
+ else {
155
+ logger .warn ("Received unknown message type: {}" , message );
156
+ return Mono .empty ();
157
+ }
158
+ });
153
159
}
154
160
155
161
/**
@@ -271,18 +277,14 @@ public Mono<Void> sendNotification(String method, Map<String, Object> params) {
271
277
*/
272
278
@ Override
273
279
public Mono <Void > closeGracefully () {
274
- return Mono .defer (() -> {
275
- this .connection .dispose ();
276
- return transport .closeGracefully ();
277
- });
280
+ return Mono .defer (transport ::closeGracefully );
278
281
}
279
282
280
283
/**
281
284
* Closes the session immediately, potentially interrupting pending operations.
282
285
*/
283
286
@ Override
284
287
public void close () {
285
- this .connection .dispose ();
286
288
transport .close ();
287
289
}
288
290
0 commit comments