Skip to content

Commit 0600e65

Browse files
authored
Make remote setting updates support diff strategies (#47891)
Currently the entire remote cluster settings infrastructure is designed around the sniff strategy. As we introduce an additional conneciton strategy this infrastructure needs to be modified to support it. This commit modifies the code so that the strategy implementations will tell the service if the connection needs to be torn down and rebuilt. As part of this commit, we will wait 10 seconds for new clusters to connect when they are added through the "update" settings infrastructure.
1 parent 18d87e3 commit 0600e65

File tree

15 files changed

+762
-671
lines changed

15 files changed

+762
-671
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

+19-21
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.elasticsearch.common.settings.Setting;
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.common.transport.TransportAddress;
33-
import org.elasticsearch.common.unit.TimeValue;
3433

3534
import java.net.InetAddress;
3635
import java.net.InetSocketAddress;
@@ -103,6 +102,17 @@ protected RemoteClusterAware(Settings settings) {
103102
this.clusterNameResolver = new ClusterNameExpressionResolver();
104103
}
105104

105+
/**
106+
* Returns remote clusters that are enabled in these settings
107+
*/
108+
protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
109+
final Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
110+
return allConcreteSettings
111+
.map(REMOTE_CLUSTERS_SEEDS::getNamespace)
112+
.filter(clusterAlias -> RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings))
113+
.collect(Collectors.toSet());
114+
}
115+
106116
/**
107117
* Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple
108118
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
@@ -177,29 +187,17 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
177187
return perClusterIndices;
178188
}
179189

180-
void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
181-
Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings);
182-
TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings);
183-
updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
184-
}
185-
186-
void updateRemoteCluster(String clusterAlias, Settings settings) {
187-
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings);
188-
List<String> addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
189-
Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
190-
TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE
191-
.getConcreteSettingForNamespace(clusterAlias)
192-
.get(settings);
193-
194-
updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
190+
void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) {
191+
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
192+
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
193+
}
194+
updateRemoteCluster(clusterAlias, settings);
195195
}
196196

197197
/**
198-
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
199-
* empty the cluster alias is unregistered and should be removed.
198+
* Subclasses must implement this to receive information about updated cluster aliases.
200199
*/
201-
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
202-
TimeValue pingSchedule);
200+
protected abstract void updateRemoteCluster(String clusterAlias, Settings settings);
203201

204202
/**
205203
* Registers this instance to listen to updates on the cluster settings.
@@ -208,7 +206,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
208206
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
209207
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
210208
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
211-
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster);
209+
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
212210
}
213211

214212
static InetSocketAddress parseSeedAddress(String remoteHost) {

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

+41-42
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.action.support.ContextPreservingActionListener;
2626
import org.elasticsearch.cluster.node.DiscoveryNode;
2727
import org.elasticsearch.cluster.node.DiscoveryNodes;
28-
import org.elasticsearch.common.collect.Tuple;
2928
import org.elasticsearch.common.io.stream.StreamInput;
3029
import org.elasticsearch.common.settings.Settings;
3130
import org.elasticsearch.common.unit.TimeValue;
@@ -36,11 +35,7 @@
3635
import java.io.Closeable;
3736
import java.io.IOException;
3837
import java.util.Collections;
39-
import java.util.List;
4038
import java.util.function.Function;
41-
import java.util.function.Predicate;
42-
import java.util.function.Supplier;
43-
import java.util.stream.Collectors;
4439

4540
/**
4641
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
@@ -61,46 +56,29 @@ final class RemoteClusterConnection implements Closeable {
6156
private final RemoteConnectionManager remoteConnectionManager;
6257
private final RemoteConnectionStrategy connectionStrategy;
6358
private final String clusterAlias;
64-
private final int maxNumRemoteConnections;
6559
private final ThreadPool threadPool;
66-
private final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes;
67-
private final String proxyAddress;
6860
private volatile boolean skipUnavailable;
6961
private final TimeValue initialConnectionTimeout;
7062

7163
/**
7264
* Creates a new {@link RemoteClusterConnection}
7365
* @param settings the nodes settings object
7466
* @param clusterAlias the configured alias of the cluster to connect to
75-
* @param seedNodes a list of seed nodes to discover eligible nodes from
7667
* @param transportService the local nodes transport service
77-
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
78-
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
79-
* @param proxyAddress the proxy address
80-
* @param connectionProfile the connection profile to use
8168
*/
82-
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
83-
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
84-
String proxyAddress, ConnectionProfile connectionProfile) {
85-
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
86-
createConnectionManager(connectionProfile, transportService));
69+
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) {
70+
this(settings, clusterAlias, transportService,
71+
createConnectionManager(buildConnectionProfileFromSettings(settings, clusterAlias), transportService));
8772
}
8873

