Skip to content

Commit c305f9d

Browse files
authored
Make keepalive pings bidirectional and optimizable (#35441)
This is related to #34405 and a follow-up to #34753. It makes a number of changes to our current keepalive pings. The ping interval configuration is moved to the ConnectionProfile. The server channel now responds to pings. This makes the keepalive pings bidirectional. On the client-side, the pings can now be optimized away. What this means is that if the channel has received a message or sent a message since the last pinging round, the ping is not sent for this round.
1 parent 5eb7040 commit c305f9d

File tree

27 files changed

+963
-486
lines changed

27 files changed

+963
-486
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,15 @@
3535
public class Netty4TcpChannel implements TcpChannel {
3636

3737
private final Channel channel;
38+
private final boolean isServer;
3839
private final String profile;
3940
private final CompletableContext<Void> connectContext;
4041
private final CompletableContext<Void> closeContext = new CompletableContext<>();
42+
private final ChannelStats stats = new ChannelStats();
4143

42-
Netty4TcpChannel(Channel channel, String profile, @Nullable ChannelFuture connectFuture) {
44+
Netty4TcpChannel(Channel channel, boolean isServer, String profile, @Nullable ChannelFuture connectFuture) {
4345
this.channel = channel;
46+
this.isServer = isServer;
4447
this.profile = profile;
4548
this.connectContext = new CompletableContext<>();
4649
this.channel.closeFuture().addListener(f -> {
@@ -77,6 +80,11 @@ public void close() {
7780
channel.close();
7881
}
7982

83+
@Override
84+
public boolean isServerChannel() {
85+
return isServer;
86+
}
87+
8088
@Override
8189
public String getProfile() {
8290
return profile;
@@ -92,6 +100,11 @@ public void addConnectListener(ActionListener<Void> listener) {
92100
connectContext.addListener(ActionListener.toBiConsumer(listener));
93101
}
94102

103+
@Override
104+
public ChannelStats getChannelStats() {
105+
return stats;
106+
}
107+
95108
@Override
96109
public boolean isOpen() {
97110
return channel.isOpen();

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

+2-10
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOExceptio
232232
}
233233
addClosedExceptionLogger(channel);
234234

235-
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default", connectFuture);
235+
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture);
236236
channel.attr(CHANNEL_KEY).set(nettyChannel);
237237

238238
return nettyChannel;
@@ -246,14 +246,6 @@ protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
246246
return esChannel;
247247
}
248248

249-
long successfulPingCount() {
250-
return successfulPings.count();
251-
}
252-
253-
long failedPingCount() {
254-
return failedPings.count();
255-
}
256-
257249
@Override
258250
@SuppressForbidden(reason = "debug")
259251
protected void stopInternal() {
@@ -297,7 +289,7 @@ protected ServerChannelInitializer(String name) {
297289
@Override
298290
protected void initChannel(Channel ch) throws Exception {
299291
addClosedExceptionLogger(ch);
300-
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name, ch.newSucceededFuture());
292+
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
301293
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
302294
ch.pipeline().addLast("logging", new ESLoggingHandler());
303295
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

-138
This file was deleted.

plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,25 @@
2828

2929
public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
3030

31+
private final boolean isServer;
3132
private final String profile;
33+
private final ChannelStats stats = new ChannelStats();
3234

33-
public NioTcpChannel(String profile, SocketChannel socketChannel) {
35+
public NioTcpChannel(boolean isServer, String profile, SocketChannel socketChannel) {
3436
super(socketChannel);
37+
this.isServer = isServer;
3538
this.profile = profile;
3639
}
3740

3841
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
3942
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
4043
}
4144

45+
@Override
46+
public boolean isServerChannel() {
47+
return isServer;
48+
}
49+
4250
@Override
4351
public String getProfile() {
4452
return profile;
@@ -54,6 +62,11 @@ public void addConnectListener(ActionListener<Void> listener) {
5462
addConnectListener(ActionListener.toBiConsumer(listener));
5563
}
5664

65+
@Override
66+
public ChannelStats getChannelStats() {
67+
return stats;
68+
}
69+
5770
@Override
5871
public void close() {
5972
getContext().closeChannel();

plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
5959

6060
public class NioTransport extends TcpTransport {
61+
6162
private static final Logger logger = LogManager.getLogger(NioTransport.class);
6263

6364
public static final Setting<Integer> NIO_WORKER_COUNT =
@@ -135,11 +136,11 @@ protected void acceptChannel(NioSocketChannel channel) {
135136
}
136137

137138
protected TcpChannelFactory serverChannelFactory(ProfileSettings profileSettings) {
138-
return new TcpChannelFactoryImpl(profileSettings);
139+
return new TcpChannelFactoryImpl(profileSettings, false);
139140
}
140141

141142
protected Function<DiscoveryNode, TcpChannelFactory> clientChannelFactoryFunction(ProfileSettings profileSettings) {
142-
return (n) -> new TcpChannelFactoryImpl(profileSettings);
143+
return (n) -> new TcpChannelFactoryImpl(profileSettings, true);
143144
}
144145

145146
protected abstract class TcpChannelFactory extends ChannelFactory<NioTcpServerChannel, NioTcpChannel> {
@@ -151,20 +152,22 @@ protected TcpChannelFactory(RawChannelFactory rawChannelFactory) {
151152

152153
private class TcpChannelFactoryImpl extends TcpChannelFactory {
153154

155+
private final boolean isClient;
154156
private final String profileName;
155157

156-
private TcpChannelFactoryImpl(ProfileSettings profileSettings) {
158+
private TcpChannelFactoryImpl(ProfileSettings profileSettings, boolean isClient) {
157159
super(new RawChannelFactory(profileSettings.tcpNoDelay,
158160
profileSettings.tcpKeepAlive,
159161
profileSettings.reuseAddress,
160162
Math.toIntExact(profileSettings.sendBufferSize.getBytes()),
161163
Math.toIntExact(profileSettings.receiveBufferSize.getBytes())));
164+
this.isClient = isClient;
162165
this.profileName = profileSettings.profileName;
163166
}
164167

165168
@Override
166-
public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
167-
NioTcpChannel nioChannel = new NioTcpChannel(profileName, channel);
169+
public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) {
170+
NioTcpChannel nioChannel = new NioTcpChannel(isClient == false, profileName, channel);
168171
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
169172
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
170173
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
@@ -178,7 +181,7 @@ public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel)
178181
}
179182

180183
@Override
181-
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
184+
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) {
182185
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);
183186
Consumer<Exception> exceptionHandler = (e) -> onServerException(nioChannel, e);
184187
Consumer<NioSocketChannel> acceptor = NioTransport.this::acceptChannel;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.common;
20+
21+
import org.elasticsearch.action.ActionListener;
22+
23+
/**
24+
* A {@link java.util.function.BiFunction}-like interface designed to be used with asynchronous executions.
25+
*/
26+
public interface AsyncBiFunction<T,U,C> {
27+
28+
void apply(T t, U u, ActionListener<C> listener);
29+
}

0 commit comments

Comments
 (0)