Skip to content

Commit d952840

Browse files
authored
Use default profile for remote connections (elastic#50828)
Currently, the connection manager is configured with a default profile for both the sniff and proxy connection stratgies. This profile correctly reflects the expected number of connection (6 for sniff, 18 for proxy). This commit removes the proxy strategy usages of the per connection attempt profile configuration. Additionally, it refactors other unnecessary code around the connection manager. The connection manager now can always be built inside the remote connection.
1 parent a6a3d2b commit d952840

File tree

3 files changed

+13
-53
lines changed

3 files changed

+13
-53
lines changed

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
9494
private final boolean includeServerName;
9595
private final Supplier<TransportAddress> address;
9696
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
97-
private final ConnectionProfile profile;
9897
private final ConnectionManager.ConnectionValidator clusterNameValidator;
9998

10099
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
@@ -129,11 +128,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
129128
this.includeServerName = includeServerName;
130129
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
131130
this.address = address;
132-
// TODO: Move into the ConnectionManager
133-
this.profile = new ConnectionProfile.Builder()
134-
.addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING)
135-
.addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY)
136-
.build();
137131
this.clusterNameValidator = (newConnection, actualProfile, listener) ->
138132
transportService.handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true,
139133
ActionListener.map(listener, resp -> {
@@ -231,7 +225,7 @@ public void onFailure(Exception e) {
231225
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
232226
Version.CURRENT.minimumCompatibilityVersion());
233227

234-
connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() {
228+
connectionManager.connectToNode(node, null, clusterNameValidator, new ActionListener<>() {
235229
@Override
236230
public void onResponse(Void v) {
237231
compositeListener.onResponse(v);

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,13 @@ final class RemoteClusterConnection implements Closeable {
6666
* @param transportService the local nodes transport service
6767
*/
6868
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) {
69-
this(settings, clusterAlias, transportService,
70-
createConnectionManager(RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings), transportService));
71-
}
72-
73-
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService,
74-
ConnectionManager connectionManager) {
7569
this.transportService = transportService;
7670
this.clusterAlias = clusterAlias;
77-
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
71+
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings);
72+
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
7873
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
7974
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
80-
connectionManager.addListener(transportService);
75+
this.remoteConnectionManager.getConnectionManager().addListener(transportService);
8176
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
8277
.getConcreteSettingForNamespace(clusterAlias).get(settings);
8378
this.threadPool = transportService.threadPool;

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.elasticsearch.search.internal.InternalSearchResponse;
5757
import org.elasticsearch.test.ESTestCase;
5858
import org.elasticsearch.test.transport.MockTransportService;
59-
import org.elasticsearch.test.transport.StubbableConnectionManager;
6059
import org.elasticsearch.threadpool.TestThreadPool;
6160
import org.elasticsearch.threadpool.ThreadPool;
6261

@@ -84,7 +83,6 @@
8483
import static org.hamcrest.Matchers.containsString;
8584
import static org.hamcrest.Matchers.equalTo;
8685
import static org.hamcrest.Matchers.instanceOf;
87-
import static org.hamcrest.Matchers.notNullValue;
8886
import static org.hamcrest.Matchers.sameInstance;
8987

9088
public class RemoteClusterConnectionTests extends ESTestCase {
@@ -546,51 +544,24 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
546544
public void testGetConnection() throws Exception {
547545
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
548546
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
549-
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
547+
MockTransportService disconnectedTransport = startTransport("disconnected_node", knownNodes, Version.CURRENT)) {
550548

551-
DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode();
552-
assertThat(connectedNode, notNullValue());
553-
knownNodes.add(connectedNode);
549+
DiscoveryNode seedNode = seedTransport.getLocalNode();
550+
knownNodes.add(seedNode);
554551

555-
DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode();
556-
assertThat(disconnectedNode, notNullValue());
557-
knownNodes.add(disconnectedNode);
552+
DiscoveryNode disconnectedNode = disconnectedTransport.getLocalNode();
558553

559554
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
560-
Transport.Connection seedConnection = new CloseableConnection() {
561-
@Override
562-
public DiscoveryNode getNode() {
563-
return connectedNode;
564-
}
565-
566-
@Override
567-
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
568-
throws TransportException {
569-
// no-op
570-
}
571-
};
572-
573-
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
574-
StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport);
575-
576-
connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> connectedNode.equals(node));
577-
578-
connectionManager.addGetConnectionBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> seedConnection);
579-
580-
connectionManager.addGetConnectionBehavior(disconnectedNode.getAddress(), (cm, discoveryNode) -> {
581-
throw new NodeNotConnectedException(discoveryNode, "");
582-
});
583-
584555
service.start();
585556
service.acceptIncomingRequests();
586557
String clusterAlias = "test-cluster";
587-
Settings settings = buildRandomSettings(clusterAlias, addresses(connectedNode));
588-
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, connectionManager)) {
558+
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
559+
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
589560
PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null)));
590561
for (int i = 0; i < 10; i++) {
591562
//always a direct connection as the remote node is already connected
592-
Transport.Connection remoteConnection = connection.getConnection(connectedNode);
593-
assertSame(seedConnection, remoteConnection);
563+
Transport.Connection remoteConnection = connection.getConnection(seedNode);
564+
assertEquals(seedNode, remoteConnection.getNode());
594565
}
595566
for (int i = 0; i < 10; i++) {
596567
// we don't use the transport service connection manager so we will get a proxy connection for the local node
@@ -599,7 +570,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
599570
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
600571
}
601572
for (int i = 0; i < 10; i++) {
602-
//always a proxy connection as the target node is not connected
573+
// always a proxy connection as the target node is not connected
603574
Transport.Connection remoteConnection = connection.getConnection(disconnectedNode);
604575
assertThat(remoteConnection, instanceOf(RemoteConnectionManager.ProxyConnection.class));
605576
assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode));

0 commit comments

Comments
 (0)