diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 3c31cddb39945..ed26d0b07cdba 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -33,9 +33,11 @@ import java.io.Closeable; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -238,6 +240,13 @@ public int size() { return connectedNodes.size(); } + /** + * Returns the set of nodes this manager is connected to. + */ + public Set connectedNodes() { + return Collections.unmodifiableSet(connectedNodes.keySet()); + } + @Override public void close() { assert Transports.assertNotTransportThread("Closing ConnectionManager"); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 8587e963c26d9..68eb6bd1c2b99 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -50,16 +50,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -84,7 +83,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos private final TransportService transportService; private final ConnectionManager connectionManager; - private final ConnectedNodes connectedNodes; private final String clusterAlias; private final int maxNumRemoteConnections; private final Predicate nodePredicate; @@ -123,7 +121,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos this.nodePredicate = nodePredicate; this.clusterAlias = clusterAlias; this.connectionManager = connectionManager; - this.connectedNodes = new ConnectedNodes(clusterAlias); this.seedNodes = Collections.unmodifiableList(seedNodes); this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE .getConcreteSettingForNamespace(clusterAlias).get(settings); @@ -176,8 +173,7 @@ boolean isSkipUnavailable() { @Override public void onNodeDisconnected(DiscoveryNode node) { - boolean remove = connectedNodes.remove(node); - if (remove && connectedNodes.size() < maxNumRemoteConnections) { + if (connectionManager.size() < maxNumRemoteConnections) { // try to reconnect and fill up the slot of the disconnected node connectHandler.forceConnect(); } @@ -188,7 +184,7 @@ public void onNodeDisconnected(DiscoveryNode node) { * will invoke the listener immediately. */ void ensureConnected(ActionListener voidActionListener) { - if (connectedNodes.size() == 0) { + if (connectionManager.size() == 0) { connectHandler.connect(voidActionListener); } else { voidActionListener.onResponse(null); @@ -466,14 +462,13 @@ private void collectRemoteNodes(Iterator> seedNodes, fin } final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode()); - if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { + if (nodePredicate.test(handshakeNode) && manager.size() < maxNumRemoteConnections) { PlainActionFuture.get(fut -> manager.connectToNode(handshakeNode, null, transportService.connectionValidator(handshakeNode), ActionListener.map(fut, x -> null))); if (remoteClusterName.get() == null) { assert handshakeResponse.getClusterName().value() != null; remoteClusterName.set(handshakeResponse.getClusterName()); } - connectedNodes.add(handshakeNode); } ClusterStateRequest request = new ClusterStateRequest(); request.clear(); @@ -580,12 +575,11 @@ public void handleResponse(ClusterStateResponse response) { Iterable nodesIter = nodes.getNodes()::valuesIt; for (DiscoveryNode n : nodesIter) { DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n); - if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { + if (nodePredicate.test(node) && connectionManager.size() < maxNumRemoteConnections) { try { // noop if node is connected PlainActionFuture.get(fut -> connectionManager.connectToNode(node, null, transportService.connectionValidator(node), ActionListener.map(fut, x -> null))); - connectedNodes.add(node); } catch (ConnectTransportException | IllegalStateException ex) { // ISE if we fail the handshake with an version incompatible node // fair enough we can't connect just move on @@ -628,15 +622,20 @@ boolean assertNoRunningConnections() { // for testing only } boolean isNodeConnected(final DiscoveryNode node) { - return connectedNodes.contains(node); + return connectionManager.nodeConnected(node); } - DiscoveryNode getAnyConnectedNode() { - return connectedNodes.getAny(); - } + private final AtomicLong nextNodeId = new AtomicLong(); - void addConnectedNode(DiscoveryNode node) { - connectedNodes.add(node); + DiscoveryNode getAnyConnectedNode() { + List nodes = new ArrayList<>(connectionManager.connectedNodes()); + if (nodes.isEmpty()) { + throw new NoSuchRemoteClusterException(clusterAlias); + } else { + long curr; + while ((curr = nextNodeId.incrementAndGet()) == Long.MIN_VALUE); + return nodes.get(Math.floorMod(curr, nodes.size())); + } } /** @@ -647,67 +646,13 @@ public RemoteConnectionInfo getConnectionInfo() { clusterAlias, seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()), maxNumRemoteConnections, - connectedNodes.size(), + getNumNodesConnected(), initialConnectionTimeout, skipUnavailable); } int getNumNodesConnected() { - return connectedNodes.size(); - } - - private static final class ConnectedNodes { - - private final Set nodeSet = new HashSet<>(); - private final String clusterAlias; - - private Iterator currentIterator = null; - - private ConnectedNodes(String clusterAlias) { - this.clusterAlias = clusterAlias; - } - - public synchronized DiscoveryNode getAny() { - ensureIteratorAvailable(); - if (currentIterator.hasNext()) { - return currentIterator.next(); - } else { - throw new NoSuchRemoteClusterException(clusterAlias); - } - } - - synchronized boolean remove(DiscoveryNode node) { - final boolean setRemoval = nodeSet.remove(node); - if (setRemoval) { - currentIterator = null; - } - return setRemoval; - } - - synchronized boolean add(DiscoveryNode node) { - final boolean added = nodeSet.add(node); - if (added) { - currentIterator = null; - } - return added; - } - - synchronized int size() { - return nodeSet.size(); - } - - synchronized boolean contains(DiscoveryNode node) { - return nodeSet.contains(node); - } - - private synchronized void ensureIteratorAvailable() { - if (currentIterator == null) { - currentIterator = nodeSet.iterator(); - } else if (currentIterator.hasNext() == false && nodeSet.isEmpty() == false) { - // iterator rollover - currentIterator = nodeSet.iterator(); - } - } + return connectionManager.size(); } private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 33397ae580282..b4ead893846ba 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; @@ -214,11 +215,9 @@ public void setup() { = new ConnectionManager(settings, capturingTransport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport); - connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> { - final boolean isConnected = connectedNodes.contains(discoveryNode); - final boolean isDisconnected = disconnectedNodes.contains(discoveryNode); - assert isConnected != isDisconnected : discoveryNode + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected; - return isConnected; + connectionManager.setDefaultNodeConnectedBehavior(cm -> { + assertTrue(Sets.haveEmptyIntersection(connectedNodes, disconnectedNodes)); + return connectedNodes; }); connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> capturingTransport.createConnection(discoveryNode)); transportService = new TransportService(settings, capturingTransport, deterministicTaskQueue.getThreadPool(), diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index d62bd37564d74..312720a50e34a 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -478,9 +479,10 @@ public void testConnectWithIncompatibleTransports() throws Exception { public void testRemoteConnectionVersionMatchesTransportConnectionVersion() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); - final Version previousVersion = VersionUtils.getPreviousVersion(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, previousVersion); - MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + final Version previousVersion = randomValueOtherThan(Version.CURRENT, () -> VersionUtils.randomVersionBetween(random(), + Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT)); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, previousVersion)) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); assertThat(seedNode, notNullValue()); @@ -519,12 +521,10 @@ public void sendRequest(long requestId, String action, TransportRequest request, service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { - connection.addConnectedNode(seedNode); - for (DiscoveryNode node : knownNodes) { - final Transport.Connection transportConnection = connection.getConnection(node); - assertThat(transportConnection.getVersion(), equalTo(previousVersion)); - } + PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null))); assertThat(knownNodes, iterableWithSize(2)); + assertThat(connection.getConnection(seedNode).getVersion(), equalTo(Version.CURRENT)); + assertThat(connection.getConnection(oldVersionNode).getVersion(), equalTo(previousVersion)); } } } @@ -979,7 +979,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted discoverableTransports.add(transportService); } - List>> seedNodes = randomSubsetOf(discoverableNodes); + List>> seedNodes = new CopyOnWriteArrayList<>(randomSubsetOf(discoverableNodes)); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -1020,11 +1020,14 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted barrier.await(); for (int j = 0; j < numDisconnects; j++) { if (randomBoolean()) { + String node = "discoverable_node_added" + counter.incrementAndGet(); MockTransportService transportService = - startTransport("discoverable_node_added" + counter.incrementAndGet(), knownNodes, + startTransport(node, knownNodes, Version.CURRENT); discoverableTransports.add(transportService); - connection.addConnectedNode(transportService.getLocalDiscoNode()); + seedNodes.add(Tuple.tuple(node, () -> transportService.getLocalDiscoNode())); + PlainActionFuture.get(fut -> connection.updateSeedNodes(null, seedNodes, + ActionListener.map(fut, x -> null))); } else { DiscoveryNode node = randomFrom(discoverableNodes).v2().get(); connection.onNodeDisconnected(node); @@ -1133,8 +1136,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport); StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport); - connectionManager.addNodeConnectedBehavior(connectedNode.getAddress(), (cm, discoveryNode) - -> discoveryNode.equals(connectedNode)); + connectionManager.setDefaultNodeConnectedBehavior(cm -> Collections.singleton(connectedNode)); connectionManager.addConnectBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> { if (discoveryNode == connectedNode) { @@ -1146,7 +1148,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", seedNodes(connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { - connection.addConnectedNode(connectedNode); + PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null))); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected Transport.Connection remoteConnection = connection.getConnection(connectedNode); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index 93832833b7ff4..87e1f25336229 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -81,7 +81,7 @@ public TransportService createTransportService(Settings settings, ThreadPool thr @Nullable ClusterSettings clusterSettings, Set taskHeaders) { StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this), settings, this); - connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode)); + connectionManager.setDefaultNodeConnectedBehavior(cm -> Collections.emptySet()); connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode)); return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, connectionManager); @@ -186,10 +186,6 @@ public void sendRequest(long requestId, String action, TransportRequest request, protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { } - protected boolean nodeConnected(DiscoveryNode discoveryNode) { - return true; - } - @Override public TransportStats getStats() { throw new UnsupportedOperationException(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 18caa0edd96e0..49c12fb096859 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -499,15 +499,6 @@ public boolean addGetConnectionBehavior(StubbableConnectionManager.GetConnection return connectionManager().setDefaultGetConnectionBehavior(behavior); } - /** - * Adds a node connected behavior that is used for the given delegate address. - * - * @return {@code true} if no other node connected behavior was registered for this address before. - */ - public boolean addNodeConnectedBehavior(TransportAddress transportAddress, StubbableConnectionManager.NodeConnectedBehavior behavior) { - return connectionManager().addNodeConnectedBehavior(transportAddress, behavior); - } - /** * Adds a node connected behavior that is the default node connected behavior. * diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index a14eaa691f43e..8f07bc19d0b11 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -28,6 +28,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -35,15 +36,13 @@ public class StubbableConnectionManager extends ConnectionManager { private final ConnectionManager delegate; private final ConcurrentMap getConnectionBehaviors; - private final ConcurrentMap nodeConnectedBehaviors; private volatile GetConnectionBehavior defaultGetConnectionBehavior = ConnectionManager::getConnection; - private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected; + private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::connectedNodes; public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport) { super(settings, transport); this.delegate = delegate; this.getConnectionBehaviors = new ConcurrentHashMap<>(); - this.nodeConnectedBehaviors = new ConcurrentHashMap<>(); } public boolean addConnectBehavior(TransportAddress transportAddress, GetConnectionBehavior connectBehavior) { @@ -56,10 +55,6 @@ public boolean setDefaultGetConnectionBehavior(GetConnectionBehavior behavior) { return prior == null; } - public boolean addNodeConnectedBehavior(TransportAddress transportAddress, NodeConnectedBehavior behavior) { - return nodeConnectedBehaviors.put(transportAddress, behavior) == null; - } - public boolean setDefaultNodeConnectedBehavior(NodeConnectedBehavior behavior) { NodeConnectedBehavior prior = defaultNodeConnectedBehavior; defaultNodeConnectedBehavior = behavior; @@ -69,13 +64,11 @@ public boolean setDefaultNodeConnectedBehavior(NodeConnectedBehavior behavior) { public void clearBehaviors() { defaultGetConnectionBehavior = ConnectionManager::getConnection; getConnectionBehaviors.clear(); - defaultNodeConnectedBehavior = ConnectionManager::nodeConnected; - nodeConnectedBehaviors.clear(); + defaultNodeConnectedBehavior = ConnectionManager::connectedNodes; } public void clearBehavior(TransportAddress transportAddress) { getConnectionBehaviors.remove(transportAddress); - nodeConnectedBehaviors.remove(transportAddress); } @Override @@ -92,9 +85,12 @@ public Transport.Connection getConnection(DiscoveryNode node) { @Override public boolean nodeConnected(DiscoveryNode node) { - TransportAddress address = node.getAddress(); - NodeConnectedBehavior behavior = nodeConnectedBehaviors.getOrDefault(address, defaultNodeConnectedBehavior); - return behavior.nodeConnected(delegate, node); + return defaultNodeConnectedBehavior.connectedNodes(delegate).contains(node); + } + + @Override + public Set connectedNodes() { + return defaultNodeConnectedBehavior.connectedNodes(delegate); } @Override @@ -136,6 +132,6 @@ public interface GetConnectionBehavior { @FunctionalInterface public interface NodeConnectedBehavior { - boolean nodeConnected(ConnectionManager connectionManager, DiscoveryNode discoveryNode); + Set connectedNodes(ConnectionManager connectionManager); } }