Skip to content

Commit e15bedf

Browse files
committed
Allow multiple unicast host providers (#31509)
Introduces support for multiple host providers, which allows the settings based hosts resolver to be treated just as any other UnicastHostsProvider. Also introduces the notion of a HostsResolver so that plugins such as FileBasedDiscovery do not need to create their own thread pool for resolving hosts, making it easier to add new similar kind of plugins.
1 parent f9de429 commit e15bedf

File tree

20 files changed

+227
-186
lines changed

20 files changed

+227
-186
lines changed

plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureCom
132132
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
133133
*/
134134
@Override
135-
public List<TransportAddress> buildDynamicHosts() {
135+
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
136136
if (refreshInterval.millis() != 0) {
137137
if (dynamicHosts != null &&
138138
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {

plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
9292
}
9393

9494
@Override
95-
public List<TransportAddress> buildDynamicHosts() {
95+
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
9696
return dynamicHosts.getOrRefresh();
9797
}
9898

plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int no
9393
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
9494
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
9595
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
96-
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
96+
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
9797
logger.debug("--> addresses found: {}", dynamicHosts);
9898
return dynamicHosts;
9999
} catch (IOException e) {
@@ -307,7 +307,7 @@ protected List<TransportAddress> fetchDynamicNodes() {
307307
}
308308
};
309309
for (int i=0; i<3; i++) {
310-
provider.buildDynamicHosts();
310+
provider.buildDynamicHosts(null);
311311
}
312312
assertThat(provider.fetchCount, is(3));
313313
}
@@ -324,12 +324,12 @@ protected List<TransportAddress> fetchDynamicNodes() {
324324
}
325325
};
326326
for (int i=0; i<3; i++) {
327-
provider.buildDynamicHosts();
327+
provider.buildDynamicHosts(null);
328328
}
329329
assertThat(provider.fetchCount, is(1));
330330
Thread.sleep(1_000L); // wait for cache to expire
331331
for (int i=0; i<3; i++) {
332-
provider.buildDynamicHosts();
332+
provider.buildDynamicHosts(null);
333333
}
334334
assertThat(provider.fetchCount, is(2));
335335
}

plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java

+1-49
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,17 @@
1919

2020
package org.elasticsearch.discovery.file;
2121

22-
import org.apache.logging.log4j.Logger;
23-
import org.elasticsearch.client.Client;
24-
import org.elasticsearch.cluster.service.ClusterService;
25-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
26-
import org.elasticsearch.common.logging.DeprecationLogger;
27-
import org.elasticsearch.common.logging.Loggers;
2822
import org.elasticsearch.common.network.NetworkService;
2923
import org.elasticsearch.common.settings.Settings;
30-
import org.elasticsearch.common.util.concurrent.EsExecutors;
31-
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3224
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
33-
import org.elasticsearch.discovery.zen.UnicastZenPing;
3425
import org.elasticsearch.env.Environment;
35-
import org.elasticsearch.env.NodeEnvironment;
36-
import org.elasticsearch.node.Node;
3726
import org.elasticsearch.plugins.DiscoveryPlugin;
3827
import org.elasticsearch.plugins.Plugin;
39-
import org.elasticsearch.script.ScriptService;
40-
import org.elasticsearch.threadpool.ThreadPool;
4128
import org.elasticsearch.transport.TransportService;
42-
import org.elasticsearch.watcher.ResourceWatcherService;
4329

44-
import java.io.IOException;
4530
import java.nio.file.Path;
46-
import java.util.Collection;
4731
import java.util.Collections;
4832
import java.util.Map;
49-
import java.util.concurrent.ExecutorService;
50-
import java.util.concurrent.ThreadFactory;
51-
import java.util.concurrent.TimeUnit;
5233
import java.util.function.Supplier;
5334

5435
/**
@@ -58,48 +39,19 @@
5839
*/
5940
public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
6041

61-
private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class);
62-
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
63-
6442
private final Settings settings;
6543
private final Path configPath;
66-
private ExecutorService fileBasedDiscoveryExecutorService;
6744

6845
public FileBasedDiscoveryPlugin(Settings settings, Path configPath) {
6946
this.settings = settings;
7047
this.configPath = configPath;
7148
}
7249

73-
@Override
74-
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
75-
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
76-
NamedXContentRegistry xContentRegistry, Environment environment,
77-
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
78-
final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
79-
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]");
80-
fileBasedDiscoveryExecutorService = EsExecutors.newScaling(
81-
Node.NODE_NAME_SETTING.get(settings) + "/" + "file_based_discovery_resolve",
82-
0,
83-
concurrentConnects,
84-
60,
85-
TimeUnit.SECONDS,
86-
threadFactory,
87-
threadPool.getThreadContext());
88-
89-
return Collections.emptyList();
90-
}
91-
92-
@Override
93-
public void close() throws IOException {
94-
ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS);
95-
}
96-
9750
@Override
9851
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
9952
NetworkService networkService) {
10053
return Collections.singletonMap(
10154
"file",
102-
() -> new FileBasedUnicastHostsProvider(
103-
new Environment(settings, configPath), transportService, fileBasedDiscoveryExecutorService));
55+
() -> new FileBasedUnicastHostsProvider(new Environment(settings, configPath)));
10456
}
10557
}

plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java

+3-34
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,19 @@
2323
import org.apache.logging.log4j.util.Supplier;
2424
import org.elasticsearch.common.component.AbstractComponent;
2525
import org.elasticsearch.common.transport.TransportAddress;
26-
import org.elasticsearch.common.unit.TimeValue;
2726
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
2827
import org.elasticsearch.env.Environment;
29-
import org.elasticsearch.transport.TransportService;
3028

3129
import java.io.FileNotFoundException;
3230
import java.io.IOException;
3331
import java.nio.file.Files;
3432
import java.nio.file.NoSuchFileException;
3533
import java.nio.file.Path;
36-
import java.util.ArrayList;
3734
import java.util.Collections;
3835
import java.util.List;
39-
import java.util.concurrent.ExecutorService;
40-
import java.util.concurrent.atomic.AtomicLong;
4136
import java.util.stream.Collectors;
4237
import java.util.stream.Stream;
4338

44-
import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT;
45-
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists;
46-
4739
/**
4840
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
4941
* from {@link #UNICAST_HOSTS_FILE}.
@@ -60,25 +52,15 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
6052

6153
static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";
6254

63-
private final TransportService transportService;
64-
private final ExecutorService executorService;
65-
6655
private final Path unicastHostsFilePath;
6756

68-
private final AtomicLong nodeIdGenerator = new AtomicLong(); // generates unique ids for the node
69-
70-
private final TimeValue resolveTimeout;
71-
72-
FileBasedUnicastHostsProvider(Environment environment, TransportService transportService, ExecutorService executorService) {
57+
FileBasedUnicastHostsProvider(Environment environment) {
7358
super(environment.settings());
74-
this.transportService = transportService;
75-
this.executorService = executorService;
7659
this.unicastHostsFilePath = environment.configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);
77-
this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
7860
}
7961

8062
@Override
81-
public List<TransportAddress> buildDynamicHosts() {
63+
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
8264
List<String> hostsList;
8365
try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
8466
hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
@@ -93,21 +75,8 @@ public List<TransportAddress> buildDynamicHosts() {
9375
hostsList = Collections.emptyList();
9476
}
9577

96-
final List<TransportAddress> dynamicHosts = new ArrayList<>();
97-
try {
98-
dynamicHosts.addAll(resolveHostsLists(
99-
executorService,
100-
logger,
101-
hostsList,
102-
1,
103-
transportService,
104-
resolveTimeout));
105-
} catch (InterruptedException e) {
106-
throw new RuntimeException(e);
107-
}
108-
78+
final List<TransportAddress> dynamicHosts = hostsResolver.resolveHosts(hostsList, 1);
10979
logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts);
110-
11180
return dynamicHosts;
11281
}
11382

plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.elasticsearch.common.settings.Settings;
2525
import org.elasticsearch.common.transport.BoundTransportAddress;
2626
import org.elasticsearch.common.transport.TransportAddress;
27+
import org.elasticsearch.common.unit.TimeValue;
2728
import org.elasticsearch.common.util.BigArrays;
29+
import org.elasticsearch.discovery.zen.UnicastZenPing;
2830
import org.elasticsearch.env.Environment;
2931
import org.elasticsearch.env.TestEnvironment;
3032
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -123,8 +125,10 @@ public void testUnicastHostsDoesNotExist() throws Exception {
123125
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
124126
.build();
125127
final Environment environment = TestEnvironment.newEnvironment(settings);
126-
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService);
127-
final List<TransportAddress> addresses = provider.buildDynamicHosts();
128+
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment);
129+
final List<TransportAddress> addresses = provider.buildDynamicHosts((hosts, limitPortCounts) ->
130+
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
131+
TimeValue.timeValueSeconds(10)));
128132
assertEquals(0, addresses.size());
129133
}
130134

@@ -163,6 +167,8 @@ private List<TransportAddress> setupAndRunHostProvider(final List<String> hostEn
163167
}
164168

165169
return new FileBasedUnicastHostsProvider(
166-
new Environment(settings, configPath), transportService, executorService).buildDynamicHosts();
170+
new Environment(settings, configPath)).buildDynamicHosts((hosts, limitPortCounts) ->
171+
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
172+
TimeValue.timeValueSeconds(10)));
167173
}
168174
}

plugins/discovery-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public GceUnicastHostsProvider(Settings settings, GceInstancesService gceInstanc
9393
* Information can be cached using `cloud.gce.refresh_interval` property if needed.
9494
*/
9595
@Override
96-
public List<TransportAddress> buildDynamicHosts() {
96+
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
9797
// We check that needed properties have been set
9898
if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) {
9999
throw new IllegalArgumentException("one or more gce discovery settings are missing. " +

plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoveryTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ protected List<TransportAddress> buildDynamicNodes(GceInstancesServiceImpl gceIn
108108
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
109109
transportService, new NetworkService(Collections.emptyList()));
110110

111-
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
111+
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
112112
logger.info("--> addresses found: {}", dynamicHosts);
113113
return dynamicHosts;
114114
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.elasticsearch.discovery.DiscoverySettings;
5858
import org.elasticsearch.discovery.zen.ElectMasterService;
5959
import org.elasticsearch.discovery.zen.FaultDetection;
60+
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
6061
import org.elasticsearch.discovery.zen.UnicastZenPing;
6162
import org.elasticsearch.discovery.zen.ZenDiscovery;
6263
import org.elasticsearch.env.Environment;
@@ -360,7 +361,7 @@ public void apply(Settings value, Settings current, Settings previous) {
360361
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
361362
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
362363
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
363-
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
364+
SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
364365
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
365366
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
366367
SearchService.DEFAULT_KEEPALIVE_SETTING,

server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

+33-14
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import org.elasticsearch.common.settings.Setting;
3232
import org.elasticsearch.common.settings.Setting.Property;
3333
import org.elasticsearch.common.settings.Settings;
34+
import org.elasticsearch.common.transport.TransportAddress;
3435
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
36+
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
3537
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
3638
import org.elasticsearch.discovery.zen.ZenDiscovery;
3739
import org.elasticsearch.plugins.DiscoveryPlugin;
@@ -42,13 +44,15 @@
4244
import java.util.Collection;
4345
import java.util.Collections;
4446
import java.util.HashMap;
47+
import java.util.HashSet;
4548
import java.util.List;
4649
import java.util.Map;
4750
import java.util.Objects;
48-
import java.util.Optional;
51+
import java.util.Set;
4952
import java.util.function.BiConsumer;
5053
import java.util.function.Function;
5154
import java.util.function.Supplier;
55+
import java.util.stream.Collectors;
5256

5357
/**
5458
* A module for loading classes for node discovery.
@@ -57,18 +61,18 @@ public class DiscoveryModule {
5761

5862
public static final Setting<String> DISCOVERY_TYPE_SETTING =
5963
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
60-
public static final Setting<Optional<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
61-
new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope);
64+
public static final Setting<List<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
65+
Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope);
6266

6367
private final Discovery discovery;
6468

6569
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
6670
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
6771
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
6872
AllocationService allocationService) {
69-
final UnicastHostsProvider hostsProvider;
7073
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
71-
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
74+
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
75+
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
7276
for (DiscoveryPlugin plugin : plugins) {
7377
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
7478
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
@@ -80,17 +84,32 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
8084
joinValidators.add(joinValidator);
8185
}
8286
}
83-
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
84-
if (hostsProviderName.isPresent()) {
85-
Supplier<UnicastHostsProvider> hostsProviderSupplier = hostProviders.get(hostsProviderName.get());
86-
if (hostsProviderSupplier == null) {
87-
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName.get() + "]");
88-
}
89-
hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
90-
} else {
91-
hostsProvider = Collections::emptyList;
87+
List<String> hostsProviderNames = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
88+
// for bwc purposes, add settings provider even if not explicitly specified
89+
if (hostsProviderNames.contains("settings") == false) {
90+
List<String> extendedHostsProviderNames = new ArrayList<>();
91+
extendedHostsProviderNames.add("settings");
92+
extendedHostsProviderNames.addAll(hostsProviderNames);
93+
hostsProviderNames = extendedHostsProviderNames;
94+
}
95+
96+
final Set<String> missingProviderNames = new HashSet<>(hostsProviderNames);
97+
missingProviderNames.removeAll(hostProviders.keySet());
98+
if (missingProviderNames.isEmpty() == false) {
99+
throw new IllegalArgumentException("Unknown zen hosts providers " + missingProviderNames);
92100
}
93101

102+
List<UnicastHostsProvider> filteredHostsProviders = hostsProviderNames.stream()
103+
.map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());
104+
105+
final UnicastHostsProvider hostsProvider = hostsResolver -> {
106+
final List<TransportAddress> addresses = new ArrayList<>();
107+
for (UnicastHostsProvider provider : filteredHostsProviders) {
108+
addresses.addAll(provider.buildDynamicHosts(hostsResolver));
109+
}
110+
return Collections.unmodifiableList(addresses);
111+
};
112+
94113
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
95114
discoveryTypes.put("zen",
96115
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,

0 commit comments

Comments
 (0)