Skip to content

Commit ef5181c

Browse files
authored
Allow to enable pings for specific remote clusters (#34753)
When we connect to remote clusters, there may be a few more routers/firewalls in-between compared to when we connect to nodes in the same cluster. We've experienced cases where firewalls drop connections completely and keep-alives seem not to be enough, or they are not properly configured. With this commit we allow enabling application-level pings specifically from CCS nodes to the selected remote nodes through the new setting `cluster.remote.${clusterAlias}.transport.ping_schedule`. The new setting is similar to `transport.ping_schedule` but it does not affect intra-cluster communication: pings are only sent to a specific remote cluster when explicitly enabled, as they are disabled by default. Relates to #34405
1 parent 72cb885 commit ef5181c

File tree

8 files changed

+177
-85
lines changed

8 files changed

+177
-85
lines changed

Diff for: docs/reference/modules/remote-clusters.asciidoc

+9
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,15 @@ PUT _cluster/settings
152152
by default, but they can selectively be made optional by setting this setting
153153
to `true`.
154154

155+
`cluster.remote.${cluster_alias}.transport.ping_schedule`::
156+
157+
Sets the time interval between regular application-level ping messages that
158+
are sent to ensure that transport connections to nodes belonging to remote
159+
clusters are kept alive. If set to `-1`, application-level ping messages to
160+
this remote cluster are not sent. If unset, application-level ping messages
161+
are sent according to the global `transport.ping_schedule` setting, which
162+
defaults to `-1`, meaning that pings are not sent.
163+
155164
[float]
156165
[[retrieve-remote-clusters-info]]
157166
=== Retrieving remote clusters info

Diff for: docs/reference/modules/transport.asciidoc

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ between all nodes. Defaults to `false`.
4646

4747
|`transport.ping_schedule` | Schedule a regular application-level ping message
4848
to ensure that transport connections between nodes are kept alive. Defaults to
49-
`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to
50-
correctly configure TCP keep-alives instead of using this feature, because TCP
51-
keep-alives apply to all kinds of long-lived connection and not just to
49+
`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable
50+
to correctly configure TCP keep-alives instead of using this feature, because
51+
TCP keep-alives apply to all kinds of long-lived connections and not just to
5252
transport connections.
5353

5454
|=======================================================================

Diff for: server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ public void apply(Settings value, Settings current, Settings previous) {
293293
RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE,
294294
RemoteClusterService.ENABLE_REMOTE_CLUSTERS,
295295
RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS,
296+
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
296297
TransportService.TRACE_LOG_EXCLUDE_SETTING,
297298
TransportService.TRACE_LOG_INCLUDE_SETTING,
298299
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,

Diff for: server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21-
import org.apache.logging.log4j.Logger;
2221
import org.apache.logging.log4j.LogManager;
22+
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
2424
import org.elasticsearch.action.ActionListener;
2525
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -67,16 +67,15 @@ public class ConnectionManager implements Closeable {
6767
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
6868

6969
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
70-
this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings));
70+
this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings));
7171
}
7272

73-
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
73+
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) {
7474
this.transport = transport;
7575
this.threadPool = threadPool;
76-
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
77-
this.defaultProfile = defaultProfile;
76+
this.pingSchedule = pingSchedule;
77+
this.defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(settings);
7878
this.lifecycle.moveToStarted();
79-
8079
if (pingSchedule.millis() > 0) {
8180
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing());
8281
}
@@ -252,6 +251,10 @@ private void ensureOpen() {
252251
}
253252
}
254253

