Skip to content

Allow simple connection strategy to be configured #49066

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void setupRemoteClusterConfig() throws Exception {
String transportAddress = (String) nodesResponse.get("transport_address");

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.seeds", transportAddress));
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.sniff.seeds", transportAddress));
ClusterUpdateSettingsResponse updateSettingsResponse =
highLevelClient().cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
assertThat(updateSettingsResponse.isAcknowledged(), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void setupRemoteClusterConfig() throws IOException {
String transportAddress = (String) nodesResponse.get("transport_address");

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.seeds", transportAddress));
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.sniff.seeds", transportAddress));
ClusterUpdateSettingsResponse updateSettingsResponse =
client.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
assertThat(updateSettingsResponse.isAcknowledged(), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void testSearchSkipUnavailable() throws IOException {
try (MockTransportService remoteTransport = startTransport("node0", new CopyOnWriteArrayList<>(), Version.CURRENT, threadPool)) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();

updateRemoteClusterSettings(Collections.singletonMap("seeds", remoteNode.getAddress().toString()));
updateRemoteClusterSettings(Collections.singletonMap("sniff.seeds", remoteNode.getAddress().toString()));

for (int i = 0; i < 10; i++) {
restHighLevelClient.index(
Expand Down Expand Up @@ -229,7 +229,7 @@ public void testSearchSkipUnavailable() throws IOException {
assertSearchConnectFailure();

Map<String, Object> map = new HashMap<>();
map.put("seeds", null);
map.put("sniff.seeds", null);
map.put("skip_unavailable", null);
updateRemoteClusterSettings(map);
}
Expand All @@ -248,32 +248,32 @@ public void testSkipUnavailableDependsOnSeeds() throws IOException {
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(),
containsString("missing required setting [cluster.remote.remote1.seeds] " +
containsString("missing required setting [cluster.remote.remote1.sniff.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}

Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put("seeds", remoteNode.getAddress().toString());
settingsMap.put("sniff.seeds", remoteNode.getAddress().toString());
settingsMap.put("skip_unavailable", randomBoolean());
updateRemoteClusterSettings(settingsMap);

{
//check that seeds cannot be reset alone if skip_unavailable is set
Request request = new Request("PUT", "/_cluster/settings");
request.setEntity(buildUpdateSettingsRequestBody(Collections.singletonMap("seeds", null)));
request.setEntity(buildUpdateSettingsRequestBody(Collections.singletonMap("sniff.seeds", null)));
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.seeds] " +
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.sniff.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}

if (randomBoolean()) {
updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", null));
updateRemoteClusterSettings(Collections.singletonMap("seeds", null));
updateRemoteClusterSettings(Collections.singletonMap("sniff.seeds", null));
} else {
Map<String, Object> nullMap = new HashMap<>();
nullMap.put("seeds", null);
nullMap.put("sniff.seeds", null);
nullMap.put("skip_unavailable", null);
updateRemoteClusterSettings(nullMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -280,15 +280,17 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
RemoteClusterService.ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,77 +19,26 @@

package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Base class for all services and components that need up-to-date information about the registered remote clusters
*/
public abstract class RemoteClusterAware {

/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting(
"cluster.remote.",
"seeds",
key -> Setting.listSetting(
key,
Collections.emptyList(),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.Dynamic,
Setting.Property.NodeScope));

public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";

/**
* A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in
* the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect
* to the nodes in the remote cluster using this address instead. Use of this setting is not recommended and it is deliberately
* undocumented as it does not work well with all proxies.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"cluster.remote.",
"proxy",
key -> Setting.simpleString(
key,
s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);

protected final Settings settings;
private final ClusterNameExpressionResolver clusterNameResolver;

Expand All @@ -106,57 +55,7 @@ protected RemoteClusterAware(Settings settings) {
* Returns remote clusters that are enabled in these settings
*/
protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
final Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings
.map(REMOTE_CLUSTERS_SEEDS::getNamespace)
.filter(clusterAlias -> RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings))
.collect(Collectors.toSet());
}

/**
* Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
*/
protected static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
final Settings settings) {
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> remoteSeeds =
buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS);
return remoteSeeds.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
final Settings settings, final Setting.AffixSetting<List<String>> seedsSetting) {
final Stream<Setting<List<String>>> allConcreteSettings = seedsSetting.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(seedsSetting::getNamespace, concreteSetting -> {
String clusterName = seedsSetting.getNamespace(concreteSetting);
List<String> addresses = concreteSetting.get(settings);
final boolean proxyMode =
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).existsOrFallbackExists(settings);
List<Tuple<String, Supplier<DiscoveryNode>>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(Tuple.tuple(address, () -> buildSeedNode(clusterName, address, proxyMode)));
}
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
}));
}

static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) {
if (proxyMode) {
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}
return RemoteConnectionStrategy.getRemoteClusters(settings);
}

/**
Expand Down Expand Up @@ -203,52 +102,12 @@ void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) {
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
}

static InetSocketAddress parseSeedAddress(String remoteHost) {
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
final String host = hostPort.v1();
assert hostPort.v2() != null : remoteHost;
final int port = hostPort.v2();
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
return new InetSocketAddress(hostAddress, port);
}

public static Tuple<String, Integer> parseHostPort(final String remoteHost) {
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
final int port = parsePort(remoteHost);
return Tuple.tuple(host, port);
}

private static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return port;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("failed to parse port", e);
}
}

private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}

public static String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)
? indexName : clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ final class RemoteClusterConnection implements Closeable {
*/
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) {
this(settings, clusterAlias, transportService,
createConnectionManager(buildConnectionProfileFromSettings(settings, clusterAlias), transportService));
createConnectionManager(RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings), transportService));
}

RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService,
ConnectionManager connectionManager) {
this.transportService = transportService;
this.clusterAlias = clusterAlias;
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
Expand Down Expand Up @@ -241,18 +241,4 @@ ConnectionManager getConnectionManager() {
public boolean shouldRebuildConnection(Settings newSettings) {
return connectionStrategy.shouldRebuildConnection(newSettings);
}

static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) {
return new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
// we don't want this to be used for anything else but search
.addConnections(0, TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings))
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings))
.build();
}
}
Loading