Skip to content

Commit 0a5a8a8

Browse files
committed
Register TransportBootstrapClusterAction with TransportService
1 parent 1b62782 commit 0a5a8a8

File tree

2 files changed

+83
-60
lines changed

2 files changed

+83
-60
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.ElasticsearchException;
2222
import org.elasticsearch.action.ActionListener;
2323
import org.elasticsearch.action.support.ActionFilters;
24+
import org.elasticsearch.action.support.HandledTransportAction;
2425
import org.elasticsearch.action.support.TransportAction;
2526
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2627
import org.elasticsearch.cluster.coordination.Coordinator;
@@ -32,7 +33,7 @@
3233
import org.elasticsearch.tasks.Task;
3334
import org.elasticsearch.transport.TransportService;
3435

35-
public class TransportBootstrapClusterAction extends TransportAction<BootstrapClusterRequest, AcknowledgedResponse> {
36+
public class TransportBootstrapClusterAction extends HandledTransportAction<BootstrapClusterRequest, AcknowledgedResponse> {
3637

3738
@Nullable // TODO make this not nullable
3839
private final Coordinator coordinator;
@@ -41,7 +42,7 @@ public class TransportBootstrapClusterAction extends TransportAction<BootstrapCl
4142
@Inject
4243
public TransportBootstrapClusterAction(Settings settings, ActionFilters actionFilters, TransportService transportService,
4344
Discovery discovery) {
44-
super(settings, BootstrapClusterAction.NAME, actionFilters, transportService.getTaskManager());
45+
super(settings, BootstrapClusterAction.NAME, transportService, actionFilters, BootstrapClusterRequest::new);
4546
this.transportService = transportService;
4647
if (discovery instanceof Coordinator) {
4748
coordinator = (Coordinator) discovery;

server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java

Lines changed: 80 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
*/
1919
package org.elasticsearch.action.admin.cluster.bootstrap;
2020

21-
import org.elasticsearch.ElasticsearchException;
2221
import org.elasticsearch.Version;
23-
import org.elasticsearch.action.ActionListener;
2422
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription;
2523
import org.elasticsearch.action.support.ActionFilters;
2624
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -33,17 +31,23 @@
3331
import org.elasticsearch.cluster.node.DiscoveryNode;
3432
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
3533
import org.elasticsearch.cluster.service.MasterService;
34+
import org.elasticsearch.common.io.stream.StreamInput;
3635
import org.elasticsearch.common.settings.ClusterSettings;
3736
import org.elasticsearch.common.settings.Settings;
3837
import org.elasticsearch.discovery.Discovery;
39-
import org.elasticsearch.tasks.Task;
4038
import org.elasticsearch.test.ESTestCase;
4139
import org.elasticsearch.test.transport.MockTransport;
4240
import org.elasticsearch.threadpool.TestThreadPool;
4341
import org.elasticsearch.threadpool.ThreadPool;
42+
import org.elasticsearch.threadpool.ThreadPool.Names;
43+
import org.elasticsearch.transport.TransportException;
44+
import org.elasticsearch.transport.TransportResponseHandler;
4445
import org.elasticsearch.transport.TransportService;
4546

46-
import java.util.concurrent.atomic.AtomicBoolean;
47+
import java.io.IOException;
48+
import java.util.Random;
49+
import java.util.concurrent.CountDownLatch;
50+
import java.util.concurrent.TimeUnit;
4751

4852
import static java.util.Collections.emptyList;
4953
import static java.util.Collections.emptyMap;
@@ -55,11 +59,14 @@
5559
import static org.mockito.Mockito.verifyZeroInteractions;
5660

5761
public class TransportBootstrapClusterActionTests extends ESTestCase {
62+
63+
private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet());
64+
5865
private static BootstrapClusterRequest exampleRequest() {
5966
return new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription("id", "name"))));
6067
}
6168

62-
public void testHandlesNonstandardDiscoveryImplementation() {
69+
public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException {
6370
final MockTransport transport = new MockTransport();
6471
final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY);
6572
final DiscoveryNode discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
@@ -69,133 +76,148 @@ public void testHandlesNonstandardDiscoveryImplementation() {
6976
final Discovery discovery = mock(Discovery.class);
7077
verifyZeroInteractions(discovery);
7178

72-
final TransportBootstrapClusterAction transportBootstrapClusterAction
73-
= new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, discovery);
79+
new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, discovery); // registers action
80+
transportService.start();
81+
transportService.acceptIncomingRequests();
7482

75-
final ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() {
83+
final CountDownLatch countDownLatch = new CountDownLatch(1);
84+
transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() {
7685
@Override
77-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
86+
public void handleResponse(AcknowledgedResponse response) {
7887
throw new AssertionError("should not be called");
7988
}
8089

8190
@Override
82-
public void onFailure(Exception e) {
83-
throw new AssertionError("should not be called");
91+
public void handleException(TransportException exp) {
92+
assertThat(exp.getRootCause().getMessage(), equalTo("cluster bootstrapping is not supported by this discovery type"));
93+
countDownLatch.countDown();
8494
}
85-
};
86-
87-
assertThat(expectThrows(IllegalStateException.class,
88-
() -> transportBootstrapClusterAction.doExecute(mock(Task.class), exampleRequest(), listener))
89-
.getMessage(), equalTo("cluster bootstrapping is not supported by this discovery type"));
95+
});
9096

97+
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
9198
threadPool.shutdown();
9299
}
93100

