Skip to content

Commit ad8a571

Browse files
authored
Add read timeouts to http module (#27713)
We currently do not have any server-side read timeouts implemented in elasticsearch. This commit adds a read timeout setting that defaults to 30 seconds. If after 30 seconds a read has not occurred, the channel will be closed. A timeout of value of 0 will disable the timeout.
1 parent ec5e540 commit ad8a571

File tree

3 files changed

+70
-2
lines changed

3 files changed

+70
-2
lines changed

core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import org.elasticsearch.common.transport.PortsRange;
2727
import org.elasticsearch.common.unit.ByteSizeUnit;
2828
import org.elasticsearch.common.unit.ByteSizeValue;
29+
import org.elasticsearch.common.unit.TimeValue;
2930

3031
import java.util.List;
32+
import java.util.concurrent.TimeUnit;
3133
import java.util.function.Function;
3234

3335
import static java.util.Collections.emptyList;
@@ -93,6 +95,9 @@ public final class HttpTransportSettings {
9395
public static final Setting<Boolean> SETTING_HTTP_RESET_COOKIES =
9496
Setting.boolSetting("http.reset_cookies", false, Property.NodeScope);
9597

98+
public static final Setting<TimeValue> SETTING_HTTP_READ_TIMEOUT =
99+
Setting.timeSetting("http.read_timeout", new TimeValue(30, TimeUnit.SECONDS), new TimeValue(0), Property.NodeScope);
100+
96101
public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =
97102
boolSetting("http.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
98103
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.netty.handler.codec.http.HttpRequestDecoder;
4141
import io.netty.handler.codec.http.HttpResponseEncoder;
4242
import io.netty.handler.timeout.ReadTimeoutException;
43+
import io.netty.handler.timeout.ReadTimeoutHandler;
4344
import org.apache.logging.log4j.message.ParameterizedMessage;
4445
import org.apache.logging.log4j.util.Supplier;
4546
import org.elasticsearch.common.Strings;
@@ -86,7 +87,6 @@
8687
import java.util.concurrent.atomic.AtomicReference;
8788
import java.util.regex.Pattern;
8889

89-
import static org.elasticsearch.common.settings.Setting.boolSetting;
9090
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
9191
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS;
9292
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS;
@@ -105,6 +105,7 @@
105105
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
106106
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
107107
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT;
108+
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT;
108109
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES;
109110
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE;
110111
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY;
@@ -172,6 +173,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
172173
protected final ByteSizeValue tcpSendBufferSize;
173174
protected final ByteSizeValue tcpReceiveBufferSize;
174175
protected final RecvByteBufAllocator recvByteBufAllocator;
176+
private final int readTimeoutMillis;
175177

176178
protected final int maxCompositeBufferComponents;
177179
private final Dispatcher dispatcher;
@@ -220,6 +222,7 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic
220222
this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
221223
this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
222224
this.detailedErrorsEnabled = SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings);
225+
this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());
223226

224227
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
225228
recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());
@@ -480,7 +483,7 @@ void dispatchBadRequest(final RestRequest request, final RestChannel channel, fi
480483
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
481484
if (cause instanceof ReadTimeoutException) {
482485
if (logger.isTraceEnabled()) {
483-
logger.trace("Connection timeout [{}]", ctx.channel().remoteAddress());
486+
logger.trace("Read timeout [{}]", ctx.channel().remoteAddress());
484487
}
485488
ctx.channel().close();
486489
} else {
@@ -524,6 +527,7 @@ protected HttpChannelHandler(
524527
@Override
525528
protected void initChannel(Channel ch) throws Exception {
526529
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
530+
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
527531
final HttpRequestDecoder decoder = new HttpRequestDecoder(
528532
Math.toIntExact(transport.maxInitialLineLength.getBytes()),
529533
Math.toIntExact(transport.maxHeaderSize.getBytes()),

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,15 @@
1919

2020
package org.elasticsearch.http.netty4;
2121

22+
import io.netty.bootstrap.Bootstrap;
2223
import io.netty.buffer.ByteBufUtil;
2324
import io.netty.buffer.Unpooled;
25+
import io.netty.channel.ChannelFuture;
26+
import io.netty.channel.ChannelHandlerAdapter;
27+
import io.netty.channel.ChannelInitializer;
28+
import io.netty.channel.nio.NioEventLoopGroup;
29+
import io.netty.channel.socket.SocketChannel;
30+
import io.netty.channel.socket.nio.NioSocketChannel;
2431
import io.netty.handler.codec.TooLongFrameException;
2532
import io.netty.handler.codec.http.DefaultFullHttpRequest;
2633
import io.netty.handler.codec.http.FullHttpRequest;
@@ -39,6 +46,7 @@
3946
import org.elasticsearch.common.settings.Settings;
4047
import org.elasticsearch.common.transport.TransportAddress;
4148
import org.elasticsearch.common.unit.ByteSizeValue;
49+
import org.elasticsearch.common.unit.TimeValue;
4250
import org.elasticsearch.common.util.MockBigArrays;
4351
import org.elasticsearch.common.util.concurrent.ThreadContext;
4452
import org.elasticsearch.http.BindHttpException;
@@ -63,6 +71,8 @@
6371
import java.util.Collections;
6472
import java.util.HashSet;
6573
import java.util.Set;
74+
import java.util.concurrent.TimeUnit;
75+
import java.util.concurrent.atomic.AtomicBoolean;
6676
import java.util.concurrent.atomic.AtomicReference;
6777
import java.util.stream.Collectors;
6878

@@ -313,4 +323,53 @@ public void dispatchBadRequest(final RestRequest request,
313323
assertNull(threadPool.getThreadContext().getTransient("bar_bad"));
314324
}
315325
}
326+
327+
public void testReadTimeout() throws Exception {
328+
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
329+
330+
@Override
331+
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
332+
throw new AssertionError("Should not have received a dispatched request");
333+
}
334+
335+
@Override
336+
public void dispatchBadRequest(final RestRequest request,
337+
final RestChannel channel,
338+
final ThreadContext threadContext,
339+
final Throwable cause) {
340+
throw new AssertionError("Should not have received a dispatched request");
341+
}
342+
343+
};
344+
345+
Settings settings = Settings.builder()
346+
.put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)))
347+
.build();
348+
349+
350+
NioEventLoopGroup group = new NioEventLoopGroup();
351+
try (Netty4HttpServerTransport transport =
352+
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) {
353+
transport.start();
354+
final TransportAddress remoteAddress = randomFrom(transport.boundAddress.boundAddresses());
355+
356+
AtomicBoolean channelClosed = new AtomicBoolean(false);
357+
358+
Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
359+
360+
@Override
361+
protected void initChannel(SocketChannel ch) {
362+
ch.pipeline().addLast(new ChannelHandlerAdapter() {});
363+
364+
}
365+
}).group(group);
366+
ChannelFuture connect = clientBootstrap.connect(remoteAddress.address());
367+
connect.channel().closeFuture().addListener(future -> channelClosed.set(true));
368+
369+
assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS);
370+
371+
} finally {
372+
group.shutdownGracefully().await();
373+
}
374+
}
316375
}

0 commit comments

Comments
 (0)