Skip to content

Commit 1bc08c6

Browse files
committed
Add ReactorResourceFactory
Issue: SPR-16963
1 parent bdac391 commit 1bc08c6

File tree

2 files changed

+194
-6
lines changed

2 files changed

+194
-6
lines changed

Diff for: spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

+41-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import reactor.netty.http.client.HttpClient;
2727
import reactor.netty.http.client.HttpClientRequest;
2828
import reactor.netty.http.client.HttpClientResponse;
29+
import reactor.netty.resources.ConnectionProvider;
30+
import reactor.netty.resources.LoopResources;
31+
import reactor.netty.tcp.TcpClient;
2932

3033
import org.springframework.http.HttpMethod;
3134
import org.springframework.util.Assert;
@@ -39,21 +42,53 @@
3942
*/
4043
public class ReactorClientHttpConnector implements ClientHttpConnector {
4144

45+
private final static Function<HttpClient, HttpClient> defaultInitializer = HttpClient::compress;
46+
47+
4248
private final HttpClient httpClient;
4349

4450

4551
/**
46-
* Create a Reactor Netty {@link ClientHttpConnector}
47-
* with default configuration and HTTP compression support enabled.
52+
* Default constructor that initializes an {@link HttpClient} with:
53+
* <pre class="code">
54+
* HttpClient.create().compress()
55+
* </pre>
4856
*/
4957
public ReactorClientHttpConnector() {
50-
this.httpClient = HttpClient.create().compress();
58+
this.httpClient = defaultInitializer.apply(HttpClient.create());
59+
}
60+
61+
/**
62+
* Constructor with externally managed Reactor Netty resources, including
63+
* {@link LoopResources} for event loop threads, and {@link ConnectionProvider}
64+
* for the connection pool.
65+
* <p>This constructor should be used only when you don't want the client
66+
* to participate in the Reactor Netty global resources. By default the
67+
* client participates in the Reactor Netty global resources held in
68+
* {@link reactor.netty.http.HttpResources}, which is recommended since
69+
* fixed, shared resources are favored for event loop concurrency. However,
70+
* consider declaring a {@link ReactorResourceFactory} bean with
71+
* {@code globaResources=true} in order to ensure the Reactor Netty global
72+
* resources are shut down when the Spring ApplicationContext is closed.
73+
* @param factory the resource factory to obtain the resources from
74+
* @param mapper a mapper for further initialization of the created client
75+
* @since 5.1
76+
*/
77+
public ReactorClientHttpConnector(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper) {
78+
this.httpClient = defaultInitializer.andThen(mapper).apply(initHttpClient(factory));
79+
}
80+
81+
private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory) {
82+
ConnectionProvider provider = resourceFactory.getConnectionProvider();
83+
LoopResources resources = resourceFactory.getLoopResources();
84+
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
85+
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
86+
return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources)).compress();
5187
}
5288

