Skip to content

Commit 94f5acc

Browse files
authored
Remove seeds depedency for remote cluster settings (#52796)
Currently 3 remote cluster settings (ping interval, skip unavailable, and compression) have a dependency on the seeds setting being comfigured. With proxy mode, it is now possible that these settings the seeds setting has not been configured. This commit removes this dependency and adds new validation for these settings.
1 parent 72e0ff8 commit 94f5acc

File tree

11 files changed

+146
-45
lines changed

11 files changed

+146
-45
lines changed

qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ public void testSkipUnavailableDependsOnSeeds() throws IOException {
248248
() -> client().performRequest(request));
249249
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
250250
assertThat(responseException.getMessage(),
251-
containsString("missing required setting [cluster.remote.remote1.seeds] " +
252-
"for setting [cluster.remote.remote1.skip_unavailable]"));
251+
containsString("Cannot configure setting [cluster.remote.remote1.skip_unavailable] if remote cluster is " +
252+
"not enabled."));
253253
}
254254

255255
Map<String, Object> settingsMap = new HashMap<>();
@@ -264,8 +264,8 @@ public void testSkipUnavailableDependsOnSeeds() throws IOException {
264264
ResponseException responseException = expectThrows(ResponseException.class,
265265
() -> client().performRequest(request));
266266
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
267-
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.seeds] " +
268-
"for setting [cluster.remote.remote1.skip_unavailable]"));
267+
assertThat(responseException.getMessage(), containsString("Cannot configure setting " +
268+
"[cluster.remote.remote1.skip_unavailable] if remote cluster is not enabled."));
269269
}
270270

271271
if (randomBoolean()) {

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ public void apply(Settings value, Settings current, Settings previous) {
298298
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
299299
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
300300
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
301-
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
301+
ProxyConnectionStrategy.PROXY_ADDRESS,
302302
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
303303
ProxyConnectionStrategy.SERVER_NAME,
304304
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,

server/src/main/java/org/elasticsearch/common/settings/Setting.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1257,6 +1257,12 @@ public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallback
12571257
return new Setting<>(key, fallbackSetting, b -> parseBoolean(b, key, isFiltered(properties)), properties);
12581258
}
12591259

1260+
public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallbackSetting, Validator<Boolean> validator,
1261+
Property... properties) {
1262+
return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw, b -> parseBoolean(b, key,
1263+
isFiltered(properties)), validator, properties);
1264+
}
1265+
12601266
public static Setting<Boolean> boolSetting(String key, boolean defaultValue, Validator<Boolean> validator, Property... properties) {
12611267
return new Setting<>(key, Boolean.toString(defaultValue), b -> parseBoolean(b, key, isFiltered(properties)), validator, properties);
12621268
}
@@ -1626,6 +1632,12 @@ public static Setting<TimeValue> timeSetting(String key, Setting<TimeValue> fall
16261632
return new Setting<>(key, fallbackSetting, (s) -> TimeValue.parseTimeValue(s, key), properties);
16271633
}
16281634

1635+
public static Setting<TimeValue> timeSetting(String key, Setting<TimeValue> fallBackSetting, Validator<TimeValue> validator,
1636+
Property... properties) {
1637+
return new Setting<>(new SimpleKey(key), fallBackSetting, fallBackSetting::getRaw, (s) -> TimeValue.parseTimeValue(s, key),
1638+
validator, properties);
1639+
}
1640+
16291641
public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
16301642
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
16311643
}

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
5353
/**
5454
* The remote address for the proxy. The connections will be opened to the configured address.
5555
*/
56-
public static final Setting.AffixSetting<String> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
56+
public static final Setting.AffixSetting<String> PROXY_ADDRESS = Setting.affixKeySetting(
5757
"cluster.remote.",
5858
"proxy_address",
5959
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
@@ -99,7 +99,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
9999
transportService,
100100
connectionManager,
101101
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
102-
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
102+
PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings),
103103
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
104104
}
105105

@@ -141,7 +141,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
141141
}
142142

143143
static Stream<Setting.AffixSetting<?>> enablementSettings() {
144-
return Stream.of(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES);
144+
return Stream.of(ProxyConnectionStrategy.PROXY_ADDRESS);
145145
}
146146

147147
static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
@@ -155,7 +155,7 @@ protected boolean shouldOpenMoreConnections() {
155155

156156
@Override
157157
protected boolean strategyMustBeRebuilt(Settings newSettings) {
158-
String address = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
158+
String address = PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
159159
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
160160
return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false;
161161
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
109109
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
110110
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
111111
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
112-
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
112+
ProxyConnectionStrategy.PROXY_ADDRESS,
113113
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
114114
ProxyConnectionStrategy.SERVER_NAME);
115115
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@
4040

4141
import java.io.Closeable;
4242
import java.io.IOException;
43+
import java.util.Arrays;
4344
import java.util.Collection;
4445
import java.util.HashMap;
46+
import java.util.Iterator;
4547
import java.util.List;
4648
import java.util.Map;
4749
import java.util.Set;
@@ -95,24 +97,24 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
9597
Setting.affixKeySetting(
9698
"cluster.remote.",
9799
"skip_unavailable",
98-
key -> boolSetting(
100+
(ns, key) -> boolSetting(
99101
key,
100102
false,
103+
new RemoteConnectionEnabled<>(ns, key),
101104
Setting.Property.Dynamic,
102-
Setting.Property.NodeScope),
103-
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
105+
Setting.Property.NodeScope));
104106

105107
public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
106108
"cluster.remote.",
107109
"transport.ping_schedule",
108-
key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope),
109-
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
110+
(ns, key) -> timeSetting(key, TransportSettings.PING_SCHEDULE, new RemoteConnectionEnabled<>(ns, key),
111+
Setting.Property.Dynamic, Setting.Property.NodeScope));
110112

