Skip to content

Commit 518b6fb

Browse files
FH-30francishodianto30
authored and
francishodianto30
committed
fix: propagate reactor context up till transport
1 parent f348a83 commit 518b6fb

File tree

2 files changed

+47
-46
lines changed

2 files changed

+47
-46
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public class McpAsyncClient {
154154
* @param features the MCP Client supported features.
155155
*/
156156
McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
157-
McpClientFeatures.Async features) {
157+
McpClientFeatures.Async features) {
158158

159159
Assert.notNull(transport, "Transport must not be null");
160160
Assert.notNull(requestTimeout, "Request timeout must not be null");
@@ -189,7 +189,7 @@ public class McpAsyncClient {
189189
// Tools Change Notification
190190
List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumersFinal = new ArrayList<>();
191191
toolsChangeConsumersFinal
192-
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Tools changed: {}", notification)));
192+
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Tools changed: {}", notification)));
193193

194194
if (!Utils.isEmpty(features.toolsChangeConsumers())) {
195195
toolsChangeConsumersFinal.addAll(features.toolsChangeConsumers());
@@ -200,7 +200,7 @@ public class McpAsyncClient {
200200
// Resources Change Notification
201201
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumersFinal = new ArrayList<>();
202202
resourcesChangeConsumersFinal
203-
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources changed: {}", notification)));
203+
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources changed: {}", notification)));
204204

205205
if (!Utils.isEmpty(features.resourcesChangeConsumers())) {
206206
resourcesChangeConsumersFinal.addAll(features.resourcesChangeConsumers());
@@ -212,7 +212,7 @@ public class McpAsyncClient {
212212
// Prompts Change Notification
213213
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumersFinal = new ArrayList<>();
214214
promptsChangeConsumersFinal
215-
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Prompts changed: {}", notification)));
215+
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Prompts changed: {}", notification)));
216216
if (!Utils.isEmpty(features.promptsChangeConsumers())) {
217217
promptsChangeConsumersFinal.addAll(features.promptsChangeConsumers());
218218
}
@@ -355,12 +355,12 @@ public Mono<McpSchema.InitializeResult> initialize() {
355355
* @return A Mono that completes with the result of the operation
356356
*/
357357
private <T> Mono<T> withInitializationCheck(String actionName,
358-
Function<McpSchema.InitializeResult, Mono<T>> operation) {
358+
Function<McpSchema.InitializeResult, Mono<T>> operation) {
359359
return this.initializedSink.asMono()
360-
.timeout(this.initializationTimeout)
361-
.onErrorResume(TimeoutException.class,
362-
ex -> Mono.error(new McpError("Client must be initialized before " + actionName)))
363-
.flatMap(operation);
360+
.timeout(this.initializationTimeout)
361+
.onErrorResume(TimeoutException.class,
362+
ex -> Mono.error(new McpError("Client must be initialized before " + actionName)))
363+
.flatMap(operation);
364364
}
365365

366366
// --------------------------
@@ -373,8 +373,8 @@ private <T> Mono<T> withInitializationCheck(String actionName,
373373
*/
374374
public Mono<Object> ping() {
375375
return this.withInitializationCheck("pinging the server", initializedResult -> this.mcpSession
376-
.sendRequest(McpSchema.METHOD_PING, null, new TypeReference<Object>() {
377-
}));
376+
.sendRequest(McpSchema.METHOD_PING, null, new TypeReference<Object>() {
377+
}));
378378
}
379379

380380
// --------------------------
@@ -540,13 +540,13 @@ private NotificationHandler asyncToolsChangeNotificationHandler(
540540
List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers) {
541541
// TODO: params are not used yet
542542
return params -> this.listTools()
543-
.flatMap(listToolsResult -> Flux.fromIterable(toolsChangeConsumers)
544-
.flatMap(consumer -> consumer.apply(listToolsResult.tools()))
545-
.onErrorResume(error -> {
546-
logger.error("Error handling tools list change notification", error);
547-
return Mono.empty();
548-
})
549-
.then());
543+
.flatMap(listToolsResult -> Flux.fromIterable(toolsChangeConsumers)
544+
.flatMap(consumer -> consumer.apply(listToolsResult.tools()))
545+
.onErrorResume(error -> {
546+
logger.error("Error handling tools list change notification", error);
547+
return Mono.empty();
548+
})
549+
.then());
550550
}
551551

552552
// --------------------------
@@ -664,7 +664,7 @@ public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String
664664
*/
665665
public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
666666
return this.withInitializationCheck("subscribing to resources", initializedResult -> this.mcpSession
667-
.sendRequest(McpSchema.METHOD_RESOURCES_SUBSCRIBE, subscribeRequest, VOID_TYPE_REFERENCE));
667+
.sendRequest(McpSchema.METHOD_RESOURCES_SUBSCRIBE, subscribeRequest, VOID_TYPE_REFERENCE));
668668
}
669669

670670
/**
@@ -678,18 +678,18 @@ public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest)
678678
*/
679679
public Mono<Void> unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
680680
return this.withInitializationCheck("unsubscribing from resources", initializedResult -> this.mcpSession
681-
.sendRequest(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, unsubscribeRequest, VOID_TYPE_REFERENCE));
681+
.sendRequest(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, unsubscribeRequest, VOID_TYPE_REFERENCE));
682682
}
683683

684684
private NotificationHandler asyncResourcesChangeNotificationHandler(
685685
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers) {
686686
return params -> listResources().flatMap(listResourcesResult -> Flux.fromIterable(resourcesChangeConsumers)
687-
.flatMap(consumer -> consumer.apply(listResourcesResult.resources()))
688-
.onErrorResume(error -> {
689-
logger.error("Error handling resources list change notification", error);
690-
return Mono.empty();
691-
})
692-
.then());
687+
.flatMap(consumer -> consumer.apply(listResourcesResult.resources()))
688+
.onErrorResume(error -> {
689+
logger.error("Error handling resources list change notification", error);
690+
return Mono.empty();
691+
})
692+
.then());
693693
}
694694

695695
// --------------------------
@@ -720,7 +720,7 @@ public Mono<ListPromptsResult> listPrompts() {
720720
*/
721721
public Mono<ListPromptsResult> listPrompts(String cursor) {
722722
return this.withInitializationCheck("listing prompts", initializedResult -> this.mcpSession
723-
.sendRequest(McpSchema.METHOD_PROMPT_LIST, new PaginatedRequest(cursor), LIST_PROMPTS_RESULT_TYPE_REF));
723+
.sendRequest(McpSchema.METHOD_PROMPT_LIST, new PaginatedRequest(cursor), LIST_PROMPTS_RESULT_TYPE_REF));
724724
}
725725

726726
/**
@@ -734,18 +734,18 @@ public Mono<ListPromptsResult> listPrompts(String cursor) {
734734
*/
735735
public Mono<GetPromptResult> getPrompt(GetPromptRequest getPromptRequest) {
736736
return this.withInitializationCheck("getting prompts", initializedResult -> this.mcpSession
737-
.sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF));
737+
.sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF));
738738
}
739739