254+
TimeValue getPingSchedule() {
255+
return pingSchedule;
256+
}
257+
255258
private class ScheduledPing extends AbstractLifecycleRunnable {
256259

257260
private ScheduledPing() {

Diff for: server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21-
import java.net.InetSocketAddress;
22-
import java.util.function.Supplier;
2321
import org.apache.logging.log4j.message.ParameterizedMessage;
2422
import org.apache.lucene.store.AlreadyClosedException;
2523
import org.apache.lucene.util.SetOnce;
@@ -48,6 +46,7 @@
4846

4947
import java.io.Closeable;
5048
import java.io.IOException;
49+
import java.net.InetSocketAddress;
5150
import java.util.ArrayList;
5251
import java.util.Collection;
5352
import java.util.Collections;
@@ -64,6 +63,7 @@
6463
import java.util.function.Consumer;
6564
import java.util.function.Function;
6665
import java.util.function.Predicate;
66+
import java.util.function.Supplier;
6767
import java.util.stream.Collectors;
6868

6969
/**
@@ -105,13 +105,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
105105
* @param connectionManager the connection manager to use for this remote connection
106106
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
107107
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
108+
* @param proxyAddress the proxy address
108109
*/
109-
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
110-
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
111-
Predicate<DiscoveryNode> nodePredicate) {
112-
this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null);
113-
}
114-
115110
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
116111
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
117112
Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
@@ -151,7 +146,7 @@ private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, Discovery
151146
InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
152147
return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
153148
.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
154-
}
149+
}
155150
}
156151

157152
/**

Diff for: server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

+22-17
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.stream.Stream;
6161

6262
import static org.elasticsearch.common.settings.Setting.boolSetting;
63+
import static org.elasticsearch.common.settings.Setting.timeSetting;
6364

6465
/**
6566
* Basic service for accessing remote clusters via gateway nodes
@@ -166,6 +167,12 @@ public String getKey(final String key) {
166167
Setting.Property.NodeScope),
167168
REMOTE_CLUSTERS_SEEDS);
168169

170+
public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
171+
"cluster.remote.",
172+
"transport.ping_schedule",
173+
key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope),
174+
REMOTE_CLUSTERS_SEEDS);
175+
169176
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
170177
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());
171178

@@ -211,10 +218,13 @@ private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Su
211218
}
212219

213220
if (remote == null) { // this is a new cluster we have to add a new representation
214-
remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService,
215-
new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
216-
getNodePredicate(settings), proxyAddress);
217-
remoteClusters.put(entry.getKey(), remote);
221+
String clusterAlias = entry.getKey();
222+
TimeValue pingSchedule = REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings);
223+
ConnectionManager connectionManager = new ConnectionManager(settings, transportService.transport,
224+
transportService.threadPool, pingSchedule);
225+
remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, connectionManager,
226+
numRemoteConnections, getNodePredicate(settings), proxyAddress);
227+
remoteClusters.put(clusterAlias, remote);
218228
}
219229

220230
// now update the seed nodes no matter if it's new or already existing
@@ -340,31 +350,27 @@ public void onFailure(Exception e) {
340350
* @throws IllegalArgumentException if the remote cluster is unknown
341351
*/
342352
public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
343-
RemoteClusterConnection connection = remoteClusters.get(cluster);
344-
if (connection == null) {
345-
throw new IllegalArgumentException("no such remote cluster: " + cluster);
346-
}
347-
return connection.getConnection(node);
353+
return getRemoteClusterConnection(cluster).getConnection(node);
348354
}
349355

350356
/**
351357
* Ensures that the given cluster alias is connected. If the cluster is connected this operation
352358
* will invoke the listener immediately.
353359
*/
354-
public void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
355-
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias);
356-
if (remoteClusterConnection == null) {
357-
throw new IllegalArgumentException("no such remote cluster: " + clusterAlias);
358-
}
359-
remoteClusterConnection.ensureConnected(listener);
360+
void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
361+
getRemoteClusterConnection(clusterAlias).ensureConnected(listener);
360362
}
361363

362364
public Transport.Connection getConnection(String cluster) {
365+
return getRemoteClusterConnection(cluster).getConnection();
366+
}
367+
368+
RemoteClusterConnection getRemoteClusterConnection(String cluster) {
363369
RemoteClusterConnection connection = remoteClusters.get(cluster);
364370
if (connection == null) {
365371
throw new IllegalArgumentException("no such remote cluster: " + cluster);
366372
}
367-
return connection.getConnection();
373+
return connection;
368374
}
369375

370376
@Override
@@ -386,7 +392,6 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail
386392
}
387393
}
388394

389-
390395
@Override
391396
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
392397
updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {}));

0 commit comments

Comments
 (0)