Skip to content

Commit d1acb76

Browse files
authored
Remove internal channel tracking in transports (elastic#27711)
This commit attempts to continue unifying the logic between different transport implementations. As transports call a `TcpTransport` callback when a new channel is accepted, there is no need to internally track channels accepted. Instead there is a set of accepted channels in `TcpTransport`. This set is used for metrics and shutting down channels.
1 parent f50f99e commit d1acb76

File tree

10 files changed

+52
-177
lines changed

10 files changed

+52
-177
lines changed

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -195,22 +195,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
195195
protected final NetworkService networkService;
196196
protected final Set<ProfileSettings> profileSettings;
197197

198-
protected volatile TransportService transportService;
199-
// node id to actual channel
200-
protected final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
198+
private volatile TransportService transportService;
201199

202-
protected final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
200+
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
201+
// node id to actual channel
202+
private final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
203203
private final Map<String, List<TcpChannel>> serverChannels = newConcurrentMap();
204204
private final Set<TcpChannel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
205205

206-
protected final KeyedLock<String> connectionLock = new KeyedLock<>();
206+
private final KeyedLock<String> connectionLock = new KeyedLock<>();
207207
private final NamedWriteableRegistry namedWriteableRegistry;
208208

209209
// this lock is here to make sure we close this transport and disconnect all the client nodes
210210
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
211-
protected final ReadWriteLock closeLock = new ReentrantReadWriteLock();
211+
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
212212
protected final boolean compress;
213-
protected volatile BoundTransportAddress boundAddress;
213+
private volatile BoundTransportAddress boundAddress;
214214
private final String transportName;
215215
protected final ConnectionProfile defaultConnectionProfile;
216216

@@ -438,7 +438,7 @@ public boolean allChannelsOpen() {
438438
}
439439

440440
@Override
441-
public void close() throws IOException {
441+
public void close() {
442442
if (closed.compareAndSet(false, true)) {
443443
try {
444444
if (lifecycle.stopped()) {
@@ -582,7 +582,7 @@ protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectio
582582
}
583583

584584
@Override
585-
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
585+
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
586586
if (node == null) {
587587
throw new ConnectTransportException(null, "can't open connection to a null node");
588588
}
@@ -602,6 +602,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
602602
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
603603
connectionFutures.add(connectFuture);
604604
TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture);
605+
logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel));
605606
channels.add(channel);
606607
} catch (Exception e) {
607608
// If there was an exception when attempting to instantiate the raw channels, we close all of the channels
@@ -1041,6 +1042,7 @@ protected void serverAcceptedChannel(TcpChannel channel) {
10411042
boolean addedOnThisCall = acceptedChannels.add(channel);
10421043
assert addedOnThisCall : "Channel should only be added to accept channel set once";
10431044
channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
1045+
logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel));
10441046
}
10451047

