Skip to content

Remove seeds depedency for remote cluster settings #52829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ public void testSkipUnavailableDependsOnSeeds() throws IOException {
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(),
containsString("missing required setting [cluster.remote.remote1.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
containsString("Cannot configure setting [cluster.remote.remote1.skip_unavailable] if remote cluster is " +
"not enabled."));
}

Map<String, Object> settingsMap = new HashMap<>();
Expand All @@ -264,8 +264,8 @@ public void testSkipUnavailableDependsOnSeeds() throws IOException {
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
assertThat(responseException.getMessage(), containsString("Cannot configure setting " +
"[cluster.remote.remote1.skip_unavailable] if remote cluster is not enabled."));
}

if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
ProxyConnectionStrategy.PROXY_ADDRESS,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.SERVER_NAME,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,12 @@ public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallback
return new Setting<>(key, fallbackSetting, b -> parseBoolean(b, key, isFiltered(properties)), properties);
}

public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallbackSetting, Validator<Boolean> validator,
Property... properties) {
return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw, b -> parseBoolean(b, key,
isFiltered(properties)), validator, properties);
}

public static Setting<Boolean> boolSetting(String key, boolean defaultValue, Validator<Boolean> validator, Property... properties) {
return new Setting<>(key, Boolean.toString(defaultValue), b -> parseBoolean(b, key, isFiltered(properties)), validator, properties);
}
Expand Down Expand Up @@ -1629,6 +1635,12 @@ public static Setting<TimeValue> timeSetting(String key, Setting<TimeValue> fall
return new Setting<>(key, fallbackSetting, (s) -> TimeValue.parseTimeValue(s, key), properties);
}

public static Setting<TimeValue> timeSetting(String key, Setting<TimeValue> fallBackSetting, Validator<TimeValue> validator,
Property... properties) {
return new Setting<>(new SimpleKey(key), fallBackSetting, fallBackSetting::getRaw, (s) -> TimeValue.parseTimeValue(s, key),
validator, properties);
}

public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
/**
* The remote address for the proxy. The connections will be opened to the configured address.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
public static final Setting.AffixSetting<String> PROXY_ADDRESS = Setting.affixKeySetting(
"cluster.remote.",
"proxy_address",
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
Expand Down Expand Up @@ -99,7 +99,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
transportService,
connectionManager,
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings),
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
}

Expand Down Expand Up @@ -141,7 +141,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
}

static Stream<Setting.AffixSetting<?>> enablementSettings() {
return Stream.of(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES);
return Stream.of(ProxyConnectionStrategy.PROXY_ADDRESS);
}

static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
Expand All @@ -155,7 +155,7 @@ protected boolean shouldOpenMoreConnections() {

@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
String address = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
String address = PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
ProxyConnectionStrategy.PROXY_ADDRESS,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.SERVER_NAME);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -120,8 +122,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
Setting.affixKeySetting(
"search.remote.",
"skip_unavailable",
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope),
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope));

public static final SettingUpgrader<Boolean> SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER = new SettingUpgrader<Boolean>() {

Expand All @@ -141,27 +142,27 @@ public String getKey(final String key) {
Setting.affixKeySetting(
"cluster.remote.",
"skip_unavailable",
key -> boolSetting(
(ns, key) -> boolSetting(
key,
// the default needs to be false when fallback is removed
"_na_".equals(key)
? SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(key)
: SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSetting(key.replaceAll("^cluster", "search")),
new RemoteConnectionEnabled<>(ns, key),
Setting.Property.Dynamic,
Setting.Property.NodeScope),
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
Setting.Property.NodeScope));

public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope),
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
(ns, key) -> timeSetting(key, TransportSettings.PING_SCHEDULE, new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic,
Setting.Property.NodeScope));

public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
"cluster.remote.",
"transport.compress",
key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.Dynamic, Setting.Property.NodeScope),
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
(ns, key) -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS,
new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope));

private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
Expand Down Expand Up @@ -436,4 +437,38 @@ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias)
Collection<RemoteClusterConnection> getConnections() {
return remoteClusters.values();
}

private static class RemoteConnectionEnabled<T> implements Setting.Validator<T> {

private final String clusterAlias;
private final String key;

private RemoteConnectionEnabled(String clusterAlias, String key) {
this.clusterAlias = clusterAlias;
this.key = key;
}

@Override
public void validate(T value) {
}

@Override
public void validate(T value, Map<Setting<?>, Object> settings, boolean isPresent) {
if (isPresent && RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings) == false) {
throw new IllegalArgumentException("Cannot configure setting [" + key + "] if remote cluster is not enabled.");
}
}

@Override
public Iterator<Setting<?>> settings() {
return Stream.concat(Stream.of(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias)),
settingsStream()).iterator();
}

private Stream<Setting<?>> settingsStream() {
return Arrays.stream(RemoteConnectionStrategy.ConnectionStrategy.values())
.flatMap(strategy -> strategy.getEnablementSettings().get())
.map(as -> as.getConcreteSettingForNamespace(clusterAlias));
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public int getNumberOfChannels() {
return numberOfChannels;
}

public Supplier<Stream<Setting.AffixSetting<?>>> getEnablementSettings() {
return enablementSettings;
}

public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
return reader.get();
}
Expand Down Expand Up @@ -149,7 +153,7 @@ static RemoteConnectionStrategy buildStrategy(String clusterAlias, TransportServ

static Set<String> getRemoteClusters(Settings settings) {
final Stream<Setting.AffixSetting<?>> enablementSettings = Arrays.stream(ConnectionStrategy.values())
.flatMap(strategy -> strategy.enablementSettings.get());
.flatMap(strategy -> strategy.getEnablementSettings().get());
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
}

Expand All @@ -159,7 +163,21 @@ public static boolean isConnectionEnabled(String clusterAlias, Settings settings
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return seeds.isEmpty() == false;
} else {
String address = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings);
String address = ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return Strings.isEmpty(address) == false;
}
}

@SuppressWarnings("unchecked")
public static boolean isConnectionEnabled(String clusterAlias, Map<Setting<?>, Object> settings) {
ConnectionStrategy mode = (ConnectionStrategy) settings.get(REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias));
if (mode.equals(ConnectionStrategy.SNIFF)) {
List<String> seeds = (List<String>) settings.get(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS
.getConcreteSettingForNamespace(clusterAlias));
return seeds.isEmpty() == false;
} else {
String address = (String) settings.get(ProxyConnectionStrategy.PROXY_ADDRESS
.getConcreteSettingForNamespace(clusterAlias));
return Strings.isEmpty(address) == false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange(

Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> addressesSetting = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
Setting<?> addressesSetting = ProxyConnectionStrategy.PROXY_ADDRESS
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> socketConnections = ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");
Expand Down Expand Up @@ -320,7 +320,7 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange(

public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
new Tuple<>(ProxyConnectionStrategy.PROXY_ADDRESS, "192.168.0.1:8080"),
new Tuple<>(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3"));

RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ private Settings buildRandomSettings(String clusterAlias, List<String> addresses

private static Settings buildProxySettings(String clusterAlias, List<String> addresses) {
Settings.Builder builder = Settings.builder();
builder.put(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
builder.put(ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).getKey(),
addresses.get(0));
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "proxy");
return builder.build();
Expand Down
Loading