Skip to content

HTTP interface client should not have blockingTimeout set and leave it to the underlying client by default #30403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -497,6 +497,11 @@ Annotated, HTTP exchange methods support the following return values:
TIP: You can also use any other async or reactive types registered in the
`ReactiveAdapterRegistry`.

TIP: For non-reactive types, blocking from a reactive publisher is performed
under the hood by the framework. By default, it is done without a timeout.
You can set a timeout for blocking by calling `blockTimeout(Duration blockTimeout)`
on `HttpServiceProxyFactory.Builder`.


[[rest-http-interface-exceptions]]
=== Exception Handling
5 changes: 5 additions & 0 deletions framework-docs/src/docs/asciidoc/rsocket.adoc
Original file line number Diff line number Diff line change
@@ -979,3 +979,8 @@ method parameters:
Annotated, RSocket exchange methods support return values that are concrete value(s), or
any producer of value(s) that can be adapted to a Reactive Streams `Publisher` via
`ReactiveAdapterRegistry`.

TIP: For non-reactive types, blocking from a reactive publisher is performed
under the hood by the framework. By default, it is done without a timeout.
You can set a timeout for blocking by calling `blockTimeout(Duration blockTimeout)`
on `RSocketServiceProxyFactory.Builder`.
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ final class RSocketServiceMethod {
RSocketServiceMethod(
Method method, Class<?> containingClass, List<RSocketServiceArgumentResolver> argumentResolvers,
RSocketRequester rsocketRequester, @Nullable StringValueResolver embeddedValueResolver,
ReactiveAdapterRegistry reactiveRegistry, Duration blockTimeout) {
ReactiveAdapterRegistry reactiveRegistry, @Nullable Duration blockTimeout) {

this.method = method;
this.parameters = initMethodParameters(method);
@@ -125,7 +125,7 @@ private static String initRoute(

private static Function<RSocketRequestValues, Object> initResponseFunction(
RSocketRequester requester, Method method,
ReactiveAdapterRegistry reactiveRegistry, Duration blockTimeout) {
ReactiveAdapterRegistry reactiveRegistry, @Nullable Duration blockTimeout) {

MethodParameter returnParam = new MethodParameter(method, -1);
Class<?> returnType = returnParam.getParameterType();
@@ -164,8 +164,10 @@ else if (reactiveAdapter == null) {
return reactiveAdapter.fromPublisher(responsePublisher);
}
return (blockForOptional ?
((Mono<?>) responsePublisher).blockOptional(blockTimeout) :
((Mono<?>) responsePublisher).block(blockTimeout));
(blockTimeout != null ? ((Mono<?>) responsePublisher).blockOptional(blockTimeout) :
((Mono<?>) responsePublisher).blockOptional()) :
(blockTimeout != null ? ((Mono<?>) responsePublisher).block(blockTimeout) :
((Mono<?>) responsePublisher).block()));
});
}