94-
public void testFailsOnNonMasterEligibleNodes() {
101+
public void testFailsOnNonMasterEligibleNodes() throws InterruptedException {
95102
final DiscoveryNode discoveryNode
96103
= new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
97104

98105
final MockTransport transport = new MockTransport();
99106
final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY);
100107
final TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool,
101108
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet());
102-
transportService.start();
103-
transportService.acceptIncomingRequests();
104109

105110
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
106111
final ClusterState state = ClusterState.builder(new ClusterName("cluster")).build();
107112
final Coordinator coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService,
108113
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
109114
new MasterService("local", Settings.EMPTY, threadPool),
110115
() -> new InMemoryPersistedState(0, state), r -> emptyList(),
111-
new NoOpClusterApplier(), random());
112-
coordinator.start();
116+
new NoOpClusterApplier(), new Random(random().nextLong()));
113117

114-
final TransportBootstrapClusterAction transportBootstrapClusterAction
115-
= new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, coordinator);
118+
new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
119+
transportService.start();
120+
transportService.acceptIncomingRequests();
121+
coordinator.start();
116122

117-
final ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() {
123+
final CountDownLatch countDownLatch = new CountDownLatch(1);
124+
transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() {
118125
@Override
119-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
126+
public void handleResponse(AcknowledgedResponse response) {
120127
throw new AssertionError("should not be called");
121128
}
122129

123130
@Override
124-
public void onFailure(Exception e) {
125-
throw new AssertionError("should not be called");
131+
public void handleException(TransportException exp) {
132+
assertThat(exp.getRootCause().getMessage(), equalTo("this node is not master-eligible"));
133+
countDownLatch.countDown();
126134
}
127-
};
128-
129-
assertThat(expectThrows(ElasticsearchException.class,
130-
() -> transportBootstrapClusterAction.doExecute(mock(Task.class), exampleRequest(), listener)).getMessage(),
131-
equalTo("this node is not master-eligible"));
135+
});
132136

137+
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
133138
threadPool.shutdown();
134139
}
135140

136-
public void testSetsInitialConfiguration() {
141+
public void testSetsInitialConfiguration() throws InterruptedException {
137142
final DiscoveryNode discoveryNode
138143
= new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
139144

140145
final MockTransport transport = new MockTransport();
141146
final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY);
142147
final TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool,
143148
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet());
144-
transportService.start();
145-
transportService.acceptIncomingRequests();
146149

147150
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
148151
final ClusterState state = ClusterState.builder(new ClusterName("cluster")).build();
149152
final Coordinator coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService,
150153
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
151154
new MasterService("local", Settings.EMPTY, threadPool),
152155
() -> new InMemoryPersistedState(0, state), r -> emptyList(),
153-
new NoOpClusterApplier(), random());
156+
new NoOpClusterApplier(), new Random(random().nextLong()));
157+
158+
new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
159+
transportService.start();
160+
transportService.acceptIncomingRequests();
154161
coordinator.start();
155162
coordinator.startInitialJoin();
156163

157-
final TransportBootstrapClusterAction transportBootstrapClusterAction
158-
= new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, coordinator);
159-
160-
final AtomicBoolean responseReceived = new AtomicBoolean();
161-
162164
assertFalse(coordinator.isInitialConfigurationSet());
163165

164166
final BootstrapClusterRequest request
165167
= new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription(discoveryNode))));
166168

167-
transportBootstrapClusterAction.doExecute(mock(Task.class), request,
168-
new ActionListener<AcknowledgedResponse>() {
169+
{
170+
final CountDownLatch countDownLatch = new CountDownLatch(1);
171+
transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() {
169172
@Override
170-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
171-
assertTrue(acknowledgedResponse.isAcknowledged());
172-
responseReceived.set(true);
173+
public void handleResponse(AcknowledgedResponse response) {
174+
assertTrue(response.isAcknowledged());
175+
countDownLatch.countDown();
173176
}
174177

175178
@Override
176-
public void onFailure(Exception e) {
177-
throw new AssertionError("should not be called");
179+
public void handleException(TransportException exp) {
180+
throw new AssertionError("should not be called", exp);
178181
}
179182
});
180-
assertTrue(responseReceived.get());
183+
184+
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
185+
}
186+
181187
assertTrue(coordinator.isInitialConfigurationSet());
182188

183-
responseReceived.set(false);
184-
transportBootstrapClusterAction.doExecute(mock(Task.class), request,
185-
new ActionListener<AcknowledgedResponse>() {
189+
{
190+
final CountDownLatch countDownLatch = new CountDownLatch(1);
191+
transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() {
186192
@Override
187-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
188-
assertFalse(acknowledgedResponse.isAcknowledged());
189-
responseReceived.set(true);
193+
public void handleResponse(AcknowledgedResponse response) {
194+
assertFalse(response.isAcknowledged());
195+
countDownLatch.countDown();
190196
}
191197

192198
@Override
193-
public void onFailure(Exception e) {
194-
throw new AssertionError("should not be called");
199+
public void handleException(TransportException exp) {
200+
throw new AssertionError("should not be called", exp);
195201
}
196202
});
197-
assertTrue(responseReceived.get());
203+
204+
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
205+
}
198206

199207
threadPool.shutdown();
200208
}
209+
210+
private abstract class ResponseHandler implements TransportResponseHandler<AcknowledgedResponse> {
211+
@Override
212+
public String executor() {
213+
return Names.SAME;
214+
}
215+
216+
@Override
217+
public AcknowledgedResponse read(StreamInput in) throws IOException {
218+
AcknowledgedResponse acknowledgedResponse = new AcknowledgedResponse();
219+
acknowledgedResponse.readFrom(in);
220+
return acknowledgedResponse;
221+
}
222+
}
201223
}

0 commit comments

Comments
 (0)