-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Make remote setting updates support diff strategies #47891
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
abed4fd
34c86d9
fb32fd3
1e50476
9aeffaa
52570f6
537db5b
78b15f3
da479f2
ddb80c4
d1e25cf
6fdeb79
f1128c2
d871875
dab3a04
44aad3b
815bcdb
bdc269e
2cbaab0
25862b3
4629e8a
82fd446
7214854
08d0be2
30eb586
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,6 @@ | |
import org.elasticsearch.action.support.ContextPreservingActionListener; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.node.DiscoveryNodes; | ||
import org.elasticsearch.common.collect.Tuple; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
|
@@ -36,11 +35,7 @@ | |
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.function.Function; | ||
import java.util.function.Predicate; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the | ||
|
@@ -61,46 +56,29 @@ final class RemoteClusterConnection implements Closeable { | |
private final RemoteConnectionManager remoteConnectionManager; | ||
private final RemoteConnectionStrategy connectionStrategy; | ||
private final String clusterAlias; | ||
private final int maxNumRemoteConnections; | ||
private final ThreadPool threadPool; | ||
private final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes; | ||
private final String proxyAddress; | ||
private volatile boolean skipUnavailable; | ||
private final TimeValue initialConnectionTimeout; | ||
|
||
/** | ||
* Creates a new {@link RemoteClusterConnection} | ||
* @param settings the nodes settings object | ||
* @param clusterAlias the configured alias of the cluster to connect to | ||
* @param seedNodes a list of seed nodes to discover eligible nodes from | ||
* @param transportService the local nodes transport service | ||
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster | ||
* @param nodePredicate a predicate to filter eligible remote nodes to connect to | ||
* @param proxyAddress the proxy address | ||
* @param connectionProfile the connection profile to use | ||
*/ | ||
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes, | ||
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate, | ||
String proxyAddress, ConnectionProfile connectionProfile) { | ||
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, | ||
createConnectionManager(connectionProfile, transportService)); | ||
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) { | ||
this(settings, clusterAlias, transportService, | ||
createConnectionManager(buildConnectionProfileFromSettings(settings, clusterAlias), transportService)); | ||
} | ||
|
||
// Public for tests to pass a StubbableConnectionManager | ||
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes, | ||
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate, | ||
String proxyAddress, ConnectionManager connectionManager) { | ||
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, | ||
ConnectionManager connectionManager) { | ||
this.transportService = transportService; | ||
this.maxNumRemoteConnections = maxNumRemoteConnections; | ||
this.clusterAlias = clusterAlias; | ||
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); | ||
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, | ||
proxyAddress, maxNumRemoteConnections, nodePredicate, | ||
Collections.unmodifiableList(seedNodes)); | ||
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, settings); | ||
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc. | ||
connectionManager.addListener(transportService); | ||
this.seedNodes = Collections.unmodifiableList(seedNodes); | ||
this.proxyAddress = proxyAddress; | ||
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE | ||
.getConcreteSettingForNamespace(clusterAlias).get(settings); | ||
this.threadPool = transportService.threadPool; | ||
|
@@ -125,11 +103,11 @@ boolean isSkipUnavailable() { | |
* Ensures that this cluster is connected. If the cluster is connected this operation | ||
* will invoke the listener immediately. | ||
*/ | ||
void ensureConnected(ActionListener<Void> voidActionListener) { | ||
void ensureConnected(ActionListener<Void> listener) { | ||
if (remoteConnectionManager.size() == 0) { | ||
connectionStrategy.connect(voidActionListener); | ||
connectionStrategy.connect(listener); | ||
} else { | ||
voidActionListener.onResponse(null); | ||
listener.onResponse(null); | ||
} | ||
} | ||
|
||
|
@@ -215,14 +193,6 @@ public boolean isClosed() { | |
return connectionStrategy.isClosed(); | ||
} | ||
|
||
List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() { | ||
return seedNodes; | ||
} | ||
|
||
String getProxyAddress() { | ||
return proxyAddress; | ||
} | ||
|
||
// for testing only | ||
boolean assertNoRunningConnections() { | ||
return connectionStrategy.assertNoRunningConnections(); | ||
|
@@ -236,13 +206,24 @@ boolean isNodeConnected(final DiscoveryNode node) { | |
* Get the information about remote nodes to be rendered on {@code _remote/info} requests. | ||
*/ | ||
public RemoteConnectionInfo getConnectionInfo() { | ||
return new RemoteConnectionInfo( | ||
if (connectionStrategy instanceof SniffConnectionStrategy) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of instanceof, can we call a method on the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would like to work on this in follow-up. The |
||
SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy; | ||
return new RemoteConnectionInfo( | ||
clusterAlias, | ||
sniffStrategy.getSeedNodes(), | ||
sniffStrategy.getMaxConnections(), | ||
getNumNodesConnected(), | ||
initialConnectionTimeout, | ||
skipUnavailable); | ||
} else { | ||
return new RemoteConnectionInfo( | ||
clusterAlias, | ||
seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()), | ||
maxNumRemoteConnections, | ||
Collections.emptyList(), | ||
0, | ||
getNumNodesConnected(), | ||
initialConnectionTimeout, | ||
skipUnavailable); | ||
} | ||
} | ||
|
||
int getNumNodesConnected() { | ||
|
@@ -256,4 +237,22 @@ private static ConnectionManager createConnectionManager(ConnectionProfile conne | |
ConnectionManager getConnectionManager() { | ||
return remoteConnectionManager.getConnectionManager(); | ||
} | ||
|
||
public boolean shouldRebuildConnection(Settings newSettings) { | ||
return connectionStrategy.shouldRebuildConnection(newSettings); | ||
} | ||
|
||
static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) { | ||
return new ConnectionProfile.Builder() | ||
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) | ||
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) | ||
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? | ||
// we don't want this to be used for anything else but search | ||
.addConnections(0, TransportRequestOptions.Type.BULK, | ||
TransportRequestOptions.Type.STATE, | ||
TransportRequestOptions.Type.RECOVERY) | ||
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings)) | ||
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings)) | ||
.build(); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.