Skip to content

Commit 70cf786

Browse files
authored
Pass DiscoveryNode to initiateChannel (#32958) (#33484)
This is related to #32517. This commit passes the DiscoveryNode to the initiateChannel method for different Transport implementation. This will allow additional attributes (besides just the socket address) to be used when opening channels.
1 parent bf72170 commit 70cf786

File tree

5 files changed

+13
-8
lines changed

5 files changed

+13
-8
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.ElasticsearchException;
4141
import org.elasticsearch.ExceptionsHelper;
4242
import org.elasticsearch.action.ActionListener;
43+
import org.elasticsearch.cluster.node.DiscoveryNode;
4344
import org.elasticsearch.common.SuppressForbidden;
4445
import org.elasticsearch.common.collect.Tuple;
4546
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -228,7 +229,8 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
228229
}
229230

230231
@Override
231-
protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
232+
protected NettyTcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> listener) throws IOException {
233+
InetSocketAddress address = node.getAddress().address();
232234
ChannelFuture channelFuture = bootstrap.connect(address);
233235
Channel channel = channelFuture.channel();
234236
if (channel == null) {

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect
439439
try {
440440
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
441441
connectionFutures.add(connectFuture);
442-
TcpChannel channel = initiateChannel(node.getAddress().address(), connectFuture);
442+
TcpChannel channel = initiateChannel(node, connectFuture);
443443
logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel));
444444
channels.add(channel);
445445
} catch (Exception e) {
@@ -825,12 +825,12 @@ protected void serverAcceptedChannel(TcpChannel channel) {
825825
/**
826826
* Initiate a single tcp socket channel.
827827
*
828-
* @param address address for the initiated connection
828+
* @param node for the initiated connection
829829
* @param connectListener listener to be called when connection complete
830830
* @return the pending connection
831831
* @throws IOException if an I/O exception occurs while opening the channel
832832
*/
833-
protected abstract TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException;
833+
protected abstract TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException;
834834

835835
/**
836836
* Called to tear down internal resources

server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ protected FakeChannel bind(String name, InetSocketAddress address) throws IOExce
185185
}
186186

187187
@Override
188-
protected FakeChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
188+
protected FakeChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
189189
return new FakeChannel(messageCaptor);
190190
}
191191

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.elasticsearch.transport;
2020

2121
import org.elasticsearch.cli.SuppressForbidden;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
2223
import org.elasticsearch.core.internal.io.IOUtils;
2324
import org.elasticsearch.Version;
2425
import org.elasticsearch.action.ActionListener;
@@ -169,7 +170,8 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
169170

170171
@Override
171172
@SuppressForbidden(reason = "real socket for mocking remote connections")
172-
protected MockChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
173+
protected MockChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
174+
InetSocketAddress address = node.getAddress().address();
173175
final MockSocket socket = new MockSocket();
174176
final MockChannel channel = new MockChannel(socket, address, "none");
175177

test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.elasticsearch.ElasticsearchException;
2424
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2627
import org.elasticsearch.common.network.NetworkService;
2728
import org.elasticsearch.common.recycler.Recycler;
@@ -85,8 +86,8 @@ protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address)
8586
}
8687

8788
@Override
88-
protected TcpNioSocketChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener)
89-
throws IOException {
89+
protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
90+
InetSocketAddress address = node.getAddress().address();
9091
TcpNioSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory);
9192
channel.addConnectListener(connectListener);
9293
return channel;

0 commit comments

Comments
 (0)