28
28
import org .elasticsearch .cluster .coordination .NoOpClusterApplier ;
29
29
import org .elasticsearch .cluster .coordination .PeersResponse ;
30
30
import org .elasticsearch .cluster .node .DiscoveryNode ;
31
- import org .elasticsearch .cluster .node .DiscoveryNode .Role ;
32
31
import org .elasticsearch .cluster .service .MasterService ;
33
32
import org .elasticsearch .common .io .stream .StreamInput ;
34
33
import org .elasticsearch .common .settings .ClusterSettings ;
37
36
import org .elasticsearch .discovery .Discovery ;
38
37
import org .elasticsearch .discovery .PeersRequest ;
39
38
import org .elasticsearch .test .ESTestCase ;
40
- import org .elasticsearch .test .junit .annotations .TestLogging ;
41
39
import org .elasticsearch .test .transport .MockTransport ;
42
40
import org .elasticsearch .threadpool .TestThreadPool ;
43
41
import org .elasticsearch .threadpool .ThreadPool ;
47
45
import org .elasticsearch .transport .TransportResponseHandler ;
48
46
import org .elasticsearch .transport .TransportService ;
49
47
import org .elasticsearch .transport .TransportService .HandshakeResponse ;
48
+ import org .junit .After ;
49
+ import org .junit .Before ;
50
50
51
51
import java .io .IOException ;
52
52
import java .util .Random ;
56
56
import static java .util .Collections .emptyList ;
57
57
import static java .util .Collections .emptyMap ;
58
58
import static java .util .Collections .emptySet ;
59
- import static java .util .Collections .singleton ;
60
59
import static org .elasticsearch .cluster .ClusterName .CLUSTER_NAME_SETTING ;
61
60
import static org .elasticsearch .discovery .PeerFinder .REQUEST_PEERS_ACTION_NAME ;
62
61
import static org .elasticsearch .transport .TransportService .HANDSHAKE_ACTION_NAME ;
69
68
public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
70
69
71
70
private final ActionFilters EMPTY_FILTERS = new ActionFilters (emptySet ());
71
+ private DiscoveryNode localNode ;
72
+ private ThreadPool threadPool ;
73
+ private String clusterName ;
74
+ private TransportService transportService ;
75
+ private Coordinator coordinator ;
76
+ private DiscoveryNode otherNode ;
77
+
78
+ @ Before
79
+ public void setupTest () {
80
+ clusterName = randomAlphaOfLength (10 );
81
+ localNode = new DiscoveryNode ("local" , buildNewFakeTransportAddress (), Version .CURRENT );
82
+ otherNode = new DiscoveryNode ("other" , buildNewFakeTransportAddress (), Version .CURRENT );
72
83
73
- public void testHandlesNonstandardDiscoveryImplementation () throws InterruptedException {
74
- final MockTransport transport = new MockTransport ();
75
- final ThreadPool threadPool = new TestThreadPool ("test" , Settings .EMPTY );
76
- final DiscoveryNode discoveryNode = new DiscoveryNode ("local" , buildNewFakeTransportAddress (), Version .CURRENT );
77
- final TransportService transportService = transport .createTransportService (Settings .EMPTY , threadPool ,
78
- TransportService .NOOP_TRANSPORT_INTERCEPTOR , boundTransportAddress -> discoveryNode , null , emptySet ());
84
+ final MockTransport transport = new MockTransport () {
85
+ @ Override
86
+ protected void onSendRequest (long requestId , String action , TransportRequest request , DiscoveryNode node ) {
87
+ if (action .equals (HANDSHAKE_ACTION_NAME ) && node .getAddress ().equals (otherNode .getAddress ())) {
88
+ handleResponse (requestId , new HandshakeResponse (otherNode , new ClusterName (clusterName ), Version .CURRENT ));
89
+ }
90
+ }
91
+ };
92
+ threadPool = new TestThreadPool ("test" , Settings .EMPTY );
93
+ transportService = transport .createTransportService (
94
+ Settings .builder ().put (CLUSTER_NAME_SETTING .getKey (), clusterName ).build (), threadPool ,
95
+ TransportService .NOOP_TRANSPORT_INTERCEPTOR , boundTransportAddress -> localNode , null , emptySet ());
79
96
97
+ final ClusterSettings clusterSettings = new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS );
98
+ coordinator = new Coordinator ("local" , Settings .EMPTY , clusterSettings , transportService ,
99
+ ESAllocationTestCase .createAllocationService (Settings .EMPTY ),
100
+ new MasterService ("local" , Settings .EMPTY , threadPool ),
101
+ () -> new InMemoryPersistedState (0 , ClusterState .builder (new ClusterName (clusterName )).build ()), r -> emptyList (),
102
+ new NoOpClusterApplier (), new Random (random ().nextLong ()));
103
+ }
104
+
105
+ @ After
106
+ public void cleanUp () {
107
+ threadPool .shutdown ();
108
+ }
109
+
110
+ public void testHandlesNonstandardDiscoveryImplementation () throws InterruptedException {
80
111
final Discovery discovery = mock (Discovery .class );
81
112
verifyZeroInteractions (discovery );
82
113
@@ -86,7 +117,7 @@ public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedEx
86
117
transportService .acceptIncomingRequests ();
87
118
88
119
final CountDownLatch countDownLatch = new CountDownLatch (1 );
89
- transportService .sendRequest (discoveryNode , GetDiscoveredNodesAction .NAME , new GetDiscoveredNodesRequest (), new ResponseHandler () {
120
+ transportService .sendRequest (localNode , GetDiscoveredNodesAction .NAME , new GetDiscoveredNodesRequest (), new ResponseHandler () {
90
121
@ Override
91
122
public void handleResponse (GetDiscoveredNodesResponse response ) {
92
123
throw new AssertionError ("should not be called" );
@@ -100,34 +131,19 @@ public void handleException(TransportException exp) {
100
131
});
101
132
102
133
assertTrue (countDownLatch .await (10 , TimeUnit .SECONDS ));
103
- threadPool .shutdown ();
104
134
}
105
135
106
136
public void testFailsOnNonMasterEligibleNodes () throws InterruptedException {
107
- final DiscoveryNode discoveryNode
108
- = new DiscoveryNode ("local" , buildNewFakeTransportAddress (), emptyMap (), emptySet (), Version .CURRENT );
109
-
110
- final MockTransport transport = new MockTransport ();
111
- final ThreadPool threadPool = new TestThreadPool ("test" , Settings .EMPTY );
112
- final TransportService transportService = transport .createTransportService (Settings .EMPTY , threadPool ,
113
- TransportService .NOOP_TRANSPORT_INTERCEPTOR , boundTransportAddress -> discoveryNode , null , emptySet ());
114
-
115
- final ClusterSettings clusterSettings = new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS );
116
- final ClusterState state = ClusterState .builder (new ClusterName ("cluster" )).build ();
117
- final Coordinator coordinator = new Coordinator ("local" , Settings .EMPTY , clusterSettings , transportService ,
118
- ESAllocationTestCase .createAllocationService (Settings .EMPTY ),
119
- new MasterService ("local" , Settings .EMPTY , threadPool ),
120
- () -> new InMemoryPersistedState (0 , state ), r -> emptyList (),
121
- new NoOpClusterApplier (), new Random (random ().nextLong ()));
137
+ localNode = new DiscoveryNode ("local" , buildNewFakeTransportAddress (), emptyMap (), emptySet (), Version .CURRENT );
138
+ // transport service only picks up local node when started, so we can change it here ^
122
139
123
140
new TransportGetDiscoveredNodesAction (Settings .EMPTY , EMPTY_FILTERS , transportService , coordinator ); // registers action
124
141
transportService .start ();
125
142
transportService .acceptIncomingRequests ();
126
143
coordinator .start ();
127
144
128
-
129
145
final CountDownLatch countDownLatch = new CountDownLatch (1 );
130
- transportService .sendRequest (discoveryNode , GetDiscoveredNodesAction .NAME , new GetDiscoveredNodesRequest (), new ResponseHandler () {
146
+ transportService .sendRequest (localNode , GetDiscoveredNodesAction .NAME , new GetDiscoveredNodesRequest (), new ResponseHandler () {
131
147
@ Override
132
148
public void handleResponse (GetDiscoveredNodesResponse response ) {
133
149
throw new AssertionError ("should not be called" );
@@ -141,26 +157,9 @@ public void handleException(TransportException exp) {
141
157
});
142
158
143
159
assertTrue (countDownLatch .await (10 , TimeUnit .SECONDS ));
144
- threadPool .shutdown ();
145
160
}
146
161
147
162
public void testFailsQuicklyWithZeroTimeout () throws InterruptedException {
148
- final DiscoveryNode localNode
149
- = new DiscoveryNode ("local" , buildNewFakeTransportAddress (), emptyMap (), singleton (Role .MASTER ), Version .CURRENT );
150
-
151
- final MockTransport transport = new MockTransport ();
152
- final ThreadPool threadPool = new TestThreadPool ("test" , Settings .EMPTY );
153
- final TransportService transportService = transport .createTransportService (Settings .EMPTY , threadPool ,
154
- TransportService .NOOP_TRANSPORT_INTERCEPTOR , boundTransportAddress -> localNode , null , emptySet ());
155
-
156
- final ClusterSettings clusterSettings = new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS );
157
- final ClusterState state = ClusterState .builder (new ClusterName ("cluster" )).build ();
158
- final Coordinator coordinator = new Coordinator ("local" , Settings .EMPTY , clusterSettings , transportService ,
159
- ESAllocationTestCase .createAllocationService (Settings .EMPTY ),
160
- new MasterService ("local" , Settings .EMPTY , threadPool ),
161
- () -> new InMemoryPersistedState (0 , state ), r -> emptyList (),
162
- new NoOpClusterApplier (), new Random (random ().nextLong ()));
163
-
164
163
new TransportGetDiscoveredNodesAction (Settings .EMPTY , EMPTY_FILTERS , transportService , coordinator ); // registers action
165
164
transportService .start ();
166
165
transportService .acceptIncomingRequests ();
@@ -186,38 +185,9 @@ public void handleException(TransportException exp) {
186
185
});
187
186
188
187
assertTrue (countDownLatch .await (10 , TimeUnit .SECONDS ));
189
- threadPool .shutdown ();
190
188
}
191
189
192
- @ TestLogging ("org.elasticsearch.cluster.coordination:TRACE" )
193
190
public void testGetsDiscoveredNodes () throws InterruptedException {
194
- final DiscoveryNode localNode
195
- = new DiscoveryNode ("local" , buildNewFakeTransportAddress (), emptyMap (), singleton (Role .MASTER ), Version .CURRENT );
196
- final DiscoveryNode otherNode
197
- = new DiscoveryNode ("other" , buildNewFakeTransportAddress (), emptyMap (), singleton (Role .MASTER ), Version .CURRENT );
198
- final String clusterName = randomAlphaOfLength (10 );
199
-
200
- final MockTransport transport = new MockTransport () {
201
- @ Override
202
- protected void onSendRequest (long requestId , String action , TransportRequest request , DiscoveryNode node ) {
203
- if (action .equals (HANDSHAKE_ACTION_NAME ) && node .getAddress ().equals (otherNode .getAddress ())) {
204
- handleResponse (requestId , new HandshakeResponse (otherNode , new ClusterName (clusterName ), Version .CURRENT ));
205
- }
206
- }
207
- };
208
- final ThreadPool threadPool = new TestThreadPool ("test" , Settings .EMPTY );
209
- final TransportService transportService = transport .createTransportService (
210
- Settings .builder ().put (CLUSTER_NAME_SETTING .getKey (), clusterName ).build (), threadPool ,
211
- TransportService .NOOP_TRANSPORT_INTERCEPTOR , boundTransportAddress -> localNode , null , emptySet ());
212
-
213
- final ClusterSettings clusterSettings = new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS );
214
- final ClusterState state = ClusterState .builder (new ClusterName (clusterName )).build ();
215
- final Coordinator coordinator = new Coordinator ("local" , Settings .EMPTY , clusterSettings , transportService ,
216
- ESAllocationTestCase .createAllocationService (Settings .EMPTY ),
217
- new MasterService ("local" , Settings .EMPTY , threadPool ),
218
- () -> new InMemoryPersistedState (0 , state ), r -> emptyList (),
219
- new NoOpClusterApplier (), new Random (random ().nextLong ()));
220
-
221
191
new TransportGetDiscoveredNodesAction (Settings .EMPTY , EMPTY_FILTERS , transportService , coordinator ); // registers action
222
192
transportService .start ();
223
193
transportService .acceptIncomingRequests ();
@@ -286,8 +256,6 @@ public void handleException(TransportException exp) {
286
256
287
257
assertTrue (countDownLatch .await (10 , TimeUnit .SECONDS ));
288
258
}
289
-
290
- threadPool .shutdown ();
291
259
}
292
260
293
261
private abstract class ResponseHandler implements TransportResponseHandler <GetDiscoveredNodesResponse > {
0 commit comments