Skip to content

Commit 1ac03d9

Browse files
committed
Tests: Add TestZenDiscovery and replace uses of MockZenPing with it (elastic#21488)
This changes adds a test discovery (which internally uses the existing mock zenping by default). Having the mock the test framework selects be a discovery greatly simplifies discovery setup (no more weird callback to a Node method).
1 parent 0f20e45 commit 1ac03d9

File tree

23 files changed

+244
-177
lines changed

23 files changed

+244
-177
lines changed

core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,9 @@ public class DiscoveryModule {
5353
new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope);
5454

5555
private final Discovery discovery;
56-
private final ZenPing zenPing;
5756

5857
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService, NetworkService networkService,
59-
ClusterService clusterService, Function<UnicastHostsProvider, ZenPing> createZenPing,
60-
List<DiscoveryPlugin> plugins) {
58+
ClusterService clusterService, List<DiscoveryPlugin> plugins) {
6159
final UnicastHostsProvider hostsProvider;
6260

6361
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
@@ -79,14 +77,11 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
7977
hostsProvider = Collections::emptyList;
8078
}
8179

82-
zenPing = createZenPing.apply(hostsProvider);
83-
8480
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
85-
discoveryTypes.put("zen",
86-
() -> new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
81+
discoveryTypes.put("zen", () -> new ZenDiscovery(settings, threadPool, transportService, clusterService, hostsProvider));
8782
discoveryTypes.put("none", () -> new NoneDiscovery(settings, clusterService, clusterService.getClusterSettings()));
8883
for (DiscoveryPlugin plugin : plugins) {
89-
plugin.getDiscoveryTypes(threadPool, transportService, clusterService, zenPing).entrySet().forEach(entry -> {
84+
plugin.getDiscoveryTypes(threadPool, transportService, clusterService, hostsProvider).entrySet().forEach(entry -> {
9085
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
9186
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
9287
}
@@ -103,9 +98,4 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
10398
public Discovery getDiscovery() {
10499
return discovery;
105100
}
106-
107-
// TODO: remove this, it should be completely local to discovery, but service disruption tests want to mess with it
108-
public ZenPing getZenPing() {
109-
return zenPing;
110-
}
111101
}

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
107107
private AllocationService allocationService;
108108
private final ClusterName clusterName;
109109
private final DiscoverySettings discoverySettings;
110-
private final ZenPing zenPing;
110+
protected final ZenPing zenPing; // protected to allow tests access
111111
private final MasterFaultDetection masterFD;
112112
private final NodesFaultDetection nodesFD;
113113
private final PublishClusterStateAction publishClusterState;
@@ -139,13 +139,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
139139
private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
140140

141141
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
142-
ClusterService clusterService, ClusterSettings clusterSettings, ZenPing zenPing) {
142+
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
143143
super(settings);
144144
this.clusterService = clusterService;
145145
this.clusterName = clusterService.getClusterName();
146146
this.transportService = transportService;
147-
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
148-
this.zenPing = zenPing;
147+
this.discoverySettings = new DiscoverySettings(settings, clusterService.getClusterSettings());
148+
this.zenPing = newZenPing(settings, threadPool, transportService, hostsProvider);
149149
this.electMaster = new ElectMasterService(settings);
150150
this.pingTimeout = PING_TIMEOUT_SETTING.get(settings);
151151
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
@@ -160,12 +160,15 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
160160
logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.ignore_non_master [{}]",
161161
this.pingTimeout, joinTimeout, masterElectionIgnoreNonMasters);
162162

163-
clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, this::handleMinimumMasterNodesChanged, (value) -> {
164-
final ClusterState clusterState = clusterService.state();
165-
int masterNodes = clusterState.nodes().getMasterNodes().size();
166-
if (value > masterNodes) {
167-
throw new IllegalArgumentException("cannot set " + ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() + " to more than the current master nodes count [" + masterNodes + "]");
168-
}
163+
clusterService.getClusterSettings().addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
164+
this::handleMinimumMasterNodesChanged, (value) -> {
165+
final ClusterState clusterState = clusterService.state();
166+
int masterNodes = clusterState.nodes().getMasterNodes().size();
167+
if (value > masterNodes) {
168+
throw new IllegalArgumentException("cannot set "
169+
+ ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() + " to more than the current" +
170+
" master nodes count [" + masterNodes + "]");
171+
}
169172
});
170173

171174
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterService);
@@ -188,6 +191,12 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
188191
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
189192
}
190193

194+
// protected to allow overriding in tests
195+
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
196+
UnicastHostsProvider hostsProvider) {
197+
return new UnicastZenPing(settings, threadPool, transportService, hostsProvider);
198+
}
199+
191200
@Override
192201
public void setAllocationService(AllocationService allocationService) {
193202
this.allocationService = allocationService;

core/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -406,8 +406,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
406406
}
407407

408408
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
409-
networkService, clusterService, hostsProvider -> newZenPing(settings, threadPool, transportService, hostsProvider),
410-
pluginsService.filterPlugins(DiscoveryPlugin.class));
409+
networkService, clusterService, pluginsService.filterPlugins(DiscoveryPlugin.class));
411410
pluginsService.processModules(modules);
412411
modules.add(b -> {
413412
b.bind(IndicesQueriesRegistry.class).toInstance(searchModule.getQueryParserRegistry());
@@ -441,7 +440,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
441440
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings()));
442441
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
443442
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
444-
b.bind(ZenPing.class).toInstance(discoveryModule.getZenPing());
445443
{
446444
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
447445
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
@@ -880,12 +878,6 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
880878
return customNameResolvers;
881879
}
882880

