18
18
*/
19
19
package org .elasticsearch .transport ;
20
20
21
+ import java .net .InetSocketAddress ;
21
22
import java .util .function .Supplier ;
22
23
import org .apache .logging .log4j .message .ParameterizedMessage ;
23
24
import org .apache .lucene .store .AlreadyClosedException ;
42
43
import org .elasticsearch .common .io .stream .StreamInput ;
43
44
import org .elasticsearch .common .settings .Settings ;
44
45
import org .elasticsearch .common .transport .TransportAddress ;
46
+ import org .elasticsearch .common .unit .TimeValue ;
45
47
import org .elasticsearch .common .util .CancellableThreads ;
46
48
import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
47
49
import org .elasticsearch .common .util .concurrent .ThreadContext ;
@@ -92,6 +94,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
92
94
private final int maxNumRemoteConnections ;
93
95
private final Predicate <DiscoveryNode > nodePredicate ;
94
96
private final ThreadPool threadPool ;
97
+ private volatile String proxyAddress ;
95
98
private volatile List <Supplier <DiscoveryNode >> seedNodes ;
96
99
private volatile boolean skipUnavailable ;
97
100
private final ConnectHandler connectHandler ;
@@ -110,6 +113,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
110
113
RemoteClusterConnection (Settings settings , String clusterAlias , List <Supplier <DiscoveryNode >> seedNodes ,
111
114
TransportService transportService , ConnectionManager connectionManager , int maxNumRemoteConnections ,
112
115
Predicate <DiscoveryNode > nodePredicate ) {
116
+ this (settings , clusterAlias , seedNodes , transportService , connectionManager , maxNumRemoteConnections , nodePredicate , null );
117
+ }
118
+
119
+ RemoteClusterConnection (Settings settings , String clusterAlias , List <Supplier <DiscoveryNode >> seedNodes ,
120
+ TransportService transportService , ConnectionManager connectionManager , int maxNumRemoteConnections , Predicate <DiscoveryNode >
121
+ nodePredicate ,
122
+ String proxyAddress ) {
113
123
super (settings );
114
124
this .transportService = transportService ;
115
125
this .maxNumRemoteConnections = maxNumRemoteConnections ;
@@ -134,13 +144,26 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
134
144
connectionManager .addListener (this );
135
145
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
136
146
connectionManager .addListener (transportService );
147
+ this .proxyAddress = proxyAddress ;
148
+ }
149
+
150
+ private static DiscoveryNode maybeAddProxyAddress (String proxyAddress , DiscoveryNode node ) {
151
+ if (proxyAddress == null || proxyAddress .isEmpty ()) {
152
+ return node ;
153
+ } else {
154
+ // resovle proxy address lazy here
155
+ InetSocketAddress proxyInetAddress = RemoteClusterAware .parseSeedAddress (proxyAddress );
156
+ return new DiscoveryNode (node .getName (), node .getId (), node .getEphemeralId (), node .getHostName (), node
157
+ .getHostAddress (), new TransportAddress (proxyInetAddress ), node .getAttributes (), node .getRoles (), node .getVersion ());
158
+ }
137
159
}
138
160
139
161
/**
140
162
* Updates the list of seed nodes for this cluster connection
141
163
*/
142
- synchronized void updateSeedNodes (List <Supplier <DiscoveryNode >> seedNodes , ActionListener <Void > connectListener ) {
164
+ synchronized void updateSeedNodes (String proxyAddress , List <Supplier <DiscoveryNode >> seedNodes , ActionListener <Void > connectListener ) {
143
165
this .seedNodes = Collections .unmodifiableList (new ArrayList <>(seedNodes ));
166
+ this .proxyAddress = proxyAddress ;
144
167
connectHandler .connect (connectListener );
145
168
}
146
169
@@ -285,6 +308,7 @@ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
285
308
return new ProxyConnection (connection , remoteClusterNode );
286
309
}
287
310
311
+
288
312
static final class ProxyConnection implements Transport .Connection {
289
313
private final Transport .Connection proxyConnection ;
290
314
private final DiscoveryNode targetNode ;
@@ -465,7 +489,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
465
489
try {
466
490
if (seedNodes .hasNext ()) {
467
491
cancellableThreads .executeIO (() -> {
468
- final DiscoveryNode seedNode = seedNodes .next ().get ();
492
+ final DiscoveryNode seedNode = maybeAddProxyAddress ( proxyAddress , seedNodes .next ().get () );
469
493
final TransportService .HandshakeResponse handshakeResponse ;
470
494
Transport .Connection connection = manager .openConnection (seedNode ,
471
495
ConnectionProfile .buildSingleChannelProfile (TransportRequestOptions .Type .REG , null , null ));
@@ -480,7 +504,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
480
504
throw ex ;
481
505
}
482
506
483
- final DiscoveryNode handshakeNode = handshakeResponse .getDiscoveryNode ();
507
+ final DiscoveryNode handshakeNode = maybeAddProxyAddress ( proxyAddress , handshakeResponse .getDiscoveryNode () );
484
508
if (nodePredicate .test (handshakeNode ) && connectedNodes .size () < maxNumRemoteConnections ) {
485
509
manager .connectToNode (handshakeNode , remoteProfile , transportService .connectionValidator (handshakeNode ));
486
510
if (remoteClusterName .get () == null ) {
@@ -587,7 +611,8 @@ public void handleResponse(ClusterStateResponse response) {
587
611
cancellableThreads .executeIO (() -> {
588
612
DiscoveryNodes nodes = response .getState ().nodes ();
589
613
Iterable <DiscoveryNode > nodesIter = nodes .getNodes ()::valuesIt ;
590
- for (DiscoveryNode node : nodesIter ) {
614
+ for (DiscoveryNode n : nodesIter ) {
615
+ DiscoveryNode node = maybeAddProxyAddress (proxyAddress , n );
591
616
if (nodePredicate .test (node ) && connectedNodes .size () < maxNumRemoteConnections ) {
592
617
try {
593
618
connectionManager .connectToNode (node , remoteProfile ,
@@ -709,6 +734,14 @@ public String executor() {
709
734
}
710
735
}
711
736
737
+ RemoteConnectionInfo getLocalConnectionInfo () { // for tests
738
+ List <TransportAddress > seedNodeAddresses = seedNodes .stream ().map (node -> node .get ().getAddress ()).collect
739
+ (Collectors .toList ());
740
+ TimeValue initialConnectionTimeout = RemoteClusterService .REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING .get (settings );
741
+ return new RemoteConnectionInfo (clusterAlias , Collections .emptyList (),
742
+ seedNodeAddresses , maxNumRemoteConnections , connectedNodes .size (), initialConnectionTimeout , skipUnavailable );
743
+ }
744
+
712
745
int getNumNodesConnected () {
713
746
return connectedNodes .size ();
714
747
}
0 commit comments