From 0cc13b9d5be130dd24e49d6cf7feb317b6d73a4d Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 19 Mar 2015 23:23:36 -0700 Subject: [PATCH] Schedule transport ping interval Sometimes, for example when using the transport client through a load balancer, there is a need to send a scheduled ping message to keep each channel alive. Add support for `transport.ping_schedule`, which controls the interval (-1 for disabled) at which a ping message will be sent. For the transport client, it is enabled automatically (5s by default), since this is almost always the desired behavior. The ping message uses the same 6-byte header format as regular messages: the `ES` marker followed by -1 as the data length. When the receiving side encounters such a header, it simply skips it and continues to process the next messages. closes #10189 --- .../client/transport/TransportClient.java | 5 +- .../transport/netty/NettyHeader.java | 21 +++ .../transport/netty/NettyTransport.java | 80 ++++++++-- .../netty/SizeHeaderFrameDecoder.java | 6 + .../test/ElasticsearchIntegrationTest.java | 4 + .../netty/NettyScheduledPingTests.java | 140 ++++++++++++++++++ 6 files changed, 239 insertions(+), 17 deletions(-) create mode 100644 src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 736764b709b2f..2e4b995bf465c 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -80,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.netty.NettyTransport; import java.util.concurrent.TimeUnit; @@ -152,7 +153,9 @@ public TransportClient(Settings.Builder settings, boolean loadConfigSettings) th */ public TransportClient(Settings pSettings, 
boolean loadConfigSettings) throws ElasticsearchException { Tuple tuple = InternalSettingsPreparer.prepareSettings(pSettings, loadConfigSettings); - Settings settings = settingsBuilder().put(tuple.v1()) + Settings settings = settingsBuilder() + .put(NettyTransport.PING_SCHEDULE, "5s") // enable by default the transport schedule ping interval + .put(tuple.v1()) .put("network.server", false) .put("node.client", true) .put(CLIENT_TYPE_SETTING, CLIENT_TYPE) diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java b/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java index aa06dc1df2873..0e83911a772fd 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; /** */ @@ -28,6 +29,26 @@ public class NettyHeader { public static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4; + /** + * The magic number (must be lower than 0) for a ping message. This is handled + * specifically in {@link org.elasticsearch.transport.netty.SizeHeaderFrameDecoder}. + */ + public static final int PING_DATA_SIZE = -1; + private final static ChannelBuffer pingHeader; + static { + pingHeader = ChannelBuffers.buffer(6); + pingHeader.writeByte('E'); + pingHeader.writeByte('S'); + pingHeader.writeInt(PING_DATA_SIZE); + } + + /** + * A ping header is same as regular header, just with -1 for the size of the message. 
+ */ + public static ChannelBuffer pingHeader() { + return pingHeader.duplicate(); + } + public static void writeHeader(ChannelBuffer buffer, long requestId, byte status, Version version) { int index = buffer.readerIndex(); buffer.setByte(index, 'E'); diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 1ed1298b21443..34e9e79b2ecac 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.math.MathUtils; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.netty.NettyUtils; import org.elasticsearch.common.netty.OpenChannelsHandler; import org.elasticsearch.common.netty.ReleaseChannelFutureListener; @@ -76,6 +77,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -110,6 +112,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem public static final String CONNECTIONS_PER_NODE_REG = "transport.connections_per_node.reg"; public static final String CONNECTIONS_PER_NODE_STATE = "transport.connections_per_node.state"; public static final String CONNECTIONS_PER_NODE_PING = "transport.connections_per_node.ping"; + public static final String PING_SCHEDULE = "transport.ping_schedule"; // the scheduled internal ping interval setting + public static final TimeValue DEFAULT_PING_SCHEDULE = TimeValue.timeValueMillis(-1); // the default ping schedule, defaults to disabled (-1) public static final String 
DEFAULT_PORT_RANGE = "9300-9400"; public static final String DEFAULT_PROFILE = "default"; @@ -132,6 +136,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem protected final int connectionsPerNodeState; protected final int connectionsPerNodePing; + private final TimeValue pingSchedule; + protected final BigArrays bigArrays; protected final ThreadPool threadPool; protected volatile OpenChannelsHandler serverOpenChannels; @@ -149,6 +155,9 @@ public class NettyTransport extends AbstractLifecycleComponent implem // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) private final ReadWriteLock globalLock = new ReentrantReadWriteLock(); + // package visibility for tests + final ScheduledPing scheduledPing; + @Inject public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { super(settings); @@ -200,6 +209,12 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n } else { receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes()); } + + this.scheduledPing = new ScheduledPing(); + this.pingSchedule = settings.getAsTime(PING_SCHEDULE, DEFAULT_PING_SCHEDULE); + if (pingSchedule.millis() > 0) { + threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, scheduledPing); + } } public Settings settings() { @@ -752,6 +767,7 @@ public void connectToNode(DiscoveryNode node, boolean light) { } } // we acquire a connection lock, so no way there is an existing connection + nodeChannels.start(); connectedNodes.put(node, nodeChannels); if (logger.isDebugEnabled()) { logger.debug("connected to node [{}]", node); @@ -1047,6 +1063,7 @@ public void run() { public static class NodeChannels { + ImmutableList allChannels = ImmutableList.of(); private Channel[] recovery; private 
final AtomicInteger recoveryCounter = new AtomicInteger(); private Channel[] bulk; @@ -1066,12 +1083,12 @@ public NodeChannels(Channel[] recovery, Channel[] bulk, Channel[] reg, Channel[] this.ping = ping; } - public boolean hasChannel(Channel channel) { - return hasChannel(channel, recovery) || hasChannel(channel, bulk) || hasChannel(channel, reg) || hasChannel(channel, state) || hasChannel(channel, ping); + public void start() { + this.allChannels = ImmutableList.builder().add(recovery).add(bulk).add(reg).add(state).add(ping).build(); } - private boolean hasChannel(Channel channel, Channel[] channels) { - for (Channel channel1 : channels) { + public boolean hasChannel(Channel channel) { + for (Channel channel1 : allChannels) { if (channel.equals(channel1)) { return true; } @@ -1097,18 +1114,7 @@ public Channel channel(TransportRequestOptions.Type type) { public synchronized void close() { List futures = new ArrayList<>(); - closeChannelsAndWait(recovery, futures); - closeChannelsAndWait(bulk, futures); - closeChannelsAndWait(reg, futures); - closeChannelsAndWait(state, futures); - closeChannelsAndWait(ping, futures); - for (ChannelFuture future : futures) { - future.awaitUninterruptibly(); - } - } - - private void closeChannelsAndWait(Channel[] channels, List futures) { - for (Channel channel : channels) { + for (Channel channel : allChannels) { try { if (channel != null && channel.isOpen()) { futures.add(channel.close()); @@ -1117,6 +1123,48 @@ private void closeChannelsAndWait(Channel[] channels, List future //ignore } } + for (ChannelFuture future : futures) { + future.awaitUninterruptibly(); + } + } + } + + class ScheduledPing implements Runnable { + + final CounterMetric successfulPings = new CounterMetric(); + final CounterMetric failedPings = new CounterMetric(); + + @Override + public void run() { + if (lifecycle.stoppedOrClosed()) { + return; + } + for (Map.Entry entry : connectedNodes.entrySet()) { + DiscoveryNode node = entry.getKey(); + NodeChannels 
channels = entry.getValue(); + // we only support the ping message format since 1.6 + if (node.version().onOrAfter(Version.V_1_6_0)) { + for (Channel channel : channels.allChannels) { + try { + ChannelFuture future = channel.write(NettyHeader.pingHeader()); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + successfulPings.inc(); + } + }); + } catch (Throwable t) { + if (channel.isOpen()) { + logger.debug("[{}] failed to send ping transport message", t, node); + failedPings.inc(); + } else { + logger.trace("[{}] failed to send ping transport message (channel closed)", t, node); + } + } + } + } + } + threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, this); } } } diff --git a/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java b/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java index d3fd096ffb8cb..aa98f74514015 100644 --- a/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java +++ b/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java @@ -68,6 +68,12 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe } int dataLen = buffer.getInt(buffer.readerIndex() + 2); + if (dataLen == NettyHeader.PING_DATA_SIZE) { + // discard the messages we read and continue, this is achieved by skipping the bytes + // and returning null + buffer.skipBytes(6); + return null; + } if (dataLen <= 0) { throw new StreamCorruptedException("invalid data length: " + dataLen); } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 2de2cf7cd9c05..899f156363340 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -114,6 +114,7 @@ import 
org.elasticsearch.search.SearchService; import org.elasticsearch.test.client.RandomizingClient; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; +import org.elasticsearch.transport.netty.NettyTransport; import org.hamcrest.Matchers; import org.joda.time.DateTimeZone; import org.junit.*; @@ -465,6 +466,9 @@ private static ImmutableSettings.Builder setRandomSettings(Random random, Immuta // see #7210 builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false); } + if (random.nextBoolean()) { + builder.put(NettyTransport.PING_SCHEDULE, RandomInts.randomIntBetween(random, 100, 2000) + "ms"); + } return builder; } diff --git a/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java new file mode 100644 index 0000000000000..9e323f809ee70 --- /dev/null +++ b/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.transport.netty; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +/** + */ +public class NettyScheduledPingTests extends ElasticsearchTestCase { + + @Test + public void testScheduledPing() throws Exception { + ThreadPool threadPool = new ThreadPool(getClass().getName()); + + int startPort = 11000 + randomIntBetween(0, 255); + int endPort = startPort + 10; + Settings settings = ImmutableSettings.builder().put(NettyTransport.PING_SCHEDULE, "5ms").put("transport.tcp.port", startPort + "-" + endPort).build(); + + final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); + MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool); + serviceA.start(); + + final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); + MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool); + serviceB.start(); + + DiscoveryNode nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), ImmutableMap.of(), Version.CURRENT); + 
DiscoveryNode nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), ImmutableMap.of(), Version.CURRENT); + + serviceA.connectToNode(nodeB); + serviceB.connectToNode(nodeA); + + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(100l)); + assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(100l)); + } + }); + assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0l)); + assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0l)); + + serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { + @Override + public TransportRequest.Empty newInstance() { + return TransportRequest.Empty.INSTANCE; + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options()); + } catch (IOException e) { + e.printStackTrace(); + assertThat(e.getMessage(), false, equalTo(true)); + } + } + }); + + // send some messages while ping requests are going around + int rounds = scaledRandomIntBetween(100, 5000); + for (int i = 0; i < rounds; i++) { + serviceB.submitRequest(nodeA, "sayHello", + TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(randomBoolean()), new BaseTransportResponseHandler() { + @Override + public TransportResponse.Empty newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void handleResponse(TransportResponse.Empty response) { + } + + @Override + public void handleException(TransportException exp) { + exp.printStackTrace(); + assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true)); + } + }).txGet(); + } + + assertBusy(new 
Runnable() { + @Override + public void run() { + assertThat(nettyA.scheduledPing.successfulPings.count(), greaterThan(200l)); + assertThat(nettyB.scheduledPing.successfulPings.count(), greaterThan(200l)); + } + }); + assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0l)); + assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0l)); + + Releasables.close(serviceA, serviceB); + terminate(threadPool); + } +}