18
18
*/
19
19
package org .elasticsearch .transport ;
20
20
21
+ import java .net .InetSocketAddress ;
21
22
import java .util .function .Supplier ;
22
23
import org .apache .logging .log4j .message .ParameterizedMessage ;
23
24
import org .apache .lucene .store .AlreadyClosedException ;
@@ -88,6 +89,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
88
89
private final int maxNumRemoteConnections ;
89
90
private final Predicate <DiscoveryNode > nodePredicate ;
90
91
private final ThreadPool threadPool ;
92
+ private volatile String proxyAddress ;
91
93
private volatile List <Supplier <DiscoveryNode >> seedNodes ;
92
94
private volatile boolean skipUnavailable ;
93
95
private final ConnectHandler connectHandler ;
@@ -106,6 +108,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
106
108
RemoteClusterConnection (Settings settings , String clusterAlias , List <Supplier <DiscoveryNode >> seedNodes ,
107
109
TransportService transportService , ConnectionManager connectionManager , int maxNumRemoteConnections ,
108
110
Predicate <DiscoveryNode > nodePredicate ) {
111
+ this (settings , clusterAlias , seedNodes , transportService , connectionManager , maxNumRemoteConnections , nodePredicate , null );
112
+ }
113
+
114
+ RemoteClusterConnection (Settings settings , String clusterAlias , List <Supplier <DiscoveryNode >> seedNodes ,
115
+ TransportService transportService , ConnectionManager connectionManager , int maxNumRemoteConnections , Predicate <DiscoveryNode >
116
+ nodePredicate ,
117
+ String proxyAddress ) {
109
118
super (settings );
110
119
this .transportService = transportService ;
111
120
this .maxNumRemoteConnections = maxNumRemoteConnections ;
@@ -130,13 +139,26 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
130
139
connectionManager .addListener (this );
131
140
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
132
141
connectionManager .addListener (transportService );
142
+ this .proxyAddress = proxyAddress ;
143
+ }
144
+
145
+ private static DiscoveryNode maybeAddProxyAddress (String proxyAddress , DiscoveryNode node ) {
146
+ if (proxyAddress == null || proxyAddress .isEmpty ()) {
147
+ return node ;
148
+ } else {
149
+ // resovle proxy address lazy here
150
+ InetSocketAddress proxyInetAddress = RemoteClusterAware .parseSeedAddress (proxyAddress );
151
+ return new DiscoveryNode (node .getName (), node .getId (), node .getEphemeralId (), node .getHostName (), node
152
+ .getHostAddress (), new TransportAddress (proxyInetAddress ), node .getAttributes (), node .getRoles (), node .getVersion ());
153
+ }
133
154
}
134
155
135
156
/**
136
157
* Updates the list of seed nodes for this cluster connection
137
158
*/
138
- synchronized void updateSeedNodes (List <Supplier <DiscoveryNode >> seedNodes , ActionListener <Void > connectListener ) {
159
+ synchronized void updateSeedNodes (String proxyAddress , List <Supplier <DiscoveryNode >> seedNodes , ActionListener <Void > connectListener ) {
139
160
this .seedNodes = Collections .unmodifiableList (new ArrayList <>(seedNodes ));
161
+ this .proxyAddress = proxyAddress ;
140
162
connectHandler .connect (connectListener );
141
163
}
142
164
@@ -281,6 +303,7 @@ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
281
303
return new ProxyConnection (connection , remoteClusterNode );
282
304
}
283
305
306
+
284
307
static final class ProxyConnection implements Transport .Connection {
285
308
private final Transport .Connection proxyConnection ;
286
309
private final DiscoveryNode targetNode ;
@@ -461,7 +484,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
461
484
try {
462
485
if (seedNodes .hasNext ()) {
463
486
cancellableThreads .executeIO (() -> {
464
- final DiscoveryNode seedNode = seedNodes .next ().get ();
487
+ final DiscoveryNode seedNode = maybeAddProxyAddress ( proxyAddress , seedNodes .next ().get () );
465
488
final TransportService .HandshakeResponse handshakeResponse ;
466
489
Transport .Connection connection = manager .openConnection (seedNode ,
467
490
ConnectionProfile .buildSingleChannelProfile (TransportRequestOptions .Type .REG , null , null ));
@@ -476,7 +499,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
476
499
throw ex ;
477
500
}
478
501
479
- final DiscoveryNode handshakeNode = handshakeResponse .getDiscoveryNode ();
502
+ final DiscoveryNode handshakeNode = maybeAddProxyAddress ( proxyAddress , handshakeResponse .getDiscoveryNode () );
480
503
if (nodePredicate .test (handshakeNode ) && connectedNodes .size () < maxNumRemoteConnections ) {
481
504
manager .connectToNode (handshakeNode , remoteProfile , transportService .connectionValidator (handshakeNode ));
482
505
if (remoteClusterName .get () == null ) {
@@ -583,7 +606,8 @@ public void handleResponse(ClusterStateResponse response) {
583
606
cancellableThreads .executeIO (() -> {
584
607
DiscoveryNodes nodes = response .getState ().nodes ();
585
608
Iterable <DiscoveryNode > nodesIter = nodes .getNodes ()::valuesIt ;
586
- for (DiscoveryNode node : nodesIter ) {
609
+ for (DiscoveryNode n : nodesIter ) {
610
+ DiscoveryNode node = maybeAddProxyAddress (proxyAddress , n );
587
611
if (nodePredicate .test (node ) && connectedNodes .size () < maxNumRemoteConnections ) {
588
612
try {
589
613
connectionManager .connectToNode (node , remoteProfile ,
@@ -646,7 +670,8 @@ void addConnectedNode(DiscoveryNode node) {
646
670
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
647
671
*/
648
672
public RemoteConnectionInfo getConnectionInfo () {
649
- List <TransportAddress > seedNodeAddresses = seedNodes .stream ().map (node -> node .get ().getAddress ()).collect (Collectors .toList ());
673
+ List <TransportAddress > seedNodeAddresses = seedNodes .stream ().map (node -> node .get ().getAddress ()).collect
674
+ (Collectors .toList ());
650
675
TimeValue initialConnectionTimeout = RemoteClusterService .REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING .get (settings );
651
676
return new RemoteConnectionInfo (clusterAlias , seedNodeAddresses , maxNumRemoteConnections , connectedNodes .size (),
652
677
initialConnectionTimeout , skipUnavailable );
0 commit comments