Skip to content

Commit f00b658

Browse files
authored
Remove RemoteClusterConnection.ConnectedNodes (#44235)
This instead exposes the set of connected nodes on ConnectionManager.
1 parent ac6b936 commit f00b658

File tree

7 files changed

+58
-120
lines changed

7 files changed

+58
-120
lines changed

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@
3333

3434
import java.io.Closeable;
3535
import java.util.ArrayList;
36+
import java.util.Collections;
3637
import java.util.Iterator;
3738
import java.util.List;
3839
import java.util.Map;
40+
import java.util.Set;
3941
import java.util.concurrent.ConcurrentMap;
4042
import java.util.concurrent.CopyOnWriteArrayList;
4143
import java.util.concurrent.CountDownLatch;
@@ -238,6 +240,13 @@ public int size() {
238240
return connectedNodes.size();
239241
}
240242

243+
/**
244+
* Returns the set of nodes this manager is connected to.
245+
*/
246+
public Set<DiscoveryNode> connectedNodes() {
247+
return Collections.unmodifiableSet(connectedNodes.keySet());
248+
}
249+
241250
@Override
242251
public void close() {
243252
assert Transports.assertNotTransportThread("Closing ConnectionManager");

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 18 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,15 @@
5050
import java.util.ArrayList;
5151
import java.util.Collection;
5252
import java.util.Collections;
53-
import java.util.HashSet;
5453
import java.util.Iterator;
5554
import java.util.List;
56-
import java.util.Set;
5755
import java.util.concurrent.ArrayBlockingQueue;
5856
import java.util.concurrent.BlockingQueue;
5957
import java.util.concurrent.ExecutorService;
6058
import java.util.concurrent.RejectedExecutionException;
6159
import java.util.concurrent.Semaphore;
6260
import java.util.concurrent.atomic.AtomicBoolean;
61+
import java.util.concurrent.atomic.AtomicLong;
6362
import java.util.function.Function;
6463
import java.util.function.Predicate;
6564
import java.util.function.Supplier;
@@ -84,7 +83,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
8483

8584
private final TransportService transportService;
8685
private final ConnectionManager connectionManager;
87-
private final ConnectedNodes connectedNodes;
8886
private final String clusterAlias;
8987
private final int maxNumRemoteConnections;
9088
private final Predicate<DiscoveryNode> nodePredicate;
@@ -123,7 +121,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
123121
this.nodePredicate = nodePredicate;
124122
this.clusterAlias = clusterAlias;
125123
this.connectionManager = connectionManager;
126-
this.connectedNodes = new ConnectedNodes(clusterAlias);
127124
this.seedNodes = Collections.unmodifiableList(seedNodes);
128125
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
129126
.getConcreteSettingForNamespace(clusterAlias).get(settings);
@@ -176,8 +173,7 @@ boolean isSkipUnavailable() {
176173

177174
@Override
178175
public void onNodeDisconnected(DiscoveryNode node) {
179-
boolean remove = connectedNodes.remove(node);
180-
if (remove && connectedNodes.size() < maxNumRemoteConnections) {
176+
if (connectionManager.size() < maxNumRemoteConnections) {
181177
// try to reconnect and fill up the slot of the disconnected node
182178
connectHandler.forceConnect();
183179
}
@@ -188,7 +184,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
188184
* will invoke the listener immediately.
189185
*/
190186
void ensureConnected(ActionListener<Void> voidActionListener) {
191-
if (connectedNodes.size() == 0) {
187+
if (connectionManager.size() == 0) {
192188
connectHandler.connect(voidActionListener);
193189
} else {
194190
voidActionListener.onResponse(null);
@@ -466,14 +462,13 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, fin
466462
}
467463

468464
final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
469-
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
465+
if (nodePredicate.test(handshakeNode) && manager.size() < maxNumRemoteConnections) {
470466
PlainActionFuture.get(fut -> manager.connectToNode(handshakeNode, null,
471467
transportService.connectionValidator(handshakeNode), ActionListener.map(fut, x -> null)));
472468
if (remoteClusterName.get() == null) {
473469
assert handshakeResponse.getClusterName().value() != null;
474470
remoteClusterName.set(handshakeResponse.getClusterName());
475471
}
476-
connectedNodes.add(handshakeNode);
477472
}
478473
ClusterStateRequest request = new ClusterStateRequest();
479474
request.clear();
@@ -580,12 +575,11 @@ public void handleResponse(ClusterStateResponse response) {
580575
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
581576
for (DiscoveryNode n : nodesIter) {
582577
DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n);
583-
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
578+
if (nodePredicate.test(node) && connectionManager.size() < maxNumRemoteConnections) {
584579
try {
585580
// noop if node is connected
586581
PlainActionFuture.get(fut -> connectionManager.connectToNode(node, null,
587582
transportService.connectionValidator(node), ActionListener.map(fut, x -> null)));
588-
connectedNodes.add(node);
589583
} catch (ConnectTransportException | IllegalStateException ex) {
590584
// ISE if we fail the handshake with an version incompatible node
591585
// fair enough we can't connect just move on
@@ -628,15 +622,20 @@ boolean assertNoRunningConnections() { // for testing only
628622
}
629623

630624
boolean isNodeConnected(final DiscoveryNode node) {
631-
return connectedNodes.contains(node);
625+
return connectionManager.nodeConnected(node);
632626
}
633627

634-
DiscoveryNode getAnyConnectedNode() {
635-
return connectedNodes.getAny();
636-
}
628+
private final AtomicLong nextNodeId = new AtomicLong();
637629

638-
void addConnectedNode(DiscoveryNode node) {
639-
connectedNodes.add(node);
630+
DiscoveryNode getAnyConnectedNode() {
631+
List<DiscoveryNode> nodes = new ArrayList<>(connectionManager.connectedNodes());
632+
if (nodes.isEmpty()) {
633+
throw new NoSuchRemoteClusterException(clusterAlias);
634+
} else {
635+
long curr;
636+
while ((curr = nextNodeId.incrementAndGet()) == Long.MIN_VALUE);
637+
return nodes.get(Math.floorMod(curr, nodes.size()));
638+
}
640639
}
641640

642641
/**
@@ -647,67 +646,13 @@ public RemoteConnectionInfo getConnectionInfo() {
647646
clusterAlias,
648647
seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()),
649648
maxNumRemoteConnections,
650-
connectedNodes.size(),
649+
getNumNodesConnected(),
651650
initialConnectionTimeout,
652651
skipUnavailable);
653652
}
654653

655654
int getNumNodesConnected() {
656-
return connectedNodes.size();
657-
}
658-
659-
private static final class ConnectedNodes {
660-
661-
private final Set<DiscoveryNode> nodeSet = new HashSet<>();
662-
private final String clusterAlias;
663-
664-
private Iterator<DiscoveryNode> currentIterator = null;
665-
666-
private ConnectedNodes(String clusterAlias) {
667-
this.clusterAlias = clusterAlias;
668-
}
669-
670-
public synchronized DiscoveryNode getAny() {
671-
ensureIteratorAvailable();
672-
if (currentIterator.hasNext()) {
673-
return currentIterator.next();
674-
} else {
675-
throw new NoSuchRemoteClusterException(clusterAlias);
676-
}
677-
}
678-
679-
synchronized boolean remove(DiscoveryNode node) {
680-
final boolean setRemoval = nodeSet.remove(node);
681-
if (setRemoval) {
682-
currentIterator = null;
683-
}
684-
return setRemoval;
685-
}
686-
687-
synchronized boolean add(DiscoveryNode node) {
688-
final boolean added = nodeSet.add(node);
689-
if (added) {
690-
currentIterator = null;
691-
}
692-
return added;
693-
}
694-
695-
synchronized int size() {
696-
return nodeSet.size();
697-
}
698-
699-
synchronized boolean contains(DiscoveryNode node) {
700-
return nodeSet.contains(node);
701-
}
702-
703-
private synchronized void ensureIteratorAvailable() {
704-
if (currentIterator == null) {
705-
currentIterator = nodeSet.iterator();
706-
} else if (currentIterator.hasNext() == false && nodeSet.isEmpty() == false) {
707-
// iterator rollover
708-
currentIterator = nodeSet.iterator();
709-
}
710-
}
655+
return connectionManager.size();
711656
}
712657

713658
private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {

server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.io.stream.StreamInput;
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.common.transport.TransportAddress;
33+
import org.elasticsearch.common.util.set.Sets;
3334
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
3435
import org.elasticsearch.test.ESTestCase;
3536
import org.elasticsearch.test.transport.CapturingTransport;
@@ -214,11 +215,9 @@ public void setup() {
214215
= new ConnectionManager(settings, capturingTransport);
215216
StubbableConnectionManager connectionManager
216217
= new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport);
217-
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> {
218-
final boolean isConnected = connectedNodes.contains(discoveryNode);
219-
final boolean isDisconnected = disconnectedNodes.contains(discoveryNode);
220-
assert isConnected != isDisconnected : discoveryNode + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected;
221-
return isConnected;
218+
connectionManager.setDefaultNodeConnectedBehavior(cm -> {
219+
assertTrue(Sets.haveEmptyIntersection(connectedNodes, disconnectedNodes));
220+
return connectedNodes;
222221
});
223222
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> capturingTransport.createConnection(discoveryNode));
224223
transportService = new TransportService(settings, capturingTransport, deterministicTaskQueue.getThreadPool(),

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.action.search.SearchRequest;
3535
import org.elasticsearch.action.search.SearchResponse;
3636
import org.elasticsearch.action.search.ShardSearchFailure;
37+
import org.elasticsearch.action.support.PlainActionFuture;
3738
import org.elasticsearch.cluster.ClusterName;
3839
import org.elasticsearch.cluster.ClusterState;
3940
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -478,9 +479,10 @@ public void testConnectWithIncompatibleTransports() throws Exception {
478479

479480
public void testRemoteConnectionVersionMatchesTransportConnectionVersion() throws Exception {
480481
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
481-
final Version previousVersion = VersionUtils.getPreviousVersion();
482-
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, previousVersion);
483-
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
482+
final Version previousVersion = randomValueOtherThan(Version.CURRENT, () -> VersionUtils.randomVersionBetween(random(),
483+
Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT));
484+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
485+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, previousVersion)) {
484486

485487
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
486488
assertThat(seedNode, notNullValue());
@@ -519,12 +521,10 @@ public void sendRequest(long requestId, String action, TransportRequest request,
519521
service.acceptIncomingRequests();
520522
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
521523
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) {
522-
connection.addConnectedNode(seedNode);
523-
for (DiscoveryNode node : knownNodes) {
524-
final Transport.Connection transportConnection = connection.getConnection(node);
525-
assertThat(transportConnection.getVersion(), equalTo(previousVersion));
526-
}
524+
PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null)));
527525
assertThat(knownNodes, iterableWithSize(2));
526+
assertThat(connection.getConnection(seedNode).getVersion(), equalTo(Version.CURRENT));
527+
assertThat(connection.getConnection(oldVersionNode).getVersion(), equalTo(previousVersion));
528528
}
529529
}
530530
}
@@ -979,7 +979,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
979979
discoverableTransports.add(transportService);
980980
}
981981

982-
List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = randomSubsetOf(discoverableNodes);
982+
List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = new CopyOnWriteArrayList<>(randomSubsetOf(discoverableNodes));
983983
Collections.shuffle(seedNodes, random());
984984

985985
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@@ -1020,11 +1020,14 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
10201020
barrier.await();
10211021
for (int j = 0; j < numDisconnects; j++) {
10221022
if (randomBoolean()) {
1023+
String node = "discoverable_node_added" + counter.incrementAndGet();
10231024
MockTransportService transportService =
1024-
startTransport("discoverable_node_added" + counter.incrementAndGet(), knownNodes,
1025+
startTransport(node, knownNodes,
10251026
Version.CURRENT);
10261027
discoverableTransports.add(transportService);
1027-
connection.addConnectedNode(transportService.getLocalDiscoNode());
1028+
seedNodes.add(Tuple.tuple(node, () -> transportService.getLocalDiscoNode()));
1029+
PlainActionFuture.get(fut -> connection.updateSeedNodes(null, seedNodes,
1030+
ActionListener.map(fut, x -> null)));
10281031
} else {
10291032
DiscoveryNode node = randomFrom(discoverableNodes).v2().get();
10301033
connection.onNodeDisconnected(node);
@@ -1133,8 +1136,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
11331136
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
11341137
StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport);
11351138

1136-
connectionManager.addNodeConnectedBehavior(connectedNode.getAddress(), (cm, discoveryNode)
1137-
-> discoveryNode.equals(connectedNode));
1139+
connectionManager.setDefaultNodeConnectedBehavior(cm -> Collections.singleton(connectedNode));
11381140

11391141
connectionManager.addConnectBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> {
11401142
if (discoveryNode == connectedNode) {
@@ -1146,7 +1148,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
11461148
service.acceptIncomingRequests();
11471149
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
11481150
seedNodes(connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) {
1149-
connection.addConnectedNode(connectedNode);
1151+
PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null)));
11501152
for (int i = 0; i < 10; i++) {
11511153
//always a direct connection as the remote node is already connected
11521154
Transport.Connection remoteConnection = connection.getConnection(connectedNode);

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public TransportService createTransportService(Settings settings, ThreadPool thr
8181
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
8282
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this),
8383
settings, this);
84-
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
84+
connectionManager.setDefaultNodeConnectedBehavior(cm -> Collections.emptySet());
8585
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode));
8686
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
8787
connectionManager);
@@ -186,10 +186,6 @@ public void sendRequest(long requestId, String action, TransportRequest request,
186186
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
187187
}
188188

189-
protected boolean nodeConnected(DiscoveryNode discoveryNode) {
190-
return true;
191-
}
192-
193189
@Override
194190
public TransportStats getStats() {
195191
throw new UnsupportedOperationException();

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -500,15 +500,6 @@ public boolean addGetConnectionBehavior(StubbableConnectionManager.GetConnection
500500
return connectionManager().setDefaultGetConnectionBehavior(behavior);
501501
}
502502

503-
/**
504-
* Adds a node connected behavior that is used for the given delegate address.
505-
*
506-
* @return {@code true} if no other node connected behavior was registered for this address before.
507-
*/
508-
public boolean addNodeConnectedBehavior(TransportAddress transportAddress, StubbableConnectionManager.NodeConnectedBehavior behavior) {
509-
return connectionManager().addNodeConnectedBehavior(transportAddress, behavior);
510-
}
511-
512503
/**
513504
* Adds a node connected behavior that is the default node connected behavior.
514505
*

0 commit comments

Comments
 (0)