From 66c18480858f8559bd711241b77e695f0b0a1b1f Mon Sep 17 00:00:00 2001 From: Olga MaciaszekSharma Date: Fri, 28 Apr 2023 17:34:45 +0200 Subject: [PATCH 1/2] Remove default blocking timeout on ResponseFunction. --- .../docs/asciidoc/integration/rest-clients.adoc | 5 +++++ .../web/service/invoker/HttpServiceMethod.java | 16 ++++++++++------ .../service/invoker/HttpServiceProxyFactory.java | 10 ++++++---- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/framework-docs/src/docs/asciidoc/integration/rest-clients.adoc b/framework-docs/src/docs/asciidoc/integration/rest-clients.adoc index 8a47198d4b10..2ee33cc1d6ac 100644 --- a/framework-docs/src/docs/asciidoc/integration/rest-clients.adoc +++ b/framework-docs/src/docs/asciidoc/integration/rest-clients.adoc @@ -497,6 +497,11 @@ Annotated, HTTP exchange methods support the following return values: TIP: You can also use any other async or reactive types registered in the `ReactiveAdapterRegistry`. +TIP: For non-reactive types, blocking from a reactive publisher is performed +under the hood by the framework. By default, it is done without a timeout. +You can set a timeout for blocking by calling `blockTimeout(Duration blockTimeout)` +on `HttpServiceProxyFactory.Builder`. + [[rest-http-interface-exceptions]] === Exception Handling diff --git a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java index 701d640d8979..b6341c8a4a17 100644 --- a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java +++ b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java @@ -71,7 +71,7 @@ final class HttpServiceMethod { HttpServiceMethod( Method method, Class containingClass, List argumentResolvers, HttpClientAdapter client, @Nullable StringValueResolver embeddedValueResolver, - ReactiveAdapterRegistry reactiveRegistry, Duration blockTimeout) { + ReactiveAdapterRegistry reactiveRegistry, @Nullable Duration blockTimeout) { this.method = method; this.parameters = initMethodParameters(method); @@ -129,7 +129,8 @@ private void applyArguments(HttpRequestValues.Builder requestValues, Object[] ar private static String formatArgumentError(MethodParameter param, String message) { return "Could not resolve parameter [" + param.getParameterIndex() + "] in " + - param.getExecutable().toGenericString() + (StringUtils.hasText(message) ? ": " + message : ""); + param.getExecutable() + .toGenericString() + (StringUtils.hasText(message) ? ": " + message : ""); } @@ -275,7 +276,7 @@ private static List initAccept(@Nullable HttpExchange typeAnnot, Http private record ResponseFunction( Function> responseFunction, @Nullable ReactiveAdapter returnTypeAdapter, - boolean blockForOptional, Duration blockTimeout) { + boolean blockForOptional, @Nullable Duration blockTimeout) { private ResponseFunction( Function> responseFunction, @@ -298,8 +299,10 @@ public Object execute(HttpRequestValues requestValues) { } return (this.blockForOptional ? - ((Mono) responsePublisher).blockOptional(this.blockTimeout) : - ((Mono) responsePublisher).block(this.blockTimeout)); + (this.blockTimeout != null ? ((Mono) responsePublisher).blockOptional(this.blockTimeout) : + ((Mono) responsePublisher).blockOptional()) : + (this.blockTimeout != null ? ((Mono) responsePublisher).block(this.blockTimeout) : + ((Mono) responsePublisher).block())); } @@ -364,7 +367,8 @@ private static Function> initResponseEntityFunct "ResponseEntity body must be a concrete value or a multi-value Publisher"); ParameterizedTypeReference bodyType = - ParameterizedTypeReference.forType(isSuspending ? methodParam.nested().getGenericParameterType() : + ParameterizedTypeReference.forType(isSuspending ? methodParam.nested() + .getGenericParameterType() : methodParam.nested().getNestedGenericParameterType()); // Shortcut for Flux diff --git a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java index 8c1d9a90cb48..3e2ef8fcaf98 100644 --- a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java +++ b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java @@ -66,13 +66,14 @@ public final class HttpServiceProxyFactory { private final ReactiveAdapterRegistry reactiveAdapterRegistry; + @Nullable private final Duration blockTimeout; private HttpServiceProxyFactory( HttpClientAdapter clientAdapter, List argumentResolvers, @Nullable StringValueResolver embeddedValueResolver, - ReactiveAdapterRegistry reactiveAdapterRegistry, Duration blockTimeout) { + ReactiveAdapterRegistry reactiveAdapterRegistry, @Nullable Duration blockTimeout) { this.clientAdapter = clientAdapter; this.argumentResolvers = argumentResolvers; @@ -92,7 +93,8 @@ private HttpServiceProxyFactory( public S createClient(Class serviceType) { List httpServiceMethods = - MethodIntrospector.selectMethods(serviceType, this::isExchangeMethod).stream() + MethodIntrospector.selectMethods(serviceType, this::isExchangeMethod) + .stream() .map(method -> createHttpServiceMethod(serviceType, method)) .toList(); @@ -208,7 +210,7 @@ public Builder reactiveAdapterRegistry(ReactiveAdapterRegistry registry) { /** * Configure how long to wait for a response for an HTTP service method * with a synchronous (blocking) method signature. - *

By default this is 5 seconds. + *

By default this is {@code null}, in which case means blocking on publishers is done without a timeout. * @param blockTimeout the timeout value * @return this same builder instance */ @@ -226,7 +228,7 @@ public HttpServiceProxyFactory build() { return new HttpServiceProxyFactory( this.clientAdapter, initArgumentResolvers(), this.embeddedValueResolver, this.reactiveAdapterRegistry, - (this.blockTimeout != null ? this.blockTimeout : Duration.ofSeconds(5))); + this.blockTimeout); } private List initArgumentResolvers() { From 9f1b711ea0266c0500bca82a26f1a596503eb700 Mon Sep 17 00:00:00 2001 From: Olga MaciaszekSharma Date: Fri, 28 Apr 2023 17:57:44 +0200 Subject: [PATCH 2/2] Remove default blocking timeout on RSocketServiceMethod. --- framework-docs/src/docs/asciidoc/rsocket.adoc | 5 +++++ .../rsocket/service/RSocketServiceMethod.java | 10 ++++++---- .../rsocket/service/RSocketServiceProxyFactory.java | 10 ++++++---- .../web/service/invoker/HttpServiceMethod.java | 6 ++---- .../web/service/invoker/HttpServiceProxyFactory.java | 6 +++--- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/framework-docs/src/docs/asciidoc/rsocket.adoc b/framework-docs/src/docs/asciidoc/rsocket.adoc index 4da739d73c71..3805a2be238d 100644 --- a/framework-docs/src/docs/asciidoc/rsocket.adoc +++ b/framework-docs/src/docs/asciidoc/rsocket.adoc @@ -979,3 +979,8 @@ method parameters: Annotated, RSocket exchange methods support return values that are concrete value(s), or any producer of value(s) that can be adapted to a Reactive Streams `Publisher` via `ReactiveAdapterRegistry`. + +TIP: For non-reactive types, blocking from a reactive publisher is performed +under the hood by the framework. By default, it is done without a timeout. +You can set a timeout for blocking by calling `blockTimeout(Duration blockTimeout)` +on `RSocketServiceProxyFactory.Builder`. \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java index 29b0c796e95b..db26f744f8a5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java @@ -67,7 +67,7 @@ final class RSocketServiceMethod { RSocketServiceMethod( Method method, Class containingClass, List argumentResolvers, RSocketRequester rsocketRequester, @Nullable StringValueResolver embeddedValueResolver, - ReactiveAdapterRegistry reactiveRegistry, Duration blockTimeout) { + ReactiveAdapterRegistry reactiveRegistry, @Nullable Duration blockTimeout) { this.method = method; this.parameters = initMethodParameters(method); @@ -125,7 +125,7 @@ private static String initRoute( private static Function initResponseFunction( RSocketRequester requester, Method method, - ReactiveAdapterRegistry reactiveRegistry, Duration blockTimeout) { + ReactiveAdapterRegistry reactiveRegistry, @Nullable Duration blockTimeout) { MethodParameter returnParam = new MethodParameter(method, -1); Class returnType = returnParam.getParameterType(); @@ -164,8 +164,10 @@ else if (reactiveAdapter == null) { return reactiveAdapter.fromPublisher(responsePublisher); } return (blockForOptional ? - ((Mono) responsePublisher).blockOptional(blockTimeout) : - ((Mono) responsePublisher).block(blockTimeout)); + (blockTimeout != null ? ((Mono) responsePublisher).blockOptional(blockTimeout) : + ((Mono) responsePublisher).blockOptional()) : + (blockTimeout != null ? ((Mono) responsePublisher).block(blockTimeout) : + ((Mono) responsePublisher).block())); }); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceProxyFactory.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceProxyFactory.java index f4480923f5b6..076cb862b2aa 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceProxyFactory.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceProxyFactory.java @@ -59,13 +59,14 @@ public final class RSocketServiceProxyFactory { private final ReactiveAdapterRegistry reactiveAdapterRegistry; + @Nullable private final Duration blockTimeout; private RSocketServiceProxyFactory( RSocketRequester rsocketRequester, List argumentResolvers, @Nullable StringValueResolver embeddedValueResolver, - ReactiveAdapterRegistry reactiveAdapterRegistry, Duration blockTimeout) { + ReactiveAdapterRegistry reactiveAdapterRegistry, @Nullable Duration blockTimeout) { this.rsocketRequester = rsocketRequester; this.argumentResolvers = argumentResolvers; @@ -139,7 +140,7 @@ public static final class Builder { private ReactiveAdapterRegistry reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance(); @Nullable - private Duration blockTimeout = Duration.ofSeconds(5); + private Duration blockTimeout; private Builder() { } @@ -189,7 +190,8 @@ public Builder reactiveAdapterRegistry(ReactiveAdapterRegistry registry) { /** * Configure how long to wait for a response for an HTTP service method * with a synchronous (blocking) method signature. - *

By default this is 5 seconds. + *

By default this is {@code null}, + * in which case means blocking on publishers is done without a timeout. * @param blockTimeout the timeout value * @return this same builder instance */ @@ -207,7 +209,7 @@ public RSocketServiceProxyFactory build() { return new RSocketServiceProxyFactory( this.rsocketRequester, initArgumentResolvers(), this.embeddedValueResolver, this.reactiveAdapterRegistry, - (this.blockTimeout != null ? this.blockTimeout : Duration.ofSeconds(5))); + this.blockTimeout); } private List initArgumentResolvers() { diff --git a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java index b6341c8a4a17..2a01b0baeee5 100644 --- a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java +++ b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java @@ -129,8 +129,7 @@ private void applyArguments(HttpRequestValues.Builder requestValues, Object[] ar private static String formatArgumentError(MethodParameter param, String message) { return "Could not resolve parameter [" + param.getParameterIndex() + "] in " + - param.getExecutable() - .toGenericString() + (StringUtils.hasText(message) ? ": " + message : ""); + param.getExecutable().toGenericString() + (StringUtils.hasText(message) ? ": " + message : ""); } @@ -367,8 +366,7 @@ private static Function> initResponseEntityFunct "ResponseEntity body must be a concrete value or a multi-value Publisher"); ParameterizedTypeReference bodyType = - ParameterizedTypeReference.forType(isSuspending ? methodParam.nested() - .getGenericParameterType() : + ParameterizedTypeReference.forType(isSuspending ? methodParam.nested().getGenericParameterType() : methodParam.nested().getNestedGenericParameterType()); // Shortcut for Flux diff --git a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java index 3e2ef8fcaf98..51bb1e285b7b 100644 --- a/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java +++ b/spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java @@ -93,8 +93,7 @@ private HttpServiceProxyFactory( public S createClient(Class serviceType) { List httpServiceMethods = - MethodIntrospector.selectMethods(serviceType, this::isExchangeMethod) - .stream() + MethodIntrospector.selectMethods(serviceType, this::isExchangeMethod).stream() .map(method -> createHttpServiceMethod(serviceType, method)) .toList(); @@ -210,7 +209,8 @@ public Builder reactiveAdapterRegistry(ReactiveAdapterRegistry registry) { /** * Configure how long to wait for a response for an HTTP service method * with a synchronous (blocking) method signature. - *

By default this is {@code null}, in which case means blocking on publishers is done without a timeout. + *

By default this is {@code null}, + * in which case means blocking on publishers is done without a timeout. * @param blockTimeout the timeout value * @return this same builder instance */