diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index 033139ad..18ec06c6 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -446,18 +446,9 @@ void testNotificationHandlers() { resources -> Mono.fromRunnable(() -> resourcesNotificationReceived.set(true))) .promptsChangeConsumer(prompts -> Mono.fromRunnable(() -> promptsNotificationReceived.set(true))), mcpAsyncClient -> { - - var transport = createMcpTransport(); - var client = McpClient.async(transport) - .requestTimeout(getRequestTimeout()) - .toolsChangeConsumer(tools -> Mono.fromRunnable(() -> toolsNotificationReceived.set(true))) - .resourcesChangeConsumer( - resources -> Mono.fromRunnable(() -> resourcesNotificationReceived.set(true))) - .promptsChangeConsumer( - prompts -> Mono.fromRunnable(() -> promptsNotificationReceived.set(true))) - .build(); - - StepVerifier.create(client.initialize()).expectNextMatches(Objects::nonNull).verifyComplete(); + StepVerifier.create(mcpAsyncClient.initialize()) + .expectNextMatches(Objects::nonNull) + .verifyComplete(); }); } diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java index 032f8684..191de23b 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java @@ -119,15 +119,20 @@ void verifyNotificationTimesOut(Consumer operation, String ac }, action); } - void verifyCallTimesOut(Function operation, String action) { + void verifyCallTimesOut(Function blockingOperation, String action) { withClient(createMcpTransport(), mcpSyncClient -> { // This scheduler is not replaced by virtual time scheduler Scheduler customScheduler = Schedulers.newBoundedElastic(1, 1, "actualBoundedElastic"); - StepVerifier.withVirtualTime(() -> Mono.fromSupplier(() -> operation.apply(mcpSyncClient)) - // offload the blocking call to the real scheduler + StepVerifier.withVirtualTime(() -> Mono.fromSupplier(() -> blockingOperation.apply(mcpSyncClient)) + // Offload the blocking call to the real scheduler .subscribeOn(customScheduler)) .expectSubscription() + // This works without actually waiting but executes all the + // tasks pending execution on the VirtualTimeScheduler. + // It is possible to execute the blocking code from the operation + // because it is blocked on a dedicated Scheduler and the main + // flow is not blocked and uses the VirtualTimeScheduler. .thenAwait(getInitializationTimeout()) .consumeErrorWith(e -> assertThat(e).isInstanceOf(McpError.class) .hasMessage("Client must be initialized before " + action)) diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index 72038854..06a231ed 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -402,18 +402,9 @@ void testNotificationHandlers() { resources -> Mono.fromRunnable(() -> resourcesNotificationReceived.set(true))) .promptsChangeConsumer(prompts -> Mono.fromRunnable(() -> promptsNotificationReceived.set(true))), mcpAsyncClient -> { - - var transport = createMcpTransport(); - var client = McpClient.async(transport) - .requestTimeout(getRequestTimeout()) - .toolsChangeConsumer(tools -> Mono.fromRunnable(() -> toolsNotificationReceived.set(true))) - .resourcesChangeConsumer( - resources -> Mono.fromRunnable(() -> resourcesNotificationReceived.set(true))) - .promptsChangeConsumer( - prompts -> Mono.fromRunnable(() -> promptsNotificationReceived.set(true))) - .build(); - - StepVerifier.create(client.initialize()).expectNextMatches(Objects::nonNull).verifyComplete(); + StepVerifier.create(mcpAsyncClient.initialize()) + .expectNextMatches(Objects::nonNull) + .verifyComplete(); }); } diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java index 1c042bf2..f4d8dbdb 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java @@ -120,15 +120,20 @@ void verifyNotificationTimesOut(Consumer operation, String ac }, action); } - void verifyCallTimesOut(Function operation, String action) { + void verifyCallTimesOut(Function blockingOperation, String action) { withClient(createMcpTransport(), mcpSyncClient -> { // This scheduler is not replaced by virtual time scheduler Scheduler customScheduler = Schedulers.newBoundedElastic(1, 1, "actualBoundedElastic"); - StepVerifier.withVirtualTime(() -> Mono.fromSupplier(() -> operation.apply(mcpSyncClient)) - // offload the blocking call to the real scheduler + StepVerifier.withVirtualTime(() -> Mono.fromSupplier(() -> blockingOperation.apply(mcpSyncClient)) + // Offload the blocking call to the real scheduler .subscribeOn(customScheduler)) .expectSubscription() + // This works without actually waiting but executes all the + // tasks pending execution on the VirtualTimeScheduler. + // It is possible to execute the blocking code from the operation + // because it is blocked on a dedicated Scheduler and the main + // flow is not blocked and uses the VirtualTimeScheduler. .thenAwait(getInitializationTimeout()) .consumeErrorWith(e -> assertThat(e).isInstanceOf(McpError.class) .hasMessage("Client must be initialized before " + action))