30
30
import org .elasticsearch .threadpool .ThreadPool ;
31
31
32
32
import java .util .Collections ;
33
+ import java .util .concurrent .Semaphore ;
33
34
import java .util .concurrent .TimeUnit ;
34
35
35
36
import static org .elasticsearch .transport .RemoteClusterConnectionTests .startTransport ;
@@ -69,7 +70,6 @@ public void testConnectAndExecuteRequest() throws Exception {
69
70
}
70
71
}
71
72
72
- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/29547" )
73
73
public void testEnsureWeReconnect () throws Exception {
74
74
Settings remoteSettings = Settings .builder ().put (ClusterName .CLUSTER_NAME_SETTING .getKey (), "foo_bar_cluster" ).build ();
75
75
try (MockTransportService remoteTransport = startTransport ("remote_node" , Collections .emptyList (), Version .CURRENT , threadPool ,
@@ -79,17 +79,35 @@ public void testEnsureWeReconnect() throws Exception {
79
79
.put (RemoteClusterService .ENABLE_REMOTE_CLUSTERS .getKey (), true )
80
80
.put ("search.remote.test.seeds" , remoteNode .getAddress ().getAddress () + ":" + remoteNode .getAddress ().getPort ()).build ();
81
81
try (MockTransportService service = MockTransportService .createNewService (localSettings , Version .CURRENT , threadPool , null )) {
82
+ Semaphore semaphore = new Semaphore (1 );
82
83
service .start ();
84
+ service .addConnectionListener (new TransportConnectionListener () {
85
+ @ Override
86
+ public void onNodeDisconnected (DiscoveryNode node ) {
87
+ if (remoteNode .equals (node )) {
88
+ semaphore .release ();
89
+ }
90
+ }
91
+ });
92
+ // this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have
93
+ // the right calls in place in the RemoteAwareClient
83
94
service .acceptIncomingRequests ();
84
- service .disconnectFromNode (remoteNode );
85
- RemoteClusterService remoteClusterService = service .getRemoteClusterService ();
86
- assertBusy (() -> assertFalse (remoteClusterService .isRemoteNodeConnected ("test" , remoteNode )));
87
- Client client = remoteClusterService .getRemoteClusterClient (threadPool , "test" );
88
- ClusterStateResponse clusterStateResponse = client .admin ().cluster ().prepareState ().execute ().get ();
89
- assertNotNull (clusterStateResponse );
90
- assertEquals ("foo_bar_cluster" , clusterStateResponse .getState ().getClusterName ().value ());
95
+ for (int i = 0 ; i < 10 ; i ++) {
96
+ semaphore .acquire ();
97
+ try {
98
+ service .disconnectFromNode (remoteNode );
99
+ semaphore .acquire ();
100
+ RemoteClusterService remoteClusterService = service .getRemoteClusterService ();
101
+ Client client = remoteClusterService .getRemoteClusterClient (threadPool , "test" );
102
+ ClusterStateResponse clusterStateResponse = client .admin ().cluster ().prepareState ().execute ().get ();
103
+ assertNotNull (clusterStateResponse );
104
+ assertEquals ("foo_bar_cluster" , clusterStateResponse .getState ().getClusterName ().value ());
105
+ assertTrue (remoteClusterService .isRemoteNodeConnected ("test" , remoteNode ));
106
+ } finally {
107
+ semaphore .release ();
108
+ }
109
+ }
91
110
}
92
111
}
93
112
}
94
-
95
113
}
0 commit comments