89-
// Public for tests to pass a StubbableConnectionManager
90-
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
91-
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
92-
String proxyAddress, ConnectionManager connectionManager) {
74+
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService,
75+
ConnectionManager connectionManager) {
9376
this.transportService = transportService;
94-
this.maxNumRemoteConnections = maxNumRemoteConnections;
9577
this.clusterAlias = clusterAlias;
9678
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
97-
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager,
98-
proxyAddress, maxNumRemoteConnections, nodePredicate,
99-
Collections.unmodifiableList(seedNodes));
79+
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
10080
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
10181
connectionManager.addListener(transportService);
102-
this.seedNodes = Collections.unmodifiableList(seedNodes);
103-
this.proxyAddress = proxyAddress;
10482
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
10583
.getConcreteSettingForNamespace(clusterAlias).get(settings);
10684
this.threadPool = transportService.threadPool;
@@ -125,11 +103,11 @@ boolean isSkipUnavailable() {
125103
* Ensures that this cluster is connected. If the cluster is connected this operation
126104
* will invoke the listener immediately.
127105
*/
128-
void ensureConnected(ActionListener<Void> voidActionListener) {
106+
void ensureConnected(ActionListener<Void> listener) {
129107
if (remoteConnectionManager.size() == 0) {
130-
connectionStrategy.connect(voidActionListener);
108+
connectionStrategy.connect(listener);
131109
} else {
132-
voidActionListener.onResponse(null);
110+
listener.onResponse(null);
133111
}
134112
}
135113

@@ -215,14 +193,6 @@ public boolean isClosed() {
215193
return connectionStrategy.isClosed();
216194
}
217195

218-
List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
219-
return seedNodes;
220-
}
221-
222-
String getProxyAddress() {
223-
return proxyAddress;
224-
}
225-
226196
// for testing only
227197
boolean assertNoRunningConnections() {
228198
return connectionStrategy.assertNoRunningConnections();
@@ -236,13 +206,24 @@ boolean isNodeConnected(final DiscoveryNode node) {
236206
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
237207
*/
238208
public RemoteConnectionInfo getConnectionInfo() {
239-
return new RemoteConnectionInfo(
209+
if (connectionStrategy instanceof SniffConnectionStrategy) {
210+
SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy;
211+
return new RemoteConnectionInfo(
212+
clusterAlias,
213+
sniffStrategy.getSeedNodes(),
214+
sniffStrategy.getMaxConnections(),
215+
getNumNodesConnected(),
216+
initialConnectionTimeout,
217+
skipUnavailable);
218+
} else {
219+
return new RemoteConnectionInfo(
240220
clusterAlias,
241-
seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()),
242-
maxNumRemoteConnections,
221+
Collections.emptyList(),
222+
0,
243223
getNumNodesConnected(),
244224
initialConnectionTimeout,
245225
skipUnavailable);
226+
}
246227
}
247228

248229
int getNumNodesConnected() {
@@ -256,4 +237,22 @@ private static ConnectionManager createConnectionManager(ConnectionProfile conne
256237
ConnectionManager getConnectionManager() {
257238
return remoteConnectionManager.getConnectionManager();
258239
}
240+
241+
public boolean shouldRebuildConnection(Settings newSettings) {
242+
return connectionStrategy.shouldRebuildConnection(newSettings);
243+
}
244+
245+
static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) {
246+
return new ConnectionProfile.Builder()
247+
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
248+
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
249+
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
250+
// we don't want this to be used for anything else but search
251+
.addConnections(0, TransportRequestOptions.Type.BULK,
252+
TransportRequestOptions.Type.STATE,
253+
TransportRequestOptions.Type.RECOVERY)
254+
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings))
255+
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings))
256+
.build();
257+
}
259258
}

0 commit comments

Comments
 (0)