111113
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
112114
"cluster.remote.",
113115
"transport.compress",
114-
key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.Dynamic, Setting.Property.NodeScope),
115-
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
116+
(ns, key) -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS,
117+
new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope));
116118

117119
private final TransportService transportService;
118120
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
@@ -386,4 +388,38 @@ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias)
386388
Collection<RemoteClusterConnection> getConnections() {
387389
return remoteClusters.values();
388390
}
391+
392+
private static class RemoteConnectionEnabled<T> implements Setting.Validator<T> {
393+
394+
private final String clusterAlias;
395+
private final String key;
396+
397+
private RemoteConnectionEnabled(String clusterAlias, String key) {
398+
this.clusterAlias = clusterAlias;
399+
this.key = key;
400+
}
401+
402+
@Override
403+
public void validate(T value) {
404+
}
405+
406+
@Override
407+
public void validate(T value, Map<Setting<?>, Object> settings, boolean isPresent) {
408+
if (isPresent && RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings) == false) {
409+
throw new IllegalArgumentException("Cannot configure setting [" + key + "] if remote cluster is not enabled.");
410+
}
411+
}
412+
413+
@Override
414+
public Iterator<Setting<?>> settings() {
415+
return Stream.concat(Stream.of(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias)),
416+
settingsStream()).iterator();
417+
}
418+
419+
private Stream<Setting<?>> settingsStream() {
420+
return Arrays.stream(RemoteConnectionStrategy.ConnectionStrategy.values())
421+
.flatMap(strategy -> strategy.getEnablementSettings().get())
422+
.map(as -> as.getConcreteSettingForNamespace(clusterAlias));
423+
}
424+
};
389425
}

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public int getNumberOfChannels() {
8888
return numberOfChannels;
8989
}
9090

91+
public Supplier<Stream<Setting.AffixSetting<?>>> getEnablementSettings() {
92+
return enablementSettings;
93+
}
94+
9195
public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
9296
return reader.get();
9397
}
@@ -149,7 +153,7 @@ static RemoteConnectionStrategy buildStrategy(String clusterAlias, TransportServ
149153

150154
static Set<String> getRemoteClusters(Settings settings) {
151155
final Stream<Setting.AffixSetting<?>> enablementSettings = Arrays.stream(ConnectionStrategy.values())
152-
.flatMap(strategy -> strategy.enablementSettings.get());
156+
.flatMap(strategy -> strategy.getEnablementSettings().get());
153157
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
154158
}
155159

@@ -159,7 +163,21 @@ public static boolean isConnectionEnabled(String clusterAlias, Settings settings
159163
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
160164
return seeds.isEmpty() == false;
161165
} else {
162-
String address = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings);
166+
String address = ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
167+
return Strings.isEmpty(address) == false;
168+
}
169+
}
170+
171+
@SuppressWarnings("unchecked")
172+
public static boolean isConnectionEnabled(String clusterAlias, Map<Setting<?>, Object> settings) {
173+
ConnectionStrategy mode = (ConnectionStrategy) settings.get(REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias));
174+
if (mode.equals(ConnectionStrategy.SNIFF)) {
175+
List<String> seeds = (List<String>) settings.get(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS
176+
.getConcreteSettingForNamespace(clusterAlias));
177+
return seeds.isEmpty() == false;
178+
} else {
179+
String address = (String) settings.get(ProxyConnectionStrategy.PROXY_ADDRESS
180+
.getConcreteSettingForNamespace(clusterAlias));
163181
return Strings.isEmpty(address) == false;
164182
}
165183
}

server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange(
291291

292292
Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
293293
.getConcreteSettingForNamespace("cluster-alias");
294-
Setting<?> addressesSetting = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
294+
Setting<?> addressesSetting = ProxyConnectionStrategy.PROXY_ADDRESS
295295
.getConcreteSettingForNamespace("cluster-alias");
296296
Setting<?> socketConnections = ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
297297
.getConcreteSettingForNamespace("cluster-alias");
@@ -320,7 +320,7 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange(
320320

321321
public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
322322
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
323-
new Tuple<>(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
323+
new Tuple<>(ProxyConnectionStrategy.PROXY_ADDRESS, "192.168.0.1:8080"),
324324
new Tuple<>(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3"));
325325

326326
RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF;

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ private Settings buildRandomSettings(String clusterAlias, List<String> addresses
590590

591591
private static Settings buildProxySettings(String clusterAlias, List<String> addresses) {
592592
Settings.Builder builder = Settings.builder();
593-
builder.put(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
593+
builder.put(ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).getKey(),
594594
addresses.get(0));
595595
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "proxy");
596596
return builder.build();

0 commit comments

Comments
 (0)