Skip to content

Commit 785547d

Browse files
committed
Make remote setting updates support diff strategies (elastic#47891)
Currently the entire remote cluster settings infrastructure is designed around the sniff strategy. As we introduce an additional conneciton strategy this infrastructure needs to be modified to support it. This commit modifies the code so that the strategy implementations will tell the service if the connection needs to be torn down and rebuilt. As part of this commit, we will wait 10 seconds for new clusters to connect when they are added through the "update" settings infrastructure.
1 parent e29a3ee commit 785547d

File tree

15 files changed

+826
-612
lines changed

15 files changed

+826
-612
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

+110-115
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

+39-40
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.action.support.ContextPreservingActionListener;
2626
import org.elasticsearch.cluster.node.DiscoveryNode;
2727
import org.elasticsearch.cluster.node.DiscoveryNodes;
28-
import org.elasticsearch.common.collect.Tuple;
2928
import org.elasticsearch.common.io.stream.StreamInput;
3029
import org.elasticsearch.common.settings.Settings;
3130
import org.elasticsearch.common.unit.TimeValue;
@@ -36,11 +35,7 @@
3635
import java.io.Closeable;
3736
import java.io.IOException;
3837
import java.util.Collections;
39-
import java.util.List;
4038
import java.util.function.Function;
41-
import java.util.function.Predicate;
42-
import java.util.function.Supplier;
43-
import java.util.stream.Collectors;
4439

4540
/**
4641
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
@@ -61,46 +56,29 @@ final class RemoteClusterConnection implements Closeable {
6156
private final RemoteConnectionManager remoteConnectionManager;
6257
private final RemoteConnectionStrategy connectionStrategy;
6358
private final String clusterAlias;
64-
private final int maxNumRemoteConnections;
6559
private final ThreadPool threadPool;
66-
private final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes;
67-
private final String proxyAddress;
6860
private volatile boolean skipUnavailable;
6961
private final TimeValue initialConnectionTimeout;
7062

7163
/**
7264
* Creates a new {@link RemoteClusterConnection}
7365
* @param settings the nodes settings object
7466
* @param clusterAlias the configured alias of the cluster to connect to
75-
* @param seedNodes a list of seed nodes to discover eligible nodes from
7667
* @param transportService the local nodes transport service
77-
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
78-
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
79-
* @param proxyAddress the proxy address
80-
* @param connectionProfile the connection profile to use
8168
*/
82-
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
83-
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
84-
String proxyAddress, ConnectionProfile connectionProfile) {
85-
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
86-
createConnectionManager(connectionProfile, transportService));
69+
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) {
70+
this(settings, clusterAlias, transportService,
71+
createConnectionManager(buildConnectionProfileFromSettings(settings, clusterAlias), transportService));
8772
}
8873

89-
// Public for tests to pass a StubbableConnectionManager
90-
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
91-
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
92-
String proxyAddress, ConnectionManager connectionManager) {
74+
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService,
75+
ConnectionManager connectionManager) {
9376
this.transportService = transportService;
94-
this.maxNumRemoteConnections = maxNumRemoteConnections;
9577
this.clusterAlias = clusterAlias;
9678
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
97-
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager,
98-
proxyAddress, maxNumRemoteConnections, nodePredicate,
99-
Collections.unmodifiableList(seedNodes));
79+
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
10080
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
10181
connectionManager.addListener(transportService);
102-
this.seedNodes = Collections.unmodifiableList(seedNodes);
103-
this.proxyAddress = proxyAddress;
10482
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
10583
.getConcreteSettingForNamespace(clusterAlias).get(settings);
10684
this.threadPool = transportService.threadPool;
@@ -125,11 +103,11 @@ boolean isSkipUnavailable() {
125103
* Ensures that this cluster is connected. If the cluster is connected this operation
126104
* will invoke the listener immediately.
127105
*/
128-
void ensureConnected(ActionListener<Void> voidActionListener) {
106+
void ensureConnected(ActionListener<Void> listener) {
129107
if (remoteConnectionManager.size() == 0) {
130-
connectionStrategy.connect(voidActionListener);
108+
connectionStrategy.connect(listener);
131109
} else {
132-
voidActionListener.onResponse(null);
110+
listener.onResponse(null);
133111
}
134112
}
135113

@@ -215,10 +193,6 @@ public boolean isClosed() {
215193
return connectionStrategy.isClosed();
216194
}
217195

218-
List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
219-
return seedNodes;
220-
}
221-
222196
// for testing only
223197
boolean assertNoRunningConnections() {
224198
return connectionStrategy.assertNoRunningConnections();
@@ -232,13 +206,24 @@ boolean isNodeConnected(final DiscoveryNode node) {
232206
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
233207
*/
234208
public RemoteConnectionInfo getConnectionInfo() {
235-
return new RemoteConnectionInfo(
209+
if (connectionStrategy instanceof SniffConnectionStrategy) {
210+
SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy;
211+
return new RemoteConnectionInfo(
212+
clusterAlias,
213+
sniffStrategy.getSeedNodes(),
214+
sniffStrategy.getMaxConnections(),
215+
getNumNodesConnected(),
216+
initialConnectionTimeout,
217+
skipUnavailable);
218+
} else {
219+
return new RemoteConnectionInfo(
236220
clusterAlias,
237-
seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()),
238-
maxNumRemoteConnections,
221+
Collections.emptyList(),
222+
0,
239223
getNumNodesConnected(),
240224
initialConnectionTimeout,
241225
skipUnavailable);
226+
}
242227
}
243228

244229
int getNumNodesConnected() {
@@ -253,7 +238,21 @@ ConnectionManager getConnectionManager() {
253238
return remoteConnectionManager.getConnectionManager();
254239
}
255240

256-
public String getProxyAddress() {
257-
return proxyAddress;
241+
public boolean shouldRebuildConnection(Settings newSettings) {
242+
return connectionStrategy.shouldRebuildConnection(newSettings);
243+
}
244+
245+
static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) {
246+
return new ConnectionProfile.Builder()
247+
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
248+
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
249+
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
250+
// we don't want this to be used for anything else but search
251+
.addConnections(0, TransportRequestOptions.Type.BULK,
252+
TransportRequestOptions.Type.STATE,
253+
TransportRequestOptions.Type.RECOVERY)
254+
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings))
255+
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings))
256+
.build();
258257
}
259258
}

0 commit comments

Comments
 (0)