Skip to content

Add read timeouts to http module #27713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -93,6 +95,9 @@ public final class HttpTransportSettings {
public static final Setting<Boolean> SETTING_HTTP_RESET_COOKIES =
Setting.boolSetting("http.reset_cookies", false, Property.NodeScope);

public static final Setting<TimeValue> SETTING_HTTP_READ_TIMEOUT =
Setting.timeSetting("http.read_timeout", new TimeValue(30, TimeUnit.SECONDS), new TimeValue(0), Property.NodeScope);

public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =
boolSetting("http.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -86,7 +87,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS;
Expand All @@ -105,6 +105,7 @@
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY;
Expand Down Expand Up @@ -172,6 +173,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
protected final ByteSizeValue tcpSendBufferSize;
protected final ByteSizeValue tcpReceiveBufferSize;
protected final RecvByteBufAllocator recvByteBufAllocator;
private final int readTimeoutMillis;

protected final int maxCompositeBufferComponents;
private final Dispatcher dispatcher;
Expand Down Expand Up @@ -220,6 +222,7 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic
this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
this.detailedErrorsEnabled = SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings);
this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());

ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());
Expand Down Expand Up @@ -480,7 +483,7 @@ void dispatchBadRequest(final RestRequest request, final RestChannel channel, fi
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Connection timeout [{}]", ctx.channel().remoteAddress());
logger.trace("Read timeout [{}]", ctx.channel().remoteAddress());
}
ctx.channel().close();
} else {
Expand Down Expand Up @@ -524,6 +527,7 @@ protected HttpChannelHandler(
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
Math.toIntExact(transport.maxInitialLineLength.getBytes()),
Math.toIntExact(transport.maxHeaderSize.getBytes()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@

package org.elasticsearch.http.netty4;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
Expand All @@ -39,6 +46,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.BindHttpException;
Expand All @@ -63,6 +71,8 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -313,4 +323,53 @@ public void dispatchBadRequest(final RestRequest request,
assertNull(threadPool.getThreadContext().getTransient("bar_bad"));
}
}

public void testReadTimeout() throws Exception {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
throw new AssertionError("Should not have received a dispatched request");
}

@Override
public void dispatchBadRequest(final RestRequest request,
final RestChannel channel,
final ThreadContext threadContext,
final Throwable cause) {
throw new AssertionError("Should not have received a dispatched request");
}

};

Settings settings = Settings.builder()
.put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)))
.build();


NioEventLoopGroup group = new NioEventLoopGroup();
try (Netty4HttpServerTransport transport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress.boundAddresses());

AtomicBoolean channelClosed = new AtomicBoolean(false);

Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ChannelHandlerAdapter() {});

}
}).group(group);
ChannelFuture connect = clientBootstrap.connect(remoteAddress.address());
connect.channel().closeFuture().addListener(future -> channelClosed.set(true));

assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS);

} finally {
group.shutdownGracefully().await();
}
}
}