diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index c11afa088aa53..110053bcee77b 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -31,8 +31,10 @@ import org.elasticsearch.core.internal.io.IOUtils; import java.io.Closeable; +import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -216,6 +218,10 @@ public int size() { return connectedNodes.size(); } + public Set getAllConnectedNodes() { + return Collections.unmodifiableSet(connectedNodes.keySet()); + } + @Override public void close() { internalClose(true); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 191484deff2f5..d327a171920e0 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -39,17 +39,20 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable { - protected static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class); + private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class); private static final int MAX_LISTENERS = 100; private final AtomicBoolean closed = new AtomicBoolean(false); private final Object mutex = new Object(); - private final ThreadPool threadPool; - protected final RemoteConnectionManager connectionManager; private List> listeners = new ArrayList<>(); - RemoteConnectionStrategy(ThreadPool threadPool, RemoteConnectionManager connectionManager) { - this.threadPool = threadPool; + protected final TransportService transportService; + protected final RemoteConnectionManager connectionManager; + protected final String clusterAlias; + + RemoteConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager) { + this.clusterAlias = clusterAlias; + this.transportService = transportService; this.connectionManager = connectionManager; connectionManager.getConnectionManager().addListener(this); } @@ -61,7 +64,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis void connect(ActionListener connectListener) { boolean runConnect = false; final ActionListener listener = - ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext()); + ContextPreservingActionListener.wrapPreservingContext(connectListener, transportService.getThreadPool().getThreadContext()); boolean closed; synchronized (mutex) { closed = this.closed.get(); @@ -83,7 +86,7 @@ void connect(ActionListener connectListener) { return; } if (runConnect) { - ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT); + ExecutorService executor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT); executor.submit(new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java new file mode 100644 index 0000000000000..24e9e18c8dc10 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -0,0 +1,161 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.CountDown; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class SimpleConnectionStrategy extends RemoteConnectionStrategy { + + private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3; + private static final Logger logger = LogManager.getLogger(SimpleConnectionStrategy.class); + + private final int maxNumRemoteConnections; + private final AtomicLong counter = new AtomicLong(0); + private final List> addresses; + private final AtomicReference remoteClusterName = new AtomicReference<>(); + private final ConnectionProfile profile; + private final ConnectionManager.ConnectionValidator clusterNameValidator; + + SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, + int maxNumRemoteConnections, List> addresses) { + super(clusterAlias, transportService, connectionManager); + this.maxNumRemoteConnections = maxNumRemoteConnections; + assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses"; + this.addresses = addresses; + // TODO: Move into the ConnectionManager + this.profile = new ConnectionProfile.Builder() + .addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) + .addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY) + .build(); + this.clusterNameValidator = (newConnection, actualProfile, listener) -> + transportService.handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, + ActionListener.map(listener, resp -> { + ClusterName remote = resp.getClusterName(); + if (remoteClusterName.compareAndSet(null, remote)) { + return null; + } else { + if (remoteClusterName.get().equals(remote) == false) { + DiscoveryNode node = newConnection.getNode(); + throw new ConnectTransportException(node, "handshake failed. unexpected remote cluster name " + remote); + } + return null; + } + })); + } + + @Override + protected boolean shouldOpenMoreConnections() { + return connectionManager.size() < maxNumRemoteConnections; + } + + @Override + protected void connectImpl(ActionListener listener) { + performSimpleConnectionProcess(addresses.iterator(), listener); + } + + private void performSimpleConnectionProcess(Iterator> addressIter, ActionListener listener) { + openConnections(listener, 1); + } + + private void openConnections(ActionListener finished, int attemptNumber) { + if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN) { + List resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList()); + + int remaining = maxNumRemoteConnections - connectionManager.size(); + ActionListener compositeListener = new ActionListener<>() { + + private final AtomicInteger successfulConnections = new AtomicInteger(0); + private final CountDown countDown = new CountDown(remaining); + + @Override + public void onResponse(Void v) { + successfulConnections.incrementAndGet(); + if (countDown.countDown()) { + if (shouldOpenMoreConnections()) { + openConnections(finished, attemptNumber + 1); + } else { + finished.onResponse(v); + } + } + } + + @Override + public void onFailure(Exception e) { + if (countDown.countDown()) { + openConnections(finished, attemptNumber + 1); + } + } + }; + + + for (int i = 0; i < remaining; ++i) { + TransportAddress address = nextAddress(resolved); + String id = clusterAlias + "#" + address; + DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion()); + + connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() { + @Override + public void onResponse(Void v) { + compositeListener.onResponse(v); + } + + @Override + public void onFailure(Exception e) { + logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]", + clusterAlias, address), e); + compositeListener.onFailure(e); + } + }); + } + } else { + int openConnections = connectionManager.size(); + if (openConnections == 0) { + finished.onFailure(new IllegalStateException("Unable to open any simple connections to remote cluster [" + clusterAlias + + "]")); + } else { + logger.debug("unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]", clusterAlias, + openConnections, maxNumRemoteConnections); + finished.onResponse(null); + } + } + } + + private TransportAddress nextAddress(List resolvedAddresses) { + long curr; + while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ; + return resolvedAddresses.get(Math.floorMod(curr, resolvedAddresses.size())); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index ce820b744bda8..f71ce576a3c22 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -19,6 +19,8 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; @@ -45,9 +47,9 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { - private final String clusterAlias; + private static final Logger logger = LogManager.getLogger(SniffConnectionStrategy.class); + private final List>> seedNodes; - private final TransportService transportService; private final int maxNumRemoteConnections; private final Predicate nodePredicate; private final SetOnce remoteClusterName = new SetOnce<>(); @@ -56,9 +58,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, String proxyAddress, int maxNumRemoteConnections, Predicate nodePredicate, List>> seedNodes) { - super(transportService.getThreadPool(), connectionManager); - this.clusterAlias = clusterAlias; - this.transportService = transportService; + super(clusterAlias, transportService, connectionManager); this.proxyAddress = proxyAddress; this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; @@ -109,15 +109,15 @@ private void collectRemoteNodes(Iterator> seedNodes, Act onFailure.accept(e); } - final StepListener handShakeStep = new StepListener<>(); + final StepListener handshakeStep = new StepListener<>(); openConnectionStep.whenComplete(connection -> { ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile(); transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), - getRemoteClusterNamePredicate(), handShakeStep); + getRemoteClusterNamePredicate(), handshakeStep); }, onFailure); final StepListener fullConnectionStep = new StepListener<>(); - handShakeStep.whenComplete(handshakeResponse -> { + handshakeStep.whenComplete(handshakeResponse -> { final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode()); if (nodePredicate.test(handshakeNode) && shouldOpenMoreConnections()) { @@ -135,7 +135,7 @@ private void collectRemoteNodes(Iterator> seedNodes, Act fullConnectionStep.whenComplete(aVoid -> { if (remoteClusterName.get() == null) { - TransportService.HandshakeResponse handshakeResponse = handShakeStep.result(); + TransportService.HandshakeResponse handshakeResponse = handshakeStep.result(); assert handshakeResponse.getClusterName().value() != null; remoteClusterName.set(handshakeResponse.getClusterName()); } diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java new file mode 100644 index 0000000000000..68e2622c040fd --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -0,0 +1,268 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class SimpleConnectionStrategyTests extends ESTestCase { + + private final String clusterAlias = "cluster-alias"; + private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, Version version) { + return startTransport(id, version, Settings.EMPTY); + } + + public MockTransportService startTransport(final String id, final Version version, final Settings settings) { + boolean success = false; + final Settings s = Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterAlias) + .put("node.name", id) + .put(settings) + .build(); + MockTransportService newService = MockTransportService.createNewService(settings, version, threadPool); + try { + newService.start(); + newService.acceptIncomingRequests(); + success = true; + return newService; + } finally { + if (success == false) { + newService.close(); + } + } + } + + public void testSimpleStrategyWillOpenExpectedNumberOfConnectionsToAddresses() { + try (MockTransportService transport1 = startTransport("node1", Version.CURRENT); + MockTransportService transport2 = startTransport("node2", Version.CURRENT)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + assertEquals(numOfConnections, connectionManager.size()); + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + public void testSimpleStrategyWillOpenNewConnectionsOnDisconnect() throws Exception { + try (MockTransportService transport1 = startTransport("node1", Version.CURRENT); + MockTransportService transport2 = startTransport("node2", Version.CURRENT)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + long initialConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream() + .filter(n -> n.getAddress().equals(address2)) + .count(); + assertNotEquals(0, initialConnectionsToTransport2); + assertEquals(numOfConnections, connectionManager.size()); + assertTrue(strategy.assertNoRunningConnections()); + + transport1.close(); + + assertBusy(() -> { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + // More connections now pointing to transport2 + long finalConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream() + .filter(n -> n.getAddress().equals(address2)) + .count(); + assertTrue(finalConnectionsToTransport2 > initialConnectionsToTransport2); + assertTrue(strategy.assertNoRunningConnections()); + }); + } + } + } + } + + public void testConnectWithSingleIncompatibleNode() { + Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); + try (MockTransportService transport1 = startTransport("compatible-node", Version.CURRENT); + MockTransportService transport2 = startTransport("incompatible-node", incompatibleVersion)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + StubbableTransport stubbableTransport = new StubbableTransport(localService.transport); + ConnectionManager connectionManager = new ConnectionManager(profile, stubbableTransport); + AtomicInteger address1Attempts = new AtomicInteger(0); + AtomicInteger address2Attempts = new AtomicInteger(0); + stubbableTransport.setDefaultConnectBehavior((transport, discoveryNode, profile, listener) -> { + if (discoveryNode.getAddress().equals(address1)) { + address1Attempts.incrementAndGet(); + transport.openConnection(discoveryNode, profile, listener); + } else if (discoveryNode.getAddress().equals(address2)) { + address2Attempts.incrementAndGet(); + transport.openConnection(discoveryNode, profile, listener); + } else { + throw new AssertionError("Unexpected address"); + } + }); + int numOfConnections = 5; + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertEquals(4 ,connectionManager.size()); + assertEquals(4 ,connectionManager.getAllConnectedNodes().stream().map(n -> n.getAddress().equals(address1)).count()); + // Three attempts on first round, one attempts on second round, zero attempts on third round + assertEquals(4, address1Attempts.get()); + // Two attempts on first round, one attempt on second round, one attempt on third round + assertEquals(4, address2Attempts.get()); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + public void testConnectFailsWithIncompatibleNodes() { + Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); + try (MockTransportService transport1 = startTransport("incompatible-node", incompatibleVersion)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1))) { + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + expectThrows(Exception.class, connectFuture::actionGet); + + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertEquals(0, connectionManager.size()); + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + public void testClusterNameValidationPreventConnectingToDifferentClusters() throws Exception { + Settings otherSettings = Settings.builder().put("cluster.name", "otherCluster").build(); + + try (MockTransportService transport1 = startTransport("cluster1", Version.CURRENT); + MockTransportService transport2 = startTransport("cluster2", Version.CURRENT, otherSettings)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + if (connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + } else { + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + } + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + private static List> addresses(final TransportAddress... addresses) { + return Arrays.stream(addresses).map(s -> (Supplier) () -> s).collect(Collectors.toList()); + } +}