diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index 03c1d873944..ebc1da0c6f3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -55,6 +55,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** * The base {@link AbstractMessageHandler} implementation for the {@link MessageProducer}. @@ -362,7 +363,9 @@ private void asyncNonReactiveReply(Message requestMessage, Object reply, @Nul else { reactiveReply = Mono.from((Publisher) reply); } - reactiveReply.subscribe(settableListenableFuture::set, settableListenableFuture::setException); + reactiveReply + .publishOn(Schedulers.boundedElastic()) + .subscribe(settableListenableFuture::set, settableListenableFuture::setException); future = settableListenableFuture; } future.addCallback(new ReplyFutureCallback(requestMessage, replyChannel)); diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java index f198d21f333..99c1713c908 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.rsocket.dsl; +import static org.assertj.core.api.Assertions.assertThat; + import java.time.Duration; import java.util.function.Function; @@ -28,6 +30,7 @@ import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.context.IntegrationFlowContext; import org.springframework.integration.rsocket.ClientRSocketConnector; import org.springframework.integration.rsocket.RSocketInteractionModel; import org.springframework.integration.rsocket.ServerRSocketConnector; @@ -74,6 +77,39 @@ void testRsocketUpperCaseWholeFlows() { .verifyComplete(); } + @Autowired + IntegrationFlowContext integrationFlowContext; + + @Autowired + ClientRSocketConnector clientRSocketConnector; + + @Test + void testNoBlockingForReactiveThreads() { + IntegrationFlow flow = + f -> f + .handle(RSockets.outboundGateway("/lowercase") + .clientRSocketConnector(this.clientRSocketConnector)) + .transform("{ firstResult: payload }") + .enrich(e -> e + .requestPayloadExpression("payload.firstResult") + .requestSubFlow( + sf -> sf + .handle(RSockets.outboundGateway("/lowercase") + .clientRSocketConnector(this.clientRSocketConnector))) + .propertyExpression("secondResult", "payload")) + .transform("payload.values().toString()"); + + IntegrationFlowContext.IntegrationFlowRegistration flowRegistration = + this.integrationFlowContext.registration(flow).register(); + + String result = flowRegistration.getMessagingTemplate().convertSendAndReceive("TEST", String.class); + + assertThat(result).isEqualTo("[test, test]"); + + flowRegistration.destroy(); + } + + @Configuration @EnableIntegration public static class TestConfiguration { @@ -96,7 +132,7 @@ public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector client return IntegrationFlows .from(Function.class) .handle(RSockets.outboundGateway(message -> - message.getHeaders().getOrDefault("route", "/uppercase")) + message.getHeaders().getOrDefault("route", "/uppercase")) .interactionModel((message) -> RSocketInteractionModel.requestChannel) .expectedResponseType("T(java.lang.String)") .clientRSocketConnector(clientRSocketConnector), @@ -126,6 +162,14 @@ public IntegrationFlow rsocketUpperCaseWholeFlow() { .get(); } + @Bean + public IntegrationFlow rsocketLowerCaseFlow() { + return IntegrationFlows + .from(RSockets.inboundGateway("/lowercase")) + ., Flux>transform((flux) -> flux.map(String::toLowerCase)) + .get(); + } + } }