5389
/**
54-
* Create a Reactor Netty {@link ClientHttpConnector} with a fully
55-
* configured {@code HttpClient}.
56-
* @param httpClient the client instance to use
90+
* Constructor with a pre-configured {@code HttpClient} instance.
91+
* @param httpClient the client to use
5792
* @since 5.1
5893
*/
5994
public ReactorClientHttpConnector(HttpClient httpClient) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.http.client.reactive;
17+
18+
import reactor.netty.http.HttpResources;
19+
import reactor.netty.resources.ConnectionProvider;
20+
import reactor.netty.resources.LoopResources;
21+
22+
import org.springframework.beans.factory.DisposableBean;
23+
import org.springframework.beans.factory.InitializingBean;
24+
import org.springframework.lang.Nullable;
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* Factory to manage Reactor Netty resources, i.e. {@link LoopResources} for
29+
* event loop threads, and {@link ConnectionProvider} for the connection pool,
30+
* within the lifecycle of a Spring {@code ApplicationContext}.
31+
*
32+
* <p>This factory implements {@link InitializingBean} and {@link DisposableBean}
33+
* and is expected typically to be declared as a Spring-managed bean.
34+
*
35+
* @author Rossen Stoyanchev
36+
* @since 5.1
37+
*/
38+
public class ReactorResourceFactory implements InitializingBean, DisposableBean {
39+
40+
private boolean globalResources = true;
41+
42+
@Nullable
43+
private ConnectionProvider connectionProvider;
44+
45+
@Nullable
46+
private LoopResources loopResources;
47+
48+
private String threadPrefix = "reactor-http";
49+
50+
51+
/**
52+
* Whether to expose and manage the global Reactor Netty resources from the
53+
* {@link HttpResources} holder.
54+
* <p>Default is "true" in which case this factory helps to configure and
55+
* shut down the global Reactor Netty resources within the lifecycle of a
56+
* Spring {@code ApplicationContext}.
57+
* <p>If set to "false" then the factory creates and manages its own
58+
* {@link LoopResources} and {@link ConnectionProvider}, independent of the
59+
* global ones in the {@link HttpResources} holder.
60+
* @param globalResources whether to expose and manage the global resources
61+
*/
62+
public void setGlobalResources(boolean globalResources) {
63+
this.globalResources = globalResources;
64+
}
65+
66+
/**
67+
* Configure the {@link ConnectionProvider} to use.
68+
* <p>By default, initialized with {@link ConnectionProvider#elastic(String)}.
69+
* @param connectionProvider the connection provider to use
70+
*/
71+
public void setConnectionProvider(@Nullable ConnectionProvider connectionProvider) {
72+
this.connectionProvider = connectionProvider;
73+
}
74+
75+
/**
76+
* Configure the {@link LoopResources} to use.
77+
* <p>By default, initialized with {@link LoopResources#create(String)}.
78+
* @param loopResources the loop resources to use
79+
*/
80+
public void setLoopResources(@Nullable LoopResources loopResources) {
81+
this.loopResources = loopResources;
82+
}
83+
84+
/**
85+
* Configure the thread prefix to initialize {@link LoopResources} with. This
86+
* is used only when a {@link LoopResources} instance isn't
87+
* {@link #setLoopResources(LoopResources) provided}.
88+
* <p>By default set to "reactor-http".
89+
* @param threadPrefix the thread prefix to use
90+
*/
91+
public void setThreadPrefix(String threadPrefix) {
92+
Assert.notNull(threadPrefix, "Thread prefix is required");
93+
this.threadPrefix = threadPrefix;
94+
}
95+
96+
97+
/**
98+
* Whether this factory exposes the global
99+
* {@link reactor.netty.http.HttpResources HttpResources} holder.
100+
*/
101+
public boolean isGlobalResources() {
102+
return this.globalResources;
103+
}
104+
105+
/**
106+
* Return the configured {@link ConnectionProvider}.
107+
*/
108+
@Nullable
109+
public ConnectionProvider getConnectionProvider() {
110+
return this.connectionProvider;
111+
}
112+
113+
/**
114+
* Return the configured {@link LoopResources}.
115+
*/
116+
@Nullable
117+
public LoopResources getLoopResources() {
118+
return this.loopResources;
119+
}
120+
121+
/**
122+
* Return the configured prefix for event loop threads.
123+
*/
124+
public String getThreadPrefix() {
125+
return this.threadPrefix;
126+
}
127+
128+
129+
@Override
130+
public void afterPropertiesSet() throws Exception {
131+
if (this.loopResources == null) {
132+
this.loopResources = LoopResources.create(this.threadPrefix);
133+
}
134+
if (this.connectionProvider == null) {
135+
this.connectionProvider = ConnectionProvider.elastic("http");
136+
}
137+
if (this.globalResources) {
138+
HttpResources.set(this.loopResources);
139+
HttpResources.set(this.connectionProvider);
140+
}
141+
}
142+
143+
@Override
144+
public void destroy() {
145+
146+
Assert.notNull(this.connectionProvider, "No ConnectionProvider");
147+
this.connectionProvider.dispose();
148+
149+
Assert.notNull(this.loopResources, "No LoopResources");
150+
this.loopResources.dispose();
151+
}
152+
153+
}

0 commit comments

Comments
 (0)