Skip to content

Send hostname in SNI header in simple remote mode #50247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
SimpleConnectionStrategy.INCLUDE_SERVER_NAME,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,10 @@ public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallback
return new Setting<>(key, fallbackSetting, b -> parseBoolean(b, key, isFiltered(properties)), properties);
}

public static Setting<Boolean> boolSetting(String key, boolean defaultValue, Validator<Boolean> validator, Property... properties) {
return new Setting<>(key, Boolean.toString(defaultValue), b -> parseBoolean(b, key, isFiltered(properties)), validator, properties);
}

public static Setting<Boolean> boolSetting(String key, Function<Settings, String> defaultValueFn, Property... properties) {
return new Setting<>(key, defaultValueFn, b -> parseBoolean(b, key, isFiltered(properties)), properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS);
SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
SimpleConnectionStrategy.INCLUDE_SERVER_NAME);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private static <T> Stream<String> getClusterAlias(Settings settings, Setting.Aff
return allConcreteSettings.map(affixSetting::getNamespace);
}

static InetSocketAddress parseSeedAddress(String remoteHost) {
static InetSocketAddress parseConfiguredAddress(String remoteHost) {
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
final String host = hostPort.v1();
assert hostPort.v2() != null : remoteHost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -40,6 +41,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -49,6 +51,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.settings.Setting.intSetting;

public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
Expand Down Expand Up @@ -76,6 +79,15 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
(ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE),
Setting.Property.Dynamic, Setting.Property.NodeScope));

/**
* Whether to include the hostname as a server_name attribute
*/
public static final Setting.AffixSetting<Boolean> INCLUDE_SERVER_NAME = Setting.affixKeySetting(
"cluster.remote.",
"simple.include_server_name",
(ns, key) -> boolSetting(key, false, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE),
Setting.Property.Dynamic, Setting.Property.NodeScope));

static final int CHANNELS_PER_CONNECTION = 1;

private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3;
Expand All @@ -84,6 +96,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final List<String> configuredAddresses;
private final boolean includeServerName;
private final List<Supplier<TransportAddress>> addresses;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
private final ConnectionProfile profile;
Expand All @@ -96,21 +109,31 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
transportService,
connectionManager,
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings));
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
INCLUDE_SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
}

SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
configuredAddresses.stream().map(address ->
(Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()));
(Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()), false);
}

SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses) {
int maxNumConnections, List<String> configuredAddresses, boolean includeServerName) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
configuredAddresses.stream().map(address ->
(Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()), includeServerName);
}

SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses,
boolean includeServerName) {
super(clusterAlias, transportService, connectionManager);
this.maxNumConnections = maxNumConnections;
this.configuredAddresses = configuredAddresses;
this.includeServerName = includeServerName;
assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses";
this.addresses = addresses;
// TODO: Move into the ConnectionManager
Expand Down Expand Up @@ -207,7 +230,14 @@ public void onFailure(Exception e) {
for (int i = 0; i < remaining; ++i) {
TransportAddress address = nextAddress(resolved);
String id = clusterAlias + "#" + address;
DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion());
Map<String, String> attributes;
if (includeServerName) {
attributes = Collections.singletonMap("server_name", address.address().getHostString());
} else {
attributes = Collections.emptyMap();
}
DiscoveryNode node = new DiscoveryNode(id, address, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());

connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() {
@Override
Expand Down Expand Up @@ -243,7 +273,7 @@ private TransportAddress nextAddress(List<TransportAddress> resolvedAddresses) {
}

private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseSeedAddress(address));
return new TransportAddress(parseConfiguredAddress(address));
}

private boolean addressesChanged(final List<String> oldAddresses, final List<String> newAddresses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,11 @@ public String toString() {

private static DiscoveryNode resolveSeedNode(String clusterAlias, String address, String proxyAddress) {
if (proxyAddress == null || proxyAddress.isEmpty()) {
TransportAddress transportAddress = new TransportAddress(parseSeedAddress(address));
TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(address));
return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(), transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(parseSeedAddress(proxyAddress));
TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(proxyAddress));
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
Expand Down Expand Up @@ -460,7 +460,7 @@ private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, Discovery
return node;
} else {
// resolve proxy address lazy here
InetSocketAddress proxyInetAddress = parseSeedAddress(proxyAddress);
InetSocketAddress proxyInetAddress = parseConfiguredAddress(proxyAddress);
return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
Expand All @@ -35,6 +36,7 @@
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -291,7 +293,7 @@ public void testSimpleStrategyWillResolveAddressesEachConnect() throws Exception
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address), Collections.singletonList(addressSupplier))) {
numOfConnections, addresses(address), Collections.singletonList(addressSupplier), false)) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
Expand Down Expand Up @@ -387,6 +389,39 @@ public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
}
}

public void testServerNameAttributes() {
Settings bindSettings = Settings.builder().put(TransportSettings.BIND_HOST.getKey(), "localhost").build();
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT, bindSettings)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();

try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();

ArrayList<String> addresses = new ArrayList<>();
addresses.add("localhost:" + address1.getPort());

ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses, true)) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));

PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();

assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertTrue(strategy.assertNoRunningConnections());

DiscoveryNode discoveryNode = connectionManager.getAllConnectedNodes().stream().findFirst().get();
assertEquals("localhost", discoveryNode.getAttributes().get("server_name"));
}
}
}
}

private static List<String> addresses(final TransportAddress... addresses) {
return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList());
}
Expand Down