883-
/** Create a new ZenPing instance for use in zen discovery. */
884-
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
885-
UnicastHostsProvider hostsProvider) {
886-
return new UnicastZenPing(settings, threadPool, transportService, hostsProvider);
887-
}
888-
889881
/** Constructs an internal node used as a client into a cluster fronted by this tribe node. */
890882
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
891883
return new Node(new Environment(settings), classpathPlugins);

core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ public interface DiscoveryPlugin {
5757
* @param threadPool Use to schedule ping actions
5858
* @param transportService Use to communicate with other nodes
5959
* @param clusterService Use to find current nodes in the cluster
60-
* @param zenPing Use to ping other nodes with zen unicast host list
60+
* @param hostsProvider Use to find configured hosts which should be pinged for initial discovery
6161
*/
6262
default Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
63-
ClusterService clusterService, ZenPing zenPing) {
63+
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
6464
return Collections.emptyMap();
6565
}
6666

core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.discovery.zen.FaultDetection;
2828
import org.elasticsearch.plugins.Plugin;
2929
import org.elasticsearch.test.ESIntegTestCase;
30+
import org.elasticsearch.test.discovery.TestZenDiscovery;
3031
import org.elasticsearch.test.disruption.NetworkDisruption;
3132
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
3233
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
@@ -46,18 +47,19 @@
4647
@ESIntegTestCase.SuppressLocalMode
4748
public class IndexingMasterFailoverIT extends ESIntegTestCase {
4849

49-
@Override
50-
protected boolean addMockZenPings() {
51-
return false;
52-
}
53-
5450
@Override
5551
protected Collection<Class<? extends Plugin>> nodePlugins() {
5652
final HashSet<Class<? extends Plugin>> classes = new HashSet<>(super.nodePlugins());
5753
classes.add(MockTransportService.TestPlugin.class);
5854
return classes;
5955
}
6056

57+
@Override
58+
protected Settings nodeSettings(int nodeOrdinal) {
59+
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
60+
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
61+
}
62+
6163
/**
6264
* Indexing operations which entail mapping changes require a blocking request to the master node to update the mapping.
6365
* If the master node is being disrupted or if it cannot commit cluster state changes, it needs to retry within timeout limits.

core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package org.elasticsearch.client.transport;
2121

22+
import java.io.IOException;
23+
import java.util.Collections;
24+
2225
import org.elasticsearch.Version;
2326
import org.elasticsearch.client.Client;
2427
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -32,13 +35,10 @@
3235
import org.elasticsearch.test.ESIntegTestCase;
3336
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
3437
import org.elasticsearch.test.ESIntegTestCase.Scope;
35-
import org.elasticsearch.test.discovery.MockZenPing;
38+
import org.elasticsearch.test.discovery.TestZenDiscovery;
3639
import org.elasticsearch.transport.MockTransportClient;
3740
import org.elasticsearch.transport.TransportService;
3841

39-
import java.io.IOException;
40-
import java.util.Collections;
41-
4242
import static org.hamcrest.Matchers.equalTo;
4343
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4444
import static org.hamcrest.Matchers.is;
@@ -65,7 +65,7 @@ public void testNodeVersionIsUpdated() throws IOException, NodeValidationExcepti
6565
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
6666
.put(Node.NODE_DATA_SETTING.getKey(), false)
6767
.put("cluster.name", "foobar")
68-
.build(), Collections.singletonList(MockZenPing.TestPlugin.class)).start()) {
68+
.build(), Collections.singletonList(TestZenDiscovery.TestPlugin.class)).start()) {
6969
TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
7070
client.addTransportAddress(transportAddress);
7171
// since we force transport clients there has to be one node started that we connect to.

core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.test.ESIntegTestCase;
3737
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
3838
import org.elasticsearch.test.ESIntegTestCase.Scope;
39+
import org.elasticsearch.test.discovery.TestZenDiscovery;
3940
import org.elasticsearch.test.disruption.NetworkDisruption;
4041
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay;
4142
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
@@ -75,8 +76,9 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
7576
}
7677

7778
@Override
78-
protected boolean addMockZenPings() {
79-
return false;
79+
protected Settings nodeSettings(int nodeOrdinal) {
80+
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
81+
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
8082
}
8183

8284
public void testSimpleMinimumMasterNodes() throws Exception {

core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.plugins.Plugin;
3535
import org.elasticsearch.test.ESIntegTestCase;
3636
import org.elasticsearch.test.InternalTestCluster;
37+
import org.elasticsearch.test.discovery.TestZenDiscovery;
3738
import org.elasticsearch.test.disruption.NetworkDisruption;
3839
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
3940
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
@@ -63,8 +64,9 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6364
}
6465

6566
@Override
66-
protected boolean addMockZenPings() {
67-
return false;
67+
protected Settings nodeSettings(int nodeOrdinal) {
68+
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
69+
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
6870
}
6971

7072
private void createStaleReplicaScenario() throws Exception {

0 commit comments

Comments
 (0)