Skip to content

Remove RemoteClusterConnection.ConnectedNodes #44235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -238,6 +240,13 @@ public int size() {
return connectedNodes.size();
}

/**
* Returns the set of nodes this manager is connected to.
*/
public Set<DiscoveryNode> connectedNodes() {
return Collections.unmodifiableSet(connectedNodes.keySet());
}

@Override
public void close() {
assert Transports.assertNotTransportThread("Closing ConnectionManager");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand All @@ -84,7 +83,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos

private final TransportService transportService;
private final ConnectionManager connectionManager;
private final ConnectedNodes connectedNodes;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
Expand Down Expand Up @@ -123,7 +121,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
this.connectionManager = connectionManager;
this.connectedNodes = new ConnectedNodes(clusterAlias);
this.seedNodes = Collections.unmodifiableList(seedNodes);
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
Expand Down Expand Up @@ -176,8 +173,7 @@ boolean isSkipUnavailable() {

@Override
public void onNodeDisconnected(DiscoveryNode node) {
boolean remove = connectedNodes.remove(node);
if (remove && connectedNodes.size() < maxNumRemoteConnections) {
if (connectionManager.size() < maxNumRemoteConnections) {
// try to reconnect and fill up the slot of the disconnected node
connectHandler.forceConnect();
}
Expand All @@ -188,7 +184,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
* will invoke the listener immediately.
*/
void ensureConnected(ActionListener<Void> voidActionListener) {
if (connectedNodes.size() == 0) {
if (connectionManager.size() == 0) {
connectHandler.connect(voidActionListener);
} else {
voidActionListener.onResponse(null);
Expand Down Expand Up @@ -466,14 +462,13 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, fin
}

final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
if (nodePredicate.test(handshakeNode) && manager.size() < maxNumRemoteConnections) {
PlainActionFuture.get(fut -> manager.connectToNode(handshakeNode, null,
transportService.connectionValidator(handshakeNode), ActionListener.map(fut, x -> null)));
if (remoteClusterName.get() == null) {
assert handshakeResponse.getClusterName().value() != null;
remoteClusterName.set(handshakeResponse.getClusterName());
}
connectedNodes.add(handshakeNode);
}
ClusterStateRequest request = new ClusterStateRequest();
request.clear();
Expand Down Expand Up @@ -580,12 +575,11 @@ public void handleResponse(ClusterStateResponse response) {
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
for (DiscoveryNode n : nodesIter) {
DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n);
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
if (nodePredicate.test(node) && connectionManager.size() < maxNumRemoteConnections) {
try {
// noop if node is connected
PlainActionFuture.get(fut -> connectionManager.connectToNode(node, null,
transportService.connectionValidator(node), ActionListener.map(fut, x -> null)));
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with an version incompatible node
// fair enough we can't connect just move on
Expand Down Expand Up @@ -628,15 +622,20 @@ boolean assertNoRunningConnections() { // for testing only
}

boolean isNodeConnected(final DiscoveryNode node) {
return connectedNodes.contains(node);
return connectionManager.nodeConnected(node);
}

DiscoveryNode getAnyConnectedNode() {
return connectedNodes.getAny();
}
private final AtomicLong nextNodeId = new AtomicLong();

void addConnectedNode(DiscoveryNode node) {
connectedNodes.add(node);
DiscoveryNode getAnyConnectedNode() {
List<DiscoveryNode> nodes = new ArrayList<>(connectionManager.connectedNodes());
if (nodes.isEmpty()) {
throw new NoSuchRemoteClusterException(clusterAlias);
} else {
long curr;
while ((curr = nextNodeId.incrementAndGet()) == Long.MIN_VALUE);
return nodes.get(Math.floorMod(curr, nodes.size()));
}
}

/**
Expand All @@ -647,67 +646,13 @@ public RemoteConnectionInfo getConnectionInfo() {
clusterAlias,
seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()),
maxNumRemoteConnections,
connectedNodes.size(),
getNumNodesConnected(),
initialConnectionTimeout,
skipUnavailable);
}

int getNumNodesConnected() {
return connectedNodes.size();
}

private static final class ConnectedNodes {

private final Set<DiscoveryNode> nodeSet = new HashSet<>();
private final String clusterAlias;

private Iterator<DiscoveryNode> currentIterator = null;

private ConnectedNodes(String clusterAlias) {
this.clusterAlias = clusterAlias;
}

public synchronized DiscoveryNode getAny() {
ensureIteratorAvailable();
if (currentIterator.hasNext()) {
return currentIterator.next();
} else {
throw new NoSuchRemoteClusterException(clusterAlias);
}
}

synchronized boolean remove(DiscoveryNode node) {
final boolean setRemoval = nodeSet.remove(node);
if (setRemoval) {
currentIterator = null;
}
return setRemoval;
}

synchronized boolean add(DiscoveryNode node) {
final boolean added = nodeSet.add(node);
if (added) {
currentIterator = null;
}
return added;
}

synchronized int size() {
return nodeSet.size();
}

synchronized boolean contains(DiscoveryNode node) {
return nodeSet.contains(node);
}

private synchronized void ensureIteratorAvailable() {
if (currentIterator == null) {
currentIterator = nodeSet.iterator();
} else if (currentIterator.hasNext() == false && nodeSet.isEmpty() == false) {
// iterator rollover
currentIterator = nodeSet.iterator();
}
}
return connectionManager.size();
}

private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
Expand Down Expand Up @@ -214,11 +215,9 @@ public void setup() {
= new ConnectionManager(settings, capturingTransport);
StubbableConnectionManager connectionManager
= new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> {
final boolean isConnected = connectedNodes.contains(discoveryNode);
final boolean isDisconnected = disconnectedNodes.contains(discoveryNode);
assert isConnected != isDisconnected : discoveryNode + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected;
return isConnected;
connectionManager.setDefaultNodeConnectedBehavior(cm -> {
assertTrue(Sets.haveEmptyIntersection(connectedNodes, disconnectedNodes));
return connectedNodes;
});
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> capturingTransport.createConnection(discoveryNode));
transportService = new TransportService(settings, capturingTransport, deterministicTaskQueue.getThreadPool(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -478,9 +479,10 @@ public void testConnectWithIncompatibleTransports() throws Exception {

public void testRemoteConnectionVersionMatchesTransportConnectionVersion() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
final Version previousVersion = VersionUtils.getPreviousVersion();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, previousVersion);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
final Version previousVersion = randomValueOtherThan(Version.CURRENT, () -> VersionUtils.randomVersionBetween(random(),
Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT));
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, previousVersion)) {

DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
assertThat(seedNode, notNullValue());
Expand Down Expand Up @@ -519,12 +521,10 @@ public void sendRequest(long requestId, String action, TransportRequest request,
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) {
connection.addConnectedNode(seedNode);
for (DiscoveryNode node : knownNodes) {
final Transport.Connection transportConnection = connection.getConnection(node);
assertThat(transportConnection.getVersion(), equalTo(previousVersion));
}
PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null)));
assertThat(knownNodes, iterableWithSize(2));
assertThat(connection.getConnection(seedNode).getVersion(), equalTo(Version.CURRENT));
assertThat(connection.getConnection(oldVersionNode).getVersion(), equalTo(previousVersion));
}
}
}
Expand Down Expand Up @@ -979,7 +979,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
discoverableTransports.add(transportService);
}

List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = randomSubsetOf(discoverableNodes);
List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = new CopyOnWriteArrayList<>(randomSubsetOf(discoverableNodes));
Collections.shuffle(seedNodes, random());

try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
Expand Down Expand Up @@ -1020,11 +1020,14 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
barrier.await();
for (int j = 0; j < numDisconnects; j++) {
if (randomBoolean()) {
String node = "discoverable_node_added" + counter.incrementAndGet();
MockTransportService transportService =
startTransport("discoverable_node_added" + counter.incrementAndGet(), knownNodes,
startTransport(node, knownNodes,
Version.CURRENT);
discoverableTransports.add(transportService);
connection.addConnectedNode(transportService.getLocalDiscoNode());
seedNodes.add(Tuple.tuple(node, () -> transportService.getLocalDiscoNode()));
PlainActionFuture.get(fut -> connection.updateSeedNodes(null, seedNodes,
ActionListener.map(fut, x -> null)));
} else {
DiscoveryNode node = randomFrom(discoverableNodes).v2().get();
connection.onNodeDisconnected(node);
Expand Down Expand Up @@ -1133,8 +1136,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport);

connectionManager.addNodeConnectedBehavior(connectedNode.getAddress(), (cm, discoveryNode)
-> discoveryNode.equals(connectedNode));
connectionManager.setDefaultNodeConnectedBehavior(cm -> Collections.singleton(connectedNode));

connectionManager.addConnectBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> {
if (discoveryNode == connectedNode) {
Expand All @@ -1146,7 +1148,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) {
connection.addConnectedNode(connectedNode);
PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null)));
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
Transport.Connection remoteConnection = connection.getConnection(connectedNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public TransportService createTransportService(Settings settings, ThreadPool thr
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this),
settings, this);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
connectionManager.setDefaultNodeConnectedBehavior(cm -> Collections.emptySet());
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode));
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
connectionManager);
Expand Down Expand Up @@ -186,10 +186,6 @@ public void sendRequest(long requestId, String action, TransportRequest request,
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
}

protected boolean nodeConnected(DiscoveryNode discoveryNode) {
return true;
}

@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,15 +499,6 @@ public boolean addGetConnectionBehavior(StubbableConnectionManager.GetConnection
return connectionManager().setDefaultGetConnectionBehavior(behavior);
}

/**
* Adds a node connected behavior that is used for the given delegate address.
*
* @return {@code true} if no other node connected behavior was registered for this address before.
*/
public boolean addNodeConnectedBehavior(TransportAddress transportAddress, StubbableConnectionManager.NodeConnectedBehavior behavior) {
return connectionManager().addNodeConnectedBehavior(transportAddress, behavior);
}

/**
* Adds a node connected behavior that is the default node connected behavior.
*
Expand Down
Loading