10461048
/**
@@ -1738,15 +1740,9 @@ private void closeAndCallback(final Exception e) {
17381740
}
17391741
}
17401742

1741-
/**
1742-
* Returns count of currently open connections
1743-
*/
1744-
protected abstract long getNumOpenServerConnections();
1745-
17461743
@Override
17471744
public final TransportStats getStats() {
1748-
return new TransportStats(
1749-
getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
1745+
return new TransportStats(acceptedChannels.size(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
17501746
transmittedBytesMetric.sum());
17511747
}
17521748

core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,6 @@ protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeo
191191
return new FakeChannel(messageCaptor);
192192
}
193193

194-
@Override
195-
public long getNumOpenServerConnections() {
196-
return 0;
197-
}
198-
199194
@Override
200195
public NodeChannels getConnection(DiscoveryNode node) {
201196
int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ public class Netty4Transport extends TcpTransport {
104104
protected final int workerCount;
105105
protected final ByteSizeValue receivePredictorMin;
106106
protected final ByteSizeValue receivePredictorMax;
107-
// package private for testing
108-
volatile Netty4OpenChannelsHandler serverOpenChannels;
109107
protected volatile Bootstrap bootstrap;
110108
protected final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
111109

@@ -132,8 +130,6 @@ protected void doStart() {
132130
try {
133131
bootstrap = createBootstrap();
134132
if (NetworkService.NETWORK_SERVER.get(settings)) {
135-
final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger);
136-
this.serverOpenChannels = openChannels;
137133
for (ProfileSettings profileSettings : profileSettings) {
138134
createServerBootstrap(profileSettings);
139135
bindServer(profileSettings);
@@ -242,12 +238,6 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
242238
onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
243239
}
244240

245-
@Override
246-
public long getNumOpenServerConnections() {
247-
Netty4OpenChannelsHandler channels = serverOpenChannels;
248-
return channels == null ? 0 : channels.numberOfOpenChannels();
249-
}
250-
251241
@Override
252242
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> listener)
253243
throws IOException {
@@ -294,7 +284,7 @@ ScheduledPing getPing() {
294284
@Override
295285
@SuppressForbidden(reason = "debug")
296286
protected void stopInternal() {
297-
Releasables.close(serverOpenChannels, () -> {
287+
Releasables.close(() -> {
298288
final List<Tuple<String, Future<?>>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size());
299289
for (final Map.Entry<String, ServerBootstrap> entry : serverBootstraps.entrySet()) {
300290
serverBootstrapCloseFutures.add(
@@ -349,7 +339,6 @@ protected void initChannel(Channel ch) throws Exception {
349339
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
350340
serverAcceptedChannel(nettyTcpChannel);
351341
ch.pipeline().addLast("logging", new ESLoggingHandler());
352-
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
353342
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
354343
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
355344
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
9696
}
9797
});
9898
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise);
99-
99+
100100
if (channel.eventLoop().isShutdown()) {
101101
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
102102
}
@@ -105,4 +105,12 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
105105
public Channel getLowLevelChannel() {
106106
return channel;
107107
}
108+
109+
@Override
110+
public String toString() {
111+
return "NettyTcpChannel{" +
112+
"localAddress=" + getLocalAddress() +
113+
", remoteAddress=" + channel.remoteAddress() +
114+
'}';
115+
}
108116
}

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
109109

110110
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);
111111

112+
protected int channelsPerNodeConnection() {
113+
return 13;
114+
}
115+
112116
@Override
113117
@Before
114118
public void setUp() throws Exception {
@@ -2345,6 +2349,24 @@ public String executor() {
23452349
}
23462350
}
23472351

2352+
public void testAcceptedChannelCount() throws Exception {
2353+
assertBusy(() -> {
2354+
TransportStats transportStats = serviceA.transport.getStats();
2355+
assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen());
2356+
});
2357+
assertBusy(() -> {
2358+
TransportStats transportStats = serviceB.transport.getStats();
2359+
assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen());
2360+
});
2361+
2362+
serviceA.close();
2363+
2364+
assertBusy(() -> {
2365+
TransportStats transportStats = serviceB.transport.getStats();
2366+
assertEquals(0, transportStats.getServerOpen());
2367+
});
2368+
}
2369+
23482370
public void testTransportStatsWithException() throws Exception {
23492371
MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
23502372
CountDownLatch receivedLatch = new CountDownLatch(1);

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,6 @@ private void configureSocket(Socket socket) throws SocketException {
217217
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
218218
}
219219

220-
@Override
221-
public long getNumOpenServerConnections() {
222-
return 1;
223-
}
224-
225220
public final class MockChannel implements Closeable, TcpChannel {
226221
private final AtomicBoolean isOpen = new AtomicBoolean(true);
227222
private final InetSocketAddress localAddress;

test/framework/src/main/java/org/elasticsearch/transport/nio/NioShutdown.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,12 @@ public NioShutdown(Logger logger) {
3434
this.logger = logger;
3535
}
3636

37-
void orderlyShutdown(OpenChannels openChannels, ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {
38-
39-
// Start by closing the server channels. Once these are closed, we are guaranteed to no accept new connections
40-
openChannels.closeServerChannels();
37+
void orderlyShutdown(ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {
4138

4239
for (AcceptingSelector acceptor : acceptors) {
4340
shutdownSelector(acceptor);
4441
}
4542

46-
openChannels.close();
47-
4843
for (SocketSelector selector : socketSelectors) {
4944
shutdownSelector(selector);
5045
}

test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.threadpool.ThreadPool;
3636
import org.elasticsearch.transport.TcpTransport;
3737
import org.elasticsearch.transport.Transports;
38-
import org.elasticsearch.transport.nio.channel.NioChannel;
3938
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
4039
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
4140
import org.elasticsearch.transport.nio.channel.TcpChannelFactory;
@@ -70,7 +69,6 @@ public class NioTransport extends TcpTransport {
7069
public static final Setting<Integer> NIO_ACCEPTOR_COUNT =
7170
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
7271

73-
private final OpenChannels openChannels = new OpenChannels(logger);
7472
private final PageCacheRecycler pageCacheRecycler;
7573
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
7674
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
@@ -86,27 +84,17 @@ public NioTransport(Settings settings, ThreadPool threadPool, NetworkService net
8684
this.pageCacheRecycler = pageCacheRecycler;
8785
}
8886

89-
@Override
90-
public long getNumOpenServerConnections() {
91-
return openChannels.serverChannelsCount();
92-
}
93-
9487
@Override
9588
protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
9689
TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name);
9790
AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings));
98-
TcpNioServerSocketChannel serverChannel = channelFactory.openNioServerSocketChannel(address, selector);
99-
openChannels.serverChannelOpened(serverChannel);
100-
serverChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(serverChannel)));
101-
return serverChannel;
91+
return channelFactory.openNioServerSocketChannel(address, selector);
10292
}
10393

10494
@Override
10595
protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
10696
throws IOException {
10797
TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
108-
openChannels.clientChannelOpened(channel);
109-
channel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
11098
channel.addConnectListener(connectListener);
11199
return channel;
112100
}
@@ -175,7 +163,7 @@ protected void doStart() {
175163
@Override
176164
protected void stopInternal() {
177165
NioShutdown nioShutdown = new NioShutdown(logger);
178-
nioShutdown.orderlyShutdown(openChannels, acceptors, socketSelectors);
166+
nioShutdown.orderlyShutdown(acceptors, socketSelectors);
179167

180168
profileToChannelFactory.clear();
181169
socketSelectors.clear();
@@ -202,8 +190,6 @@ private Consumer<NioSocketChannel> getContextSetter(String profileName) {
202190

203191
private void acceptChannel(NioSocketChannel channel) {
204192
TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel;
205-
openChannels.acceptedChannelOpened(tcpChannel);
206-
tcpChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
207193
serverAcceptedChannel(tcpChannel);
208194

209195
}

0 commit comments

Comments
 (0)