Skip to content

Commit faa42de

Browse files
authored
Pass DiscoveryNode to initiateChannel (#32958)
This is related to #32517. This commit passes the DiscoveryNode to the initiateChannel method for different Transport implementation. This will allow additional attributes (besides just the socket address) to be used when opening channels.
1 parent eef0e35 commit faa42de

File tree

6 files changed

+16
-8
lines changed

6 files changed

+16
-8
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.logging.log4j.message.ParameterizedMessage;
4040
import org.apache.logging.log4j.util.Supplier;
4141
import org.elasticsearch.action.ActionListener;
42+
import org.elasticsearch.cluster.node.DiscoveryNode;
4243
import org.elasticsearch.common.SuppressForbidden;
4344
import org.elasticsearch.common.collect.Tuple;
4445
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -222,7 +223,8 @@ protected ChannelHandler getClientChannelInitializer() {
222223
static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel");
223224

224225
@Override
225-
protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
226+
protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> listener) throws IOException {
227+
InetSocketAddress address = node.getAddress().address();
226228
ChannelFuture channelFuture = bootstrap.connect(address);
227229
Channel channel = channelFuture.channel();
228230
if (channel == null) {

plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.cluster.node.DiscoveryNode;
2425
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2526
import org.elasticsearch.common.network.NetworkService;
2627
import org.elasticsearch.common.recycler.Recycler;
@@ -82,7 +83,8 @@ protected NioTcpServerChannel bind(String name, InetSocketAddress address) throw
8283
}
8384

8485
@Override
85-
protected NioTcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
86+
protected NioTcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
87+
InetSocketAddress address = node.getAddress().address();
8688
NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory);
8789
channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
8890
return channel;

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect
441441
try {
442442
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
443443
connectionFutures.add(connectFuture);
444-
TcpChannel channel = initiateChannel(node.getAddress().address(), connectFuture);
444+
TcpChannel channel = initiateChannel(node, connectFuture);
445445
logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel));
446446
channels.add(channel);
447447
} catch (Exception e) {
@@ -841,12 +841,12 @@ protected void serverAcceptedChannel(TcpChannel channel) {
841841
/**
842842
* Initiate a single tcp socket channel.
843843
*
844-
* @param address address for the initiated connection
844+
* @param node for the initiated connection
845845
* @param connectListener listener to be called when connection complete
846846
* @return the pending connection
847847
* @throws IOException if an I/O exception occurs while opening the channel
848848
*/
849-
protected abstract TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException;
849+
protected abstract TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException;
850850

851851
/**
852852
* Called to tear down internal resources

server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ protected FakeChannel bind(String name, InetSocketAddress address) throws IOExce
188188
}
189189

190190
@Override
191-
protected FakeChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
191+
protected FakeChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
192192
return new FakeChannel(messageCaptor);
193193
}
194194

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.elasticsearch.transport;
2020

2121
import org.elasticsearch.cli.SuppressForbidden;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
2223
import org.elasticsearch.core.internal.io.IOUtils;
2324
import org.elasticsearch.Version;
2425
import org.elasticsearch.action.ActionListener;
@@ -162,7 +163,8 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
162163

163164
@Override
164165
@SuppressForbidden(reason = "real socket for mocking remote connections")
165-
protected MockChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
166+
protected MockChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
167+
InetSocketAddress address = node.getAddress().address();
166168
final MockSocket socket = new MockSocket();
167169
final MockChannel channel = new MockChannel(socket, address, "none");
168170

test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.common.bytes.BytesReference;
2627
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2728
import org.elasticsearch.common.network.NetworkService;
@@ -85,7 +86,8 @@ protected MockServerChannel bind(String name, InetSocketAddress address) throws
8586
}
8687

8788
@Override
88-
protected MockSocketChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
89+
protected MockSocketChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
90+
InetSocketAddress address = node.getAddress().address();
8991
MockSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory);
9092
channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
9193
return channel;

0 commit comments

Comments
 (0)