740740
private NotificationHandler asyncPromptsChangeNotificationHandler(
741741
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers) {
742742
return params -> listPrompts().flatMap(listPromptsResult -> Flux.fromIterable(promptsChangeConsumers)
743-
.flatMap(consumer -> consumer.apply(listPromptsResult.prompts()))
744-
.onErrorResume(error -> {
745-
logger.error("Error handling prompts list change notification", error);
746-
return Mono.empty();
747-
})
748-
.then());
743+
.flatMap(consumer -> consumer.apply(listPromptsResult.prompts()))
744+
.onErrorResume(error -> {
745+
logger.error("Error handling prompts list change notification", error);
746+
return Mono.empty();
747+
})
748+
.then());
749749
}
750750

751751
// --------------------------
@@ -768,8 +768,8 @@ private NotificationHandler asyncLoggingNotificationHandler(
768768
});
769769

770770
return Flux.fromIterable(loggingConsumers)
771-
.flatMap(consumer -> consumer.apply(loggingMessageNotification))
772-
.then();
771+
.flatMap(consumer -> consumer.apply(loggingMessageNotification))
772+
.then();
773773
};
774774
}
775775

@@ -801,4 +801,4 @@ void setProtocolVersions(List<String> protocolVersions) {
801801
this.protocolVersions = protocolVersions;
802802
}
803803

804-
}
804+
}

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -225,18 +225,19 @@ private String generateRequestId() {
225225
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
226226
String requestId = this.generateRequestId();
227227

228-
return Mono.<McpSchema.JSONRPCResponse>create(sink -> {
228+
return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(sink -> {
229229
this.pendingResponses.put(requestId, sink);
230230
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
231231
requestId, requestParams);
232232
this.transport.sendMessage(jsonrpcRequest)
233-
// TODO: It's most efficient to create a dedicated Subscriber here
234-
.subscribe(v -> {
235-
}, error -> {
236-
this.pendingResponses.remove(requestId);
237-
sink.error(error);
238-
});
239-
}).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
233+
.contextWrite(ctx)
234+
// TODO: It's most efficient to create a dedicated Subscriber here
235+
.subscribe(v -> {
236+
}, error -> {
237+
this.pendingResponses.remove(requestId);
238+
sink.error(error);
239+
});
240+
})).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
240241
if (jsonRpcResponse.error() != null) {
241242
sink.error(new McpError(jsonRpcResponse.error()));
242243
}
@@ -285,4 +286,4 @@ public void close() {
285286
transport.close();
286287
}
287288

288-
}
289+
}

0 commit comments

Comments
 (0)