39
39
import org .elasticsearch .cluster .node .DiscoveryNode ;
40
40
import org .elasticsearch .cluster .node .DiscoveryNodes ;
41
41
import org .elasticsearch .common .component .AbstractComponent ;
42
+ import org .elasticsearch .common .io .stream .StreamInput ;
42
43
import org .elasticsearch .common .settings .Settings ;
43
44
import org .elasticsearch .common .transport .TransportAddress ;
44
45
import org .elasticsearch .common .util .CancellableThreads ;
84
85
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener , Closeable {
85
86
86
87
private final TransportService transportService ;
88
+ private final ConnectionManager connectionManager ;
87
89
private final ConnectionProfile remoteProfile ;
88
90
private final ConnectedNodes connectedNodes ;
89
91
private final String clusterAlias ;
90
92
private final int maxNumRemoteConnections ;
91
93
private final Predicate <DiscoveryNode > nodePredicate ;
94
+ private final ThreadPool threadPool ;
92
95
private volatile List <Supplier <DiscoveryNode >> seedNodes ;
93
96
private volatile boolean skipUnavailable ;
94
97
private final ConnectHandler connectHandler ;
95
98
private SetOnce <ClusterName > remoteClusterName = new SetOnce <>();
96
- private final ClusterName localClusterName ;
97
99
98
100
/**
99
101
* Creates a new {@link RemoteClusterConnection}
100
102
* @param settings the nodes settings object
101
103
* @param clusterAlias the configured alias of the cluster to connect to
102
104
* @param seedNodes a list of seed nodes to discover eligible nodes from
103
105
* @param transportService the local nodes transport service
106
+ * @param connectionManager the connection manager to use for this remote connection
104
107
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
105
108
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
106
109
*/
107
110
RemoteClusterConnection (Settings settings , String clusterAlias , List <Supplier <DiscoveryNode >> seedNodes ,
108
- TransportService transportService , int maxNumRemoteConnections , Predicate <DiscoveryNode > nodePredicate ) {
111
+ TransportService transportService , ConnectionManager connectionManager , int maxNumRemoteConnections ,
112
+ Predicate <DiscoveryNode > nodePredicate ) {
109
113
super (settings );
110
- this .localClusterName = ClusterName .CLUSTER_NAME_SETTING .get (settings );
111
114
this .transportService = transportService ;
112
115
this .maxNumRemoteConnections = maxNumRemoteConnections ;
113
116
this .nodePredicate = nodePredicate ;
@@ -126,7 +129,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
126
129
this .skipUnavailable = RemoteClusterService .REMOTE_CLUSTER_SKIP_UNAVAILABLE
127
130
.getConcreteSettingForNamespace (clusterAlias ).get (settings );
128
131
this .connectHandler = new ConnectHandler ();
129
- transportService .addConnectionListener (this );
132
+ this .threadPool = transportService .threadPool ;
133
+ this .connectionManager = connectionManager ;
134
+ connectionManager .addListener (this );
135
+ // we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
136
+ connectionManager .addListener (transportService );
130
137
}
131
138
132
139
/**
@@ -187,8 +194,9 @@ public void ensureConnected(ActionListener<Void> voidActionListener) {
187
194
188
195
private void fetchShardsInternal (ClusterSearchShardsRequest searchShardsRequest ,
189
196
final ActionListener <ClusterSearchShardsResponse > listener ) {
190
- final DiscoveryNode node = connectedNodes .getAny ();
191
- transportService .sendRequest (node , ClusterSearchShardsAction .NAME , searchShardsRequest ,
197
+ final DiscoveryNode node = getAnyConnectedNode ();
198
+ Transport .Connection connection = connectionManager .getConnection (node );
199
+ transportService .sendRequest (connection , ClusterSearchShardsAction .NAME , searchShardsRequest , TransportRequestOptions .EMPTY ,
192
200
new TransportResponseHandler <ClusterSearchShardsResponse >() {
193
201
194
202
@ Override
@@ -223,12 +231,16 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
223
231
request .clear ();
224
232
request .nodes (true );
225
233
request .local (true ); // run this on the node that gets the request it's as good as any other
226
- final DiscoveryNode node = connectedNodes .getAny ();
227
- transportService .sendRequest (node , ClusterStateAction .NAME , request , TransportRequestOptions .EMPTY ,
234
+ final DiscoveryNode node = getAnyConnectedNode ();
235
+ Transport .Connection connection = connectionManager .getConnection (node );
236
+ transportService .sendRequest (connection , ClusterStateAction .NAME , request , TransportRequestOptions .EMPTY ,
228
237
new TransportResponseHandler <ClusterStateResponse >() {
238
+
229
239
@ Override
230
- public ClusterStateResponse newInstance () {
231
- return new ClusterStateResponse ();
240
+ public ClusterStateResponse read (StreamInput in ) throws IOException {
241
+ ClusterStateResponse response = new ClusterStateResponse ();
242
+ response .readFrom (in );
243
+ return response ;
232
244
}
233
245
234
246
@ Override
@@ -265,11 +277,11 @@ public String executor() {
265
277
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
266
278
*/
267
279
Transport .Connection getConnection (DiscoveryNode remoteClusterNode ) {
268
- if (transportService .nodeConnected (remoteClusterNode )) {
269
- return transportService .getConnection (remoteClusterNode );
280
+ if (connectionManager .nodeConnected (remoteClusterNode )) {
281
+ return connectionManager .getConnection (remoteClusterNode );
270
282
}
271
- DiscoveryNode discoveryNode = connectedNodes . getAny ();
272
- Transport .Connection connection = transportService .getConnection (discoveryNode );
283
+ DiscoveryNode discoveryNode = getAnyConnectedNode ();
284
+ Transport .Connection connection = connectionManager .getConnection (discoveryNode );
273
285
return new ProxyConnection (connection , remoteClusterNode );
274
286
}
275
287
@@ -321,33 +333,18 @@ public Version getVersion() {
321
333
}
322
334
323
335
Transport .Connection getConnection () {
324
- return transportService .getConnection (getAnyConnectedNode ());
336
+ return connectionManager .getConnection (getAnyConnectedNode ());
325
337
}
326
338
327
339
@ Override
328
340
public void close () throws IOException {
329
- connectHandler .close ();
341
+ IOUtils .close (connectHandler , connectionManager );
330
342
}
331
343
332
344
public boolean isClosed () {
333
345
return connectHandler .isClosed ();
334
346
}
335
347
336
- private ConnectionProfile getRemoteProfile (ClusterName name ) {
337
- // we can only compare the cluster name to make a decision if we should use a remote profile
338
- // we can't use a cluster UUID here since we could be connecting to that remote cluster before
339
- // the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
340
- // rather smallish optimization on the connection layer under certain situations where remote clusters
341
- // have the same name as the local one is minor here.
342
- // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
343
- // gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
344
- if (this .localClusterName .equals (name )) {
345
- return null ;
346
- } else {
347
- return remoteProfile ;
348
- }
349
- }
350
-
351
348
/**
352
349
* The connect handler manages node discovery and the actual connect to the remote cluster.
353
350
* There is at most one connect job running at any time. If such a connect job is triggered
@@ -391,7 +388,7 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
391
388
final boolean runConnect ;
392
389
final Collection <ActionListener <Void >> toNotify ;
393
390
final ActionListener <Void > listener = connectListener == null ? null :
394
- ContextPreservingActionListener .wrapPreservingContext (connectListener , transportService . getThreadPool () .getThreadContext ());
391
+ ContextPreservingActionListener .wrapPreservingContext (connectListener , threadPool .getThreadContext ());
395
392
synchronized (queue ) {
396
393
if (listener != null && queue .offer (listener ) == false ) {
397
394
listener .onFailure (new RejectedExecutionException ("connect queue is full" ));
@@ -419,7 +416,6 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
419
416
}
420
417
421
418
private void forkConnect (final Collection <ActionListener <Void >> toNotify ) {
422
- ThreadPool threadPool = transportService .getThreadPool ();
423
419
ExecutorService executor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
424
420
executor .submit (new AbstractRunnable () {
425
421
@ Override
@@ -456,13 +452,13 @@ protected void doRun() {
456
452
maybeConnect ();
457
453
}
458
454
});
459
- collectRemoteNodes (seedNodes .iterator (), transportService , listener );
455
+ collectRemoteNodes (seedNodes .iterator (), transportService , connectionManager , listener );
460
456
}
461
457
});
462
458
}
463
459
464
460
private void collectRemoteNodes (Iterator <Supplier <DiscoveryNode >> seedNodes ,
465
- final TransportService transportService , ActionListener <Void > listener ) {
461
+ final TransportService transportService , final ConnectionManager manager , ActionListener <Void > listener ) {
466
462
if (Thread .currentThread ().isInterrupted ()) {
467
463
listener .onFailure (new InterruptedException ("remote connect thread got interrupted" ));
468
464
}
@@ -471,7 +467,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
471
467
cancellableThreads .executeIO (() -> {
472
468
final DiscoveryNode seedNode = seedNodes .next ().get ();
473
469
final TransportService .HandshakeResponse handshakeResponse ;
474
- Transport .Connection connection = transportService .openConnection (seedNode ,
470
+ Transport .Connection connection = manager .openConnection (seedNode ,
475
471
ConnectionProfile .buildSingleChannelProfile (TransportRequestOptions .Type .REG , null , null ));
476
472
boolean success = false ;
477
473
try {
@@ -486,7 +482,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
486
482
487
483
final DiscoveryNode handshakeNode = handshakeResponse .getDiscoveryNode ();
488
484
if (nodePredicate .test (handshakeNode ) && connectedNodes .size () < maxNumRemoteConnections ) {
489
- transportService .connectToNode (handshakeNode , getRemoteProfile ( handshakeResponse . getClusterName () ));
485
+ manager .connectToNode (handshakeNode , remoteProfile , transportService . connectionValidator ( handshakeNode ));
490
486
if (remoteClusterName .get () == null ) {
491
487
assert handshakeResponse .getClusterName ().value () != null ;
492
488
remoteClusterName .set (handshakeResponse .getClusterName ());
@@ -528,7 +524,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
528
524
// ISE if we fail the handshake with an version incompatible node
529
525
if (seedNodes .hasNext ()) {
530
526
logger .debug (() -> new ParameterizedMessage ("fetching nodes from external cluster {} failed" , clusterAlias ), ex );
531
- collectRemoteNodes (seedNodes , transportService , listener );
527
+ collectRemoteNodes (seedNodes , transportService , manager , listener );
532
528
} else {
533
529
listener .onFailure (ex );
534
530
}
@@ -556,7 +552,6 @@ final boolean isClosed() {
556
552
/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
557
553
private class SniffClusterStateResponseHandler implements TransportResponseHandler <ClusterStateResponse > {
558
554
559
- private final TransportService transportService ;
560
555
private final Transport .Connection connection ;
561
556
private final ActionListener <Void > listener ;
562
557
private final Iterator <Supplier <DiscoveryNode >> seedNodes ;
@@ -565,7 +560,6 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl
565
560
SniffClusterStateResponseHandler (TransportService transportService , Transport .Connection connection ,
566
561
ActionListener <Void > listener , Iterator <Supplier <DiscoveryNode >> seedNodes ,
567
562
CancellableThreads cancellableThreads ) {
568
- this .transportService = transportService ;
569
563
this .connection = connection ;
570
564
this .listener = listener ;
571
565
this .seedNodes = seedNodes ;
@@ -596,8 +590,8 @@ public void handleResponse(ClusterStateResponse response) {
596
590
for (DiscoveryNode node : nodesIter ) {
597
591
if (nodePredicate .test (node ) && connectedNodes .size () < maxNumRemoteConnections ) {
598
592
try {
599
- transportService .connectToNode (node , getRemoteProfile ( remoteClusterName . get ())); // noop if node is
600
- // connected
593
+ connectionManager .connectToNode (node , remoteProfile ,
594
+ transportService . connectionValidator ( node )); // noop if node is connected
601
595
connectedNodes .add (node );
602
596
} catch (ConnectTransportException | IllegalStateException ex ) {
603
597
// ISE if we fail the handshake with an version incompatible node
@@ -613,7 +607,7 @@ public void handleResponse(ClusterStateResponse response) {
613
607
listener .onFailure (ex ); // we got canceled - fail the listener and step out
614
608
} catch (Exception ex ) {
615
609
logger .warn (() -> new ParameterizedMessage ("fetching nodes from external cluster {} failed" , clusterAlias ), ex );
616
- collectRemoteNodes (seedNodes , transportService , listener );
610
+ collectRemoteNodes (seedNodes , transportService , connectionManager , listener );
617
611
}
618
612
}
619
613
@@ -624,7 +618,7 @@ public void handleException(TransportException exp) {
624
618
IOUtils .closeWhileHandlingException (connection );
625
619
} finally {
626
620
// once the connection is closed lets try the next node
627
- collectRemoteNodes (seedNodes , transportService , listener );
621
+ collectRemoteNodes (seedNodes , transportService , connectionManager , listener );
628
622
}
629
623
}
630
624
@@ -781,4 +775,8 @@ private synchronized void ensureIteratorAvailable() {
781
775
}
782
776
}
783
777
}
778
+
779
+ ConnectionManager getConnectionManager () {
780
+ return connectionManager ;
781
+ }
784
782
}
0 commit comments