Original file line number Diff line number Diff line change
@@ -59,13 +59,14 @@ public final class RSocketServiceProxyFactory {

private final ReactiveAdapterRegistry reactiveAdapterRegistry;

@Nullable
private final Duration blockTimeout;


private RSocketServiceProxyFactory(
RSocketRequester rsocketRequester, List<RSocketServiceArgumentResolver> argumentResolvers,
@Nullable StringValueResolver embeddedValueResolver,
ReactiveAdapterRegistry reactiveAdapterRegistry, Duration blockTimeout) {
ReactiveAdapterRegistry reactiveAdapterRegistry, @Nullable Duration blockTimeout) {

this.rsocketRequester = rsocketRequester;
this.argumentResolvers = argumentResolvers;
@@ -139,7 +140,7 @@ public static final class Builder {
private ReactiveAdapterRegistry reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance();

@Nullable
private Duration blockTimeout = Duration.ofSeconds(5);
private Duration blockTimeout;

private Builder() {
}
@@ -189,7 +190,8 @@ public Builder reactiveAdapterRegistry(ReactiveAdapterRegistry registry) {
/**
* Configure how long to wait for a response for an HTTP service method
* with a synchronous (blocking) method signature.
* <p>By default this is 5 seconds.
* <p>By default this is {@code null},
* in which case means blocking on publishers is done without a timeout.
* @param blockTimeout the timeout value
* @return this same builder instance
*/
@@ -207,7 +209,7 @@ public RSocketServiceProxyFactory build() {
return new RSocketServiceProxyFactory(
this.rsocketRequester, initArgumentResolvers(),
this.embeddedValueResolver, this.reactiveAdapterRegistry,
(this.blockTimeout != null ? this.blockTimeout : Duration.ofSeconds(5)));
this.blockTimeout);
}

private List<RSocketServiceArgumentResolver> initArgumentResolvers() {
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ final class HttpServiceMethod {
HttpServiceMethod(
Method method, Class<?> containingClass, List<HttpServiceArgumentResolver> argumentResolvers,
HttpClientAdapter client, @Nullable StringValueResolver embeddedValueResolver,
ReactiveAdapterRegistry reactiveRegistry, Duration blockTimeout) {
ReactiveAdapterRegistry reactiveRegistry, @Nullable Duration blockTimeout) {

this.method = method;
this.parameters = initMethodParameters(method);
@@ -275,7 +275,7 @@ private static List<MediaType> initAccept(@Nullable HttpExchange typeAnnot, Http
private record ResponseFunction(
Function<HttpRequestValues, Publisher<?>> responseFunction,
@Nullable ReactiveAdapter returnTypeAdapter,
boolean blockForOptional, Duration blockTimeout) {
boolean blockForOptional, @Nullable Duration blockTimeout) {

private ResponseFunction(
Function<HttpRequestValues, Publisher<?>> responseFunction,
@@ -298,8 +298,10 @@ public Object execute(HttpRequestValues requestValues) {
}

return (this.blockForOptional ?
((Mono<?>) responsePublisher).blockOptional(this.blockTimeout) :
((Mono<?>) responsePublisher).block(this.blockTimeout));
(this.blockTimeout != null ? ((Mono<?>) responsePublisher).blockOptional(this.blockTimeout) :
((Mono<?>) responsePublisher).blockOptional()) :
(this.blockTimeout != null ? ((Mono<?>) responsePublisher).block(this.blockTimeout) :
((Mono<?>) responsePublisher).block()));
}


Original file line number Diff line number Diff line change
@@ -66,13 +66,14 @@ public final class HttpServiceProxyFactory {

private final ReactiveAdapterRegistry reactiveAdapterRegistry;

@Nullable
private final Duration blockTimeout;


private HttpServiceProxyFactory(
HttpClientAdapter clientAdapter, List<HttpServiceArgumentResolver> argumentResolvers,
@Nullable StringValueResolver embeddedValueResolver,
ReactiveAdapterRegistry reactiveAdapterRegistry, Duration blockTimeout) {
ReactiveAdapterRegistry reactiveAdapterRegistry, @Nullable Duration blockTimeout) {

this.clientAdapter = clientAdapter;
this.argumentResolvers = argumentResolvers;
@@ -208,7 +209,8 @@ public Builder reactiveAdapterRegistry(ReactiveAdapterRegistry registry) {
/**
* Configure how long to wait for a response for an HTTP service method
* with a synchronous (blocking) method signature.
* <p>By default this is 5 seconds.
* <p>By default this is {@code null},
* in which case means blocking on publishers is done without a timeout.
* @param blockTimeout the timeout value
* @return this same builder instance
*/
@@ -226,7 +228,7 @@ public HttpServiceProxyFactory build() {
return new HttpServiceProxyFactory(
this.clientAdapter, initArgumentResolvers(),
this.embeddedValueResolver, this.reactiveAdapterRegistry,
(this.blockTimeout != null ? this.blockTimeout : Duration.ofSeconds(5)));
this.blockTimeout);
}

private List<HttpServiceArgumentResolver> initArgumentResolvers() {