Skip to content

Commit 14a6ca0

Browse files
committed
Fix Watcher HTTP connection config for longevity (#72736)
Watcher uses a connection pool for outgoing HTTP traffic, which means that some HTTP connections may live for a long time, possibly in an idle state. Such connections may be silently torn down by a remote device, so that when we re-use them we encounter a `Connection reset` or similar error. This commit introduces a setting allowing users to set a finite expiry time on these connections, and also enables TCP keepalives on them by default so that a remote teardown will be actively detected sooner. Closes #52997
1 parent cf6099a commit 14a6ca0

File tree

6 files changed

+92
-2
lines changed

6 files changed

+92
-2
lines changed

docs/reference/settings/notification-settings.asciidoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,16 @@ connection is being initiated.
8080
The maximum period of inactivity between two data packets, before the
8181
request is aborted.
8282

83+
`xpack.http.tcp.keep_alive`
84+
(<<static-cluster-setting,Static>>)
85+
Whether to enable TCP keepalives on HTTP connections. Defaults to `true`.
86+
87+
`xpack.http.connection_pool_ttl`
88+
(<<static-cluster-setting,Static>>)
89+
The time-to-live of connections in the connection pool. If a connection is not
90+
re-used within this timeout, it is closed. By default, the time-to-live is
91+
infinite meaning that connections never expire.
92+
8393
`xpack.http.max_response_size`::
8494
(<<static-cluster-setting,Static>>)
8595
Specifies the maximum size an HTTP response is allowed to have, defaults to

x-pack/plugin/core/src/test/java/org/elasticsearch/test/http/MockRequest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import org.elasticsearch.common.SuppressForbidden;
1010

11+
import java.net.InetSocketAddress;
1112
import java.net.URI;
1213
import java.util.Locale;
1314

@@ -19,13 +20,15 @@ public class MockRequest {
1920
private final String method;
2021
private final URI uri;
2122
private final Headers headers;
23+
private final InetSocketAddress remoteAddress;
2224
private String body = null;
2325

2426
@SuppressForbidden(reason = "use http server header class")
25-
MockRequest(String method, URI uri, com.sun.net.httpserver.Headers headers) {
27+
MockRequest(String method, URI uri, com.sun.net.httpserver.Headers headers, InetSocketAddress remoteAddress) {
2628
this.method = method;
2729
this.uri = uri;
2830
this.headers = new Headers(headers);
31+
this.remoteAddress = remoteAddress;
2932
}
3033

3134
/**
@@ -63,6 +66,13 @@ public String getBody() {
6366
return body;
6467
}
6568

69+
/**
70+
* @return The address of the client
71+
*/
72+
public InetSocketAddress getRemoteAddress() {
73+
return remoteAddress;
74+
}
75+
6676
@Override
6777
public String toString() {
6878
return String.format(Locale.ROOT, "%s %s", method, uri);

x-pack/plugin/core/src/test/java/org/elasticsearch/test/http/MockWebServer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,11 @@ private void sleepIfNeeded(TimeValue timeValue) throws InterruptedException {
193193
* Creates a MockRequest from an incoming HTTP request, that can later be checked in your test assertions
194194
*/
195195
private MockRequest createRequest(HttpExchange exchange) throws IOException {
196-
MockRequest request = new MockRequest(exchange.getRequestMethod(), exchange.getRequestURI(), exchange.getRequestHeaders());
196+
MockRequest request = new MockRequest(
197+
exchange.getRequestMethod(),
198+
exchange.getRequestURI(),
199+
exchange.getRequestHeaders(),
200+
exchange.getRemoteAddress());
197201
if (exchange.getRequestBody() != null) {
198202
String body = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
199203
if (Strings.isEmpty(body) == false) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.http.client.protocol.HttpClientContext;
2727
import org.apache.http.client.utils.URIBuilder;
2828
import org.apache.http.client.utils.URIUtils;
29+
import org.apache.http.config.SocketConfig;
2930
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
3031
import org.apache.http.entity.ByteArrayEntity;
3132
import org.apache.http.entity.ContentType;
@@ -74,6 +75,7 @@
7475
import java.util.HashMap;
7576
import java.util.List;
7677
import java.util.Map;
78+
import java.util.concurrent.TimeUnit;
7779
import java.util.concurrent.atomic.AtomicReference;
7880

7981
public class HttpClient implements Closeable {
@@ -90,13 +92,17 @@ public class HttpClient implements Closeable {
9092
private final HttpProxy settingsProxy;
9193
private final TimeValue defaultConnectionTimeout;
9294
private final TimeValue defaultReadTimeout;
95+
private final boolean tcpKeepaliveEnabled;
96+
private final TimeValue connectionPoolTtl;
9397
private final ByteSizeValue maxResponseSize;
9498
private final CryptoService cryptoService;
9599
private final SSLService sslService;
96100

97101
public HttpClient(Settings settings, SSLService sslService, CryptoService cryptoService, ClusterService clusterService) {
98102
this.defaultConnectionTimeout = HttpSettings.CONNECTION_TIMEOUT.get(settings);
99103
this.defaultReadTimeout = HttpSettings.READ_TIMEOUT.get(settings);
104+
this.tcpKeepaliveEnabled = HttpSettings.TCP_KEEPALIVE.get(settings);
105+
this.connectionPoolTtl = HttpSettings.CONNECTION_POOL_TTL.get(settings);
100106
this.maxResponseSize = HttpSettings.MAX_HTTP_RESPONSE_SIZE.get(settings);
101107
this.settingsProxy = getProxyFromSettings(settings);
102108
this.cryptoService = cryptoService;
@@ -116,6 +122,16 @@ private CloseableHttpClient createHttpClient() {
116122
SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslConfiguration), verifier);
117123
clientBuilder.setSSLSocketFactory(factory);
118124

125+
final SocketConfig.Builder socketConfigBuilder = SocketConfig.custom();
126+
if (tcpKeepaliveEnabled) {
127+
socketConfigBuilder.setSoKeepAlive(true);
128+
}
129+
clientBuilder.setDefaultSocketConfig(socketConfigBuilder.build());
130+
131+
if (connectionPoolTtl.millis() > 0) {
132+
clientBuilder.setConnectionTimeToLive(connectionPoolTtl.millis(), TimeUnit.MILLISECONDS);
133+
}
134+
119135
clientBuilder.evictExpiredConnections();
120136
clientBuilder.setMaxConnPerRoute(MAX_CONNECTIONS);
121137
clientBuilder.setMaxConnTotal(MAX_CONNECTIONS);

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/common/http/HttpSettings.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public class HttpSettings {
3030
DEFAULT_READ_TIMEOUT, Property.NodeScope);
3131
static final Setting<TimeValue> CONNECTION_TIMEOUT = Setting.timeSetting("xpack.http.default_connection_timeout",
3232
DEFAULT_CONNECTION_TIMEOUT, Property.NodeScope);
33+
static final Setting<Boolean> TCP_KEEPALIVE = Setting.boolSetting("xpack.http.tcp.keep_alive",
34+
true, Property.NodeScope);
35+
static final Setting<TimeValue> CONNECTION_POOL_TTL = Setting.timeSetting("xpack.http.connection_pool_ttl",
36+
TimeValue.MINUS_ONE, Property.NodeScope);
3337

3438
private static final String PROXY_HOST_KEY = "xpack.http.proxy.host";
3539
private static final String PROXY_PORT_KEY = "xpack.http.proxy.port";
@@ -55,6 +59,8 @@ public static List<? extends Setting<?>> getSettings() {
5559
settings.addAll(SSL.getAllSettings());
5660
settings.add(READ_TIMEOUT);
5761
settings.add(CONNECTION_TIMEOUT);
62+
settings.add(TCP_KEEPALIVE);
63+
settings.add(CONNECTION_POOL_TTL);
5864
settings.add(PROXY_HOST);
5965
settings.add(PROXY_PORT);
6066
settings.add(PROXY_SCHEME);

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/common/http/HttpClientTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,50 @@ public void testCreateUri() throws Exception {
758758
assertCreateUri("https://example.org", "");
759759
}
760760

761+
public void testConnectionReuse() throws Exception {
762+
final HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
763+
.method(HttpMethod.POST)
764+
.path("/" + randomAlphaOfLength(5))
765+
.build();
766+
767+
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("whatever"));
768+
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("whatever"));
769+
770+
httpClient.execute(request);
771+
httpClient.execute(request);
772+
773+
assertThat(webServer.requests(), hasSize(2));
774+
// by default we re-use connections forever
775+
assertThat(webServer.requests().get(0).getRemoteAddress(), equalTo(webServer.requests().get(1).getRemoteAddress()));
776+
webServer.clearRequests();
777+
778+
try (HttpClient unpooledHttpClient = new HttpClient(
779+
Settings.builder().put(HttpSettings.CONNECTION_POOL_TTL.getKey(), "99ms").build(),
780+
new SSLService(environment),
781+
null,
782+
mockClusterService())) {
783+
784+
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("whatever"));
785+
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("whatever"));
786+
787+
unpooledHttpClient.execute(request);
788+
789+
// Connection pool expiry is based on System.currentTimeMillis so wait for this clock to advance far enough for the connection
790+
// we just used to expire
791+
final long waitStartTime = System.currentTimeMillis();
792+
while (System.currentTimeMillis() <= waitStartTime + 100) {
793+
//noinspection BusyWait
794+
Thread.sleep(100);
795+
}
796+
797+
unpooledHttpClient.execute(request);
798+
799+
assertThat(webServer.requests(), hasSize(2));
800+
// the connection expired before re-use so we made a new one
801+
assertThat(webServer.requests().get(0).getRemoteAddress(), not(equalTo(webServer.requests().get(1).getRemoteAddress())));
802+
}
803+
}
804+
761805
private void assertCreateUri(String uri, String expectedPath) {
762806
final HttpRequest request = HttpRequest.builder().fromUrl(uri).build();
763807
final Tuple<HttpHost, URI> tuple = HttpClient.createURI(request);

0 commit comments

Comments
 (0)