|
20 | 20 |
|
21 | 21 | import org.elasticsearch.Version;
|
22 | 22 | import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
| 23 | +import org.elasticsearch.action.support.PlainActionFuture; |
23 | 24 | import org.elasticsearch.client.Client;
|
24 | 25 | import org.elasticsearch.cluster.ClusterName;
|
25 | 26 | import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
31 | 32 | import org.elasticsearch.threadpool.ThreadPool;
|
32 | 33 |
|
33 | 34 | import java.util.Collections;
|
34 |
| -import java.util.concurrent.Semaphore; |
35 | 35 | import java.util.concurrent.TimeUnit;
|
36 | 36 |
|
37 | 37 | import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport;
|
@@ -86,37 +86,27 @@ public void testEnsureWeReconnect() throws Exception {
|
86 | 86 | .put("cluster.remote.test.seeds",
|
87 | 87 | remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
|
88 | 88 | try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
|
89 |
| - Semaphore semaphore = new Semaphore(1); |
90 | 89 | service.start();
|
91 |
| - service.getRemoteClusterService().getConnections().forEach(con -> { |
92 |
| - con.getConnectionManager().addListener(new TransportConnectionListener() { |
93 |
| - @Override |
94 |
| - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { |
95 |
| - if (remoteNode.equals(node)) { |
96 |
| - semaphore.release(); |
97 |
| - } |
98 |
| - } |
99 |
| - }); |
100 |
| - }); |
101 | 90 | // this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have
|
102 | 91 | // the right calls in place in the RemoteAwareClient
|
103 | 92 | service.acceptIncomingRequests();
|
| 93 | + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); |
| 94 | + assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode))); |
104 | 95 | for (int i = 0; i < 10; i++) {
|
105 |
| - semaphore.acquire(); |
106 |
| - try { |
107 |
| - service.getRemoteClusterService().getConnections().forEach(con -> { |
108 |
| - con.getConnectionManager().disconnectFromNode(remoteNode); |
109 |
| - }); |
110 |
| - semaphore.acquire(); |
111 |
| - RemoteClusterService remoteClusterService = service.getRemoteClusterService(); |
112 |
| - Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); |
113 |
| - ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); |
114 |
| - assertNotNull(clusterStateResponse); |
115 |
| - assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); |
116 |
| - assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); |
117 |
| - } finally { |
118 |
| - semaphore.release(); |
119 |
| - } |
| 96 | + RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test"); |
| 97 | + assertBusy(remoteClusterConnection::assertNoRunningConnections); |
| 98 | + ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager(); |
| 99 | + Transport.Connection connection = connectionManager.getConnection(remoteNode); |
| 100 | + PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture(); |
| 101 | + connection.addCloseListener(closeFuture); |
| 102 | + connectionManager.disconnectFromNode(remoteNode); |
| 103 | + closeFuture.get(); |
| 104 | + |
| 105 | + Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); |
| 106 | + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); |
| 107 | + assertNotNull(clusterStateResponse); |
| 108 | + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); |
| 109 | + assertTrue(remoteClusterConnection.isNodeConnected(remoteNode)); |
120 | 110 | }
|
121 | 111 | }
|
122 | 112 | }
|
|
0 commit comments