|
19 | 19 |
|
20 | 20 | package org.elasticsearch.discovery.zen;
|
21 | 21 |
|
22 |
| -import org.elasticsearch.action.ActionListener; |
23 |
| -import org.elasticsearch.core.internal.io.IOUtils; |
24 | 22 | import org.elasticsearch.Version;
|
| 23 | +import org.elasticsearch.action.ActionListener; |
25 | 24 | import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
26 | 25 | import org.elasticsearch.cluster.ClusterChangedEvent;
|
27 | 26 | import org.elasticsearch.cluster.ClusterModule;
|
|
44 | 43 | import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
45 | 44 | import org.elasticsearch.common.settings.ClusterSettings;
|
46 | 45 | import org.elasticsearch.common.settings.Settings;
|
| 46 | +import org.elasticsearch.core.internal.io.IOUtils; |
47 | 47 | import org.elasticsearch.discovery.Discovery;
|
48 | 48 | import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
|
49 | 49 | import org.elasticsearch.index.shard.ShardId;
|
|
85 | 85 | import static org.elasticsearch.cluster.service.MasterServiceTests.discoveryState;
|
86 | 86 | import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
|
87 | 87 | import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
|
88 |
| -import static org.elasticsearch.test.ClusterServiceUtils.setState; |
89 |
| -import static org.hamcrest.Matchers.arrayWithSize; |
90 | 88 | import static org.hamcrest.Matchers.containsString;
|
91 | 89 | import static org.hamcrest.Matchers.emptyArray;
|
92 | 90 | import static org.hamcrest.Matchers.equalTo;
|
93 | 91 | import static org.hamcrest.Matchers.hasToString;
|
| 92 | +import static org.hamcrest.Matchers.is; |
| 93 | +import static org.hamcrest.Matchers.notNullValue; |
94 | 94 |
|
95 | 95 | public class ZenDiscoveryUnitTests extends ESTestCase {
|
96 | 96 |
|
@@ -228,16 +228,19 @@ public void testNodesUpdatedAfterClusterStatePublished() throws Exception {
|
228 | 228 | DiscoveryNodes.builder(state.nodes()).add(otherNode).masterNodeId(masterNode.getId())
|
229 | 229 | ).build();
|
230 | 230 |
|
231 |
| - try { |
232 |
| - // publishing a new cluster state |
233 |
| - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); |
234 |
| - AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); |
235 |
| - expectedFDNodes = masterZen.getFaultDetectionNodes(); |
236 |
| - masterZen.publish(clusterChangedEvent, getNoopPublishListener(), listener); |
| 231 | + // publishing a new cluster state |
| 232 | + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); |
| 233 | + AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); |
| 234 | + expectedFDNodes = masterZen.getFaultDetectionNodes(); |
| 235 | + AwaitingPublishListener awaitingPublishListener = new AwaitingPublishListener(); |
| 236 | + masterZen.publish(clusterChangedEvent, awaitingPublishListener, listener); |
| 237 | + awaitingPublishListener.await(); |
| 238 | + if (awaitingPublishListener.getException() == null) { |
| 239 | + // publication succeeded, wait for acks |
237 | 240 | listener.await(10, TimeUnit.SECONDS);
|
238 | 241 | // publish was a success, update expected FD nodes based on new cluster state
|
239 | 242 | expectedFDNodes = fdNodesForState(newState, masterNode);
|
240 |
| - } catch (Discovery.FailedToCommitClusterStateException e) { |
| 243 | + } else { |
241 | 244 | // not successful, so expectedFDNodes above should remain what it was originally assigned
|
242 | 245 | assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail
|
243 | 246 | }
|
@@ -281,35 +284,48 @@ public void testPendingCSQueueIsClearedWhenClusterStatePublished() throws Except
|
281 | 284 | DiscoveryNodes.builder(discoveryState(masterMasterService).nodes()).masterNodeId(masterNode.getId())
|
282 | 285 | ).build();
|
283 | 286 |
|
284 |
| - |
285 |
| - try { |
286 |
| - // publishing a new cluster state |
287 |
| - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); |
288 |
| - AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); |
289 |
| - masterZen.publish(clusterChangedEvent, getNoopPublishListener(), listener); |
| 287 | + // publishing a new cluster state |
| 288 | + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); |
| 289 | + AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); |
| 290 | + AwaitingPublishListener awaitingPublishListener = new AwaitingPublishListener(); |
| 291 | + masterZen.publish(clusterChangedEvent, awaitingPublishListener, listener); |
| 292 | + awaitingPublishListener.await(); |
| 293 | + if (awaitingPublishListener.getException() == null) { |
| 294 | + // publication succeeded, wait for acks |
290 | 295 | listener.await(1, TimeUnit.HOURS);
|
291 |
| - // publish was a success, check that queue as cleared |
292 |
| - assertThat(masterZen.pendingClusterStates(), emptyArray()); |
293 |
| - } catch (Discovery.FailedToCommitClusterStateException e) { |
294 |
| - // not successful, so the pending queue should be cleaned |
295 |
| - assertThat(Arrays.toString(masterZen.pendingClusterStates()), masterZen.pendingClusterStates(), arrayWithSize(0)); |
296 | 296 | }
|
| 297 | + // queue should be cleared whether successful or not |
| 298 | + assertThat(Arrays.toString(masterZen.pendingClusterStates()), masterZen.pendingClusterStates(), emptyArray()); |
297 | 299 | } finally {
|
298 | 300 | IOUtils.close(toClose);
|
299 | 301 | terminate(threadPool);
|
300 | 302 | }
|
301 | 303 | }
|
302 | 304 |
|
303 |
| - private ActionListener<Void> getNoopPublishListener() { |
304 |
| - return new ActionListener<Void>() { |
305 |
| - @Override |
306 |
| - public void onResponse(Void aVoid) { |
307 |
| - } |
| 305 | + private class AwaitingPublishListener implements ActionListener<Void> { |
| 306 | + private final CountDownLatch countDownLatch = new CountDownLatch(1); |
| 307 | + private Discovery.FailedToCommitClusterStateException exception; |
308 | 308 |
|
309 |
| - @Override |
310 |
| - public void onFailure(Exception e) { |
311 |
| - } |
312 |
| - }; |
| 309 | + @Override |
| 310 | + public void onResponse(Void aVoid) { |
| 311 | + assertThat(countDownLatch.getCount(), is(1L)); |
| 312 | + countDownLatch.countDown(); |
| 313 | + } |
| 314 | + |
| 315 | + @Override |
| 316 | + public void onFailure(Exception e) { |
| 317 | + assertThat(e, notNullValue()); |
| 318 | + exception = (Discovery.FailedToCommitClusterStateException) e; // fail on other exception types |
| 319 | + onResponse(null); |
| 320 | + } |
| 321 | + |
| 322 | + public void await() throws InterruptedException { |
| 323 | + countDownLatch.await(); |
| 324 | + } |
| 325 | + |
| 326 | + public Discovery.FailedToCommitClusterStateException getException() { |
| 327 | + return exception; |
| 328 | + } |
313 | 329 | }
|
314 | 330 |
|
315 | 331 | private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, MasterService masterService,
|
|
0 commit comments