@@ -108,7 +108,7 @@ public interface NotificationHandler {
108
108
public McpClientSession (Duration requestTimeout , McpClientTransport transport ,
109
109
Map <String , RequestHandler <?>> requestHandlers , Map <String , NotificationHandler > notificationHandlers ) {
110
110
111
- Assert .notNull (requestTimeout , "The requstTimeout can not be null" );
111
+ Assert .notNull (requestTimeout , "The requestTimeout can not be null" );
112
112
Assert .notNull (transport , "The transport can not be null" );
113
113
Assert .notNull (requestHandlers , "The requestHandlers can not be null" );
114
114
Assert .notNull (notificationHandlers , "The notificationHandlers can not be null" );
@@ -123,33 +123,41 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
123
123
// Observation associated with the individual message - it can be used to
124
124
// create child Observation and emit it together with the message to the
125
125
// consumer
126
- this .connection = this .transport .connect (mono -> mono .doOnNext (message -> {
126
+ this .connection = this .transport .connect (mono -> mono .doOnNext ((msg ) -> handle (msg ).subscribe ())).subscribe ();
127
+ }
128
+
129
+ public Mono <Void > handle (McpSchema .JSONRPCMessage message ) {
130
+ return Mono .defer (() -> {
127
131
if (message instanceof McpSchema .JSONRPCResponse response ) {
128
132
logger .debug ("Received Response: {}" , response );
129
133
var sink = pendingResponses .remove (response .id ());
130
134
if (sink == null ) {
131
- logger .warn ("Unexpected response for unkown id {}" , response .id ());
135
+ logger .warn ("Unexpected response for unknown id {}" , response .id ());
132
136
}
133
137
else {
134
138
sink .success (response );
135
139
}
140
+ return Mono .empty ();
136
141
}
137
142
else if (message instanceof McpSchema .JSONRPCRequest request ) {
138
143
logger .debug ("Received request: {}" , request );
139
- handleIncomingRequest (request ). flatMap ( transport :: sendMessage ).onErrorResume (error -> {
144
+ return handleIncomingRequest (request ).onErrorResume (error -> {
140
145
var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
141
146
new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
142
147
error .getMessage (), null ));
143
- return transport .sendMessage (errorResponse );
144
- }).subscribe ();
145
-
148
+ return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
149
+ }).flatMap (this .transport ::sendMessage );
146
150
}
147
151
else if (message instanceof McpSchema .JSONRPCNotification notification ) {
148
152
logger .debug ("Received notification: {}" , notification );
149
- handleIncomingNotification (notification ). subscribe ( null ,
150
- error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
153
+ return handleIncomingNotification (notification )
154
+ . doOnError ( error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
151
155
}
152
- })).subscribe ();
156
+ else {
157
+ logger .warn ("Received unknown message type: {}" , message );
158
+ return Mono .empty ();
159
+ }
160
+ });
153
161
}
154
162
155
163
/**
0 commit comments