Skip to content

Commit ce14d2e

Browse files
committed
Get test working
1 parent b7fc71e commit ce14d2e

File tree

1 file changed

+87
-41
lines changed

1 file changed

+87
-41
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java

Lines changed: 87 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
package org.elasticsearch.action.support.master;
1010

1111
import org.elasticsearch.ElasticsearchException;
12-
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
1314
import org.elasticsearch.cluster.ClusterState;
14-
import org.elasticsearch.cluster.ClusterStateApplier;
1515
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1616
import org.elasticsearch.cluster.coordination.PublicationTransportHandler;
1717
import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
18+
import org.elasticsearch.cluster.node.DiscoveryNode;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.util.CollectionUtils;
2021
import org.elasticsearch.plugins.Plugin;
@@ -25,8 +26,10 @@
2526

2627
import java.util.Collection;
2728
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.CyclicBarrier;
2830
import java.util.concurrent.atomic.AtomicBoolean;
2931

32+
import static org.hamcrest.Matchers.equalTo;
3033
import static org.hamcrest.Matchers.greaterThan;
3134

3235
public class TransportMasterNodeActionIT extends ESIntegTestCase {
@@ -36,39 +39,49 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
3639
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
3740
}
3841

39-
@TestLogging(reason = "wip", value = "org.elasticsearch.transport.TransportService.tracer:TRACE")
42+
@TestLogging(reason = "wip", value = "org.elasticsearch.action.support.master.TransportMasterNodeAction:DEBUG")
4043
public void testRoutingLoopProtection() {
4144

4245
final var newMaster = internalCluster().startMasterOnlyNode();
4346
final var enoughVotingMastersLatch = new CountDownLatch(1);
44-
final var electionWonLatch = new CountDownLatch(1);
45-
final var releaseMasterLatch = new CountDownLatch(1);
46-
47-
final ClusterStateApplier blockingApplier = event -> {
48-
if (3 <= event.state().coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size()) {
49-
enoughVotingMastersLatch.countDown();
50-
}
51-
52-
if (event.state().nodes().isLocalNodeElectedMaster()) {
53-
logger.info("--> new master elected as planned");
54-
electionWonLatch.countDown();
55-
safeAwait(releaseMasterLatch);
56-
logger.info("--> cluster state applications released on new master");
57-
}
58-
};
47+
final var newMasterReceivedReroutedMessageLatch = new CountDownLatch(1);
5948

6049
try {
61-
internalCluster().getInstance(ClusterService.class, newMaster).addStateApplier(blockingApplier);
62-
63-
// need at least 3 voting master nodes for failover
50+
/*
51+
* Ensure that we've got 5 voting nodes in the cluster, this means even if the original
52+
* master manages to accept its own failed state update before standing down, we can still
53+
* establish a quorum without its (or our own) join.
54+
*/
55+
internalCluster().getInstance(ClusterService.class, newMaster).addStateApplier(event -> {
56+
if (5 <= event.state().coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size()) {
57+
enoughVotingMastersLatch.countDown();
58+
}
59+
});
60+
internalCluster().startMasterOnlyNode();
61+
internalCluster().startMasterOnlyNode();
6462
internalCluster().startMasterOnlyNode();
6563
safeAwait(enoughVotingMastersLatch);
64+
long originalTerm = internalCluster().masterClient().admin().cluster().prepareState().get().getState().term();
65+
66+
// Configure a latch that will be released when the existing master knows of the new master's election
67+
final String originalMasterName = internalCluster().getMasterName();
68+
logger.info("Original master was {}, new master will be {}", originalMasterName, newMaster);
69+
final var previousMasterKnowsNewMasterIsElectedLatch = new CountDownLatch(1);
70+
internalCluster().getInstance(ClusterService.class, originalMasterName).addStateApplier(event -> {
71+
DiscoveryNode masterNode = event.state().nodes().getMasterNode();
72+
if (masterNode != null && masterNode.getName().equals(newMaster)) {
73+
previousMasterKnowsNewMasterIsElectedLatch.countDown();
74+
}
75+
});
6676

67-
final var reroutedMessageReceived = new AtomicBoolean(false);
6877
for (final var transportService : internalCluster().getInstances(TransportService.class)) {
6978
if (transportService.getLocalNode().getName().equals(newMaster)) {
7079
continue;
7180
}
81+
82+
/*
83+
* Disable every other nodes' ability to send pre-vote and publish requests
84+
*/
7285
final var mockTransportService = asInstanceOf(MockTransportService.class, transportService);
7386
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
7487
if (action.equals(StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME)
@@ -78,21 +91,53 @@ public void testRoutingLoopProtection() {
7891
connection.sendRequest(requestId, action, request, options);
7992
}
8093
});
81-
mockTransportService.addRequestHandlingBehavior(ClusterStateAction.NAME, (handler, request, channel, task) -> {
82-
// assertThat(asInstanceOf(MasterNodeRequest.class, request).masterTerm(), equalTo(originalTerm)) TODO
83-
assertTrue("rerouted message received exactly once", reroutedMessageReceived.compareAndSet(false, true));
84-
handler.messageReceived(request, channel, task);
85-
});
94+
95+
/*
96+
* Assert that no other node receives the re-routed message more than once, and only
97+
* from a node in the original term
98+
*/
99+
final var reroutedMessageReceived = new AtomicBoolean(false);
100+
mockTransportService.addRequestHandlingBehavior(
101+
TransportClusterHealthAction.TYPE.name(),
102+
(handler, request, channel, task) -> {
103+
assertThat(asInstanceOf(MasterNodeRequest.class, request).masterTerm(), equalTo(originalTerm));
104+
assertTrue("rerouted message received exactly once", reroutedMessageReceived.compareAndSet(false, true));
105+
handler.messageReceived(request, channel, task);
106+
}
107+
);
86108
}
87109

88-
final var doubleReroutedMessageLatch = new CountDownLatch(1);
110+
/*
111+
* Count down latch when the new master receives the re-routed message, ensure it only receives it once, and
112+
* only from a node in the newMaster term
113+
*/
89114
MockTransportService.getInstance(newMaster)
90-
.addRequestHandlingBehavior(ClusterStateAction.NAME, (handler, request, channel, task) -> {
91-
// assertThat(asInstanceOf(MasterNodeRequest.class, request).masterTerm(), greaterThan(originalTerm)) TODO
92-
assertThat(doubleReroutedMessageLatch.getCount(), greaterThan(0L));
93-
doubleReroutedMessageLatch.countDown();
115+
.addRequestHandlingBehavior(TransportClusterHealthAction.TYPE.name(), (handler, request, channel, task) -> {
116+
assertThat(asInstanceOf(MasterNodeRequest.class, request).masterTerm(), greaterThan(originalTerm));
117+
assertThat(newMasterReceivedReroutedMessageLatch.getCount(), greaterThan(0L));
118+
newMasterReceivedReroutedMessageLatch.countDown();
119+
handler.messageReceived(request, channel, task);
94120
});
95121

122+
/*
123+
* Block cluster state applier on newMaster to delay clearing of old master, and identifying self as
124+
* new master
125+
*/
126+
final var stateApplierBarrier = new CyclicBarrier(2);
127+
final var blockingStateApplier = new AtomicBoolean(true);
128+
internalCluster().getInstance(ClusterService.class, newMaster).getClusterApplierService().onNewClusterState("test", () -> {
129+
if (blockingStateApplier.get()) {
130+
// Meet to signify application is blocked
131+
safeAwait(stateApplierBarrier);
132+
// Wait for the signal to unblock
133+
safeAwait(stateApplierBarrier);
134+
}
135+
return null;
136+
}, ActionListener.noop());
137+
138+
// Wait until state application is blocked
139+
safeAwait(stateApplierBarrier);
140+
96141
// trigger a cluster state update, which fails, causing a master failover
97142
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
98143
.submitUnbatchedStateUpdateTask("no-op", new ClusterStateUpdateTask() {
@@ -107,24 +152,25 @@ public void onFailure(Exception e) {
107152
}
108153
});
109154

110-
safeAwait(electionWonLatch);
155+
// Wait until the old master has acknowledged the new master's election
156+
safeAwait(previousMasterKnowsNewMasterIsElectedLatch);
157+
logger.info("New master is elected");
111158

112159
// perform a TransportMasterNodeAction on the new master, which doesn't know it's the master yet
113-
final var stateFuture = client(newMaster).admin().cluster().prepareState().clear().execute();
160+
final var stateFuture = client(newMaster).admin().cluster().prepareHealth().execute();
114161

115-
// wait for the request to come back to the new master, which should now wait for its local term to advance
116-
safeAwait(doubleReroutedMessageLatch);
162+
// wait for the request to come back to the new master
163+
safeAwait(newMasterReceivedReroutedMessageLatch);
164+
165+
// Unblock state application on new master, allow it to know of its election win
166+
blockingStateApplier.set(false);
167+
safeAwait(stateApplierBarrier);
117168

118169
assertFalse(stateFuture.isDone());
119170

120-
releaseMasterLatch.countDown();
121171
safeGet(stateFuture);
122-
123172
} finally {
124173
enoughVotingMastersLatch.countDown();
125-
electionWonLatch.countDown();
126-
releaseMasterLatch.countDown();
127-
internalCluster().getInstance(ClusterService.class, newMaster).removeApplier(blockingApplier);
128174
for (final var transportService : internalCluster().getInstances(TransportService.class)) {
129175
asInstanceOf(MockTransportService.class, transportService).clearAllRules();
130176
}

0 commit comments

Comments
 (0)