Skip to content

Commit 6d52d10

Browse files
committed
Retry follow task when remote connection queue full (#55314)
If more than 100 shard-follow tasks are trying to connect to the remote cluster, then some of them will abort with "connect listener queue is full". This is because we retry on EsRejectedExecutionException, but not on RejectedExecutionException.
1 parent 939fdf5 commit 6d52d10

File tree

10 files changed

+71
-45
lines changed

10 files changed

+71
-45
lines changed

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -98,27 +98,28 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
9898
clusterAlias,
9999
transportService,
100100
connectionManager,
101+
settings,
101102
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
102103
PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings),
103104
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
104105
}
105106

106107
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
107-
int maxNumConnections, String configuredAddress) {
108-
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
108+
Settings settings, int maxNumConnections, String configuredAddress) {
109+
this(clusterAlias, transportService, connectionManager, settings, maxNumConnections, configuredAddress,
109110
() -> resolveAddress(configuredAddress), null);
110111
}
111112

112113
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
113-
int maxNumConnections, String configuredAddress, String configuredServerName) {
114-
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
114+
Settings settings, int maxNumConnections, String configuredAddress, String configuredServerName) {
115+
this(clusterAlias, transportService, connectionManager, settings, maxNumConnections, configuredAddress,
115116
() -> resolveAddress(configuredAddress), configuredServerName);
116117
}
117118

118119
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
119-
int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
120+
Settings settings, int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
120121
String configuredServerName) {
121-
super(clusterAlias, transportService, connectionManager);
122+
super(clusterAlias, transportService, connectionManager, settings);
122123
this.maxNumConnections = maxNumConnections;
123124
this.configuredAddress = configuredAddress;
124125
this.configuredServerName = configuredServerName;

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.settings.Settings;
3333
import org.elasticsearch.common.unit.TimeValue;
3434
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
35+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3536
import org.elasticsearch.threadpool.ThreadPool;
3637

3738
import java.io.Closeable;
@@ -48,7 +49,6 @@
4849
import java.util.Objects;
4950
import java.util.Set;
5051
import java.util.concurrent.ExecutorService;
51-
import java.util.concurrent.RejectedExecutionException;
5252
import java.util.concurrent.atomic.AtomicBoolean;
5353
import java.util.function.Consumer;
5454
import java.util.function.Supplier;
@@ -105,10 +105,14 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
105105
Setting.Property.NodeScope,
106106
Setting.Property.Dynamic));
107107

108+
// this setting is intentionally not registered, it is only used in tests
109+
public static final Setting<Integer> REMOTE_MAX_PENDING_CONNECTION_LISTENERS =
110+
Setting.intSetting("cluster.remote.max_pending_connection_listeners", 1000, Setting.Property.NodeScope);
111+
112+
private final int maxPendingConnectionListeners;
108113

109114
private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class);
110115

111-
private static final int MAX_LISTENERS = 100;
112116
private final AtomicBoolean closed = new AtomicBoolean(false);
113117
private final Object mutex = new Object();
114118
private List<ActionListener<Void>> listeners = new ArrayList<>();
@@ -117,10 +121,12 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
117121
protected final RemoteConnectionManager connectionManager;
118122
protected final String clusterAlias;
119123

120-
RemoteConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager) {
124+
RemoteConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
125+
Settings settings) {
121126
this.clusterAlias = clusterAlias;
122127
this.transportService = transportService;
123128
this.connectionManager = connectionManager;
129+
this.maxPendingConnectionListeners = REMOTE_MAX_PENDING_CONNECTION_LISTENERS.get(settings);
124130
connectionManager.addListener(this);
125131
}
126132

@@ -237,9 +243,9 @@ void connect(ActionListener<Void> connectListener) {
237243
if (closed) {
238244
assert listeners.isEmpty();
239245
} else {
240-
if (listeners.size() >= MAX_LISTENERS) {
241-
assert listeners.size() == MAX_LISTENERS;
242-
listener.onFailure(new RejectedExecutionException("connect listener queue is full"));
246+
if (listeners.size() >= maxPendingConnectionListeners) {
247+
assert listeners.size() == maxPendingConnectionListeners;
248+
listener.onFailure(new EsRejectedExecutionException("connect listener queue is full"));
243249
return;
244250
} else {
245251
listeners.add(listener);

server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -220,23 +220,24 @@ public String getKey(final String key) {
220220
transportService,
221221
connectionManager,
222222
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings),
223+
settings,
223224
REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
224225
getNodePredicate(settings),
225226
REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings));
226227
}
227228

228229
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
229-
String proxyAddress, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
230+
String proxyAddress, Settings settings, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
230231
List<String> configuredSeedNodes) {
231-
this(clusterAlias, transportService, connectionManager, proxyAddress, maxNumRemoteConnections, nodePredicate, configuredSeedNodes,
232-
configuredSeedNodes.stream().map(seedAddress ->
232+
this(clusterAlias, transportService, connectionManager, proxyAddress, settings, maxNumRemoteConnections, nodePredicate,
233+
configuredSeedNodes, configuredSeedNodes.stream().map(seedAddress ->
233234
(Supplier<DiscoveryNode>) () -> resolveSeedNode(clusterAlias, seedAddress, proxyAddress)).collect(Collectors.toList()));
234235
}
235236

236237
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
237-
String proxyAddress, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
238+
String proxyAddress, Settings settings, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
238239
List<String> configuredSeedNodes, List<Supplier<DiscoveryNode>> seedNodes) {
239-
super(clusterAlias, transportService, connectionManager);
240+
super(clusterAlias, transportService, connectionManager, settings);
240241
this.proxyAddress = proxyAddress;
241242
this.maxNumRemoteConnections = maxNumRemoteConnections;
242243
this.nodePredicate = nodePredicate;

server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddress() {
9494
int numOfConnections = randomIntBetween(4, 8);
9595
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
9696
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
97-
numOfConnections, address1.toString())) {
97+
Settings.EMPTY, numOfConnections, address1.toString())) {
9898
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
9999

100100
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
@@ -126,7 +126,8 @@ public void testProxyStrategyWillOpenNewConnectionsOnDisconnect() throws Excepti
126126

127127
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
128128
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
129-
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
129+
Settings.EMPTY, numOfConnections, address1.toString(),
130+
alternatingResolver(address1, address2, useAddress1), null)) {
130131
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
131132
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
132133

@@ -173,7 +174,7 @@ public void testConnectFailsWithIncompatibleNodes() {
173174
int numOfConnections = randomIntBetween(4, 8);
174175
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
175176
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
176-
numOfConnections, address1.toString())) {
177+
Settings.EMPTY, numOfConnections, address1.toString())) {
177178

178179
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
179180
strategy.connect(connectFuture);
@@ -206,7 +207,8 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro
206207

207208
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
208209
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
209-
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
210+
Settings.EMPTY, numOfConnections, address1.toString(),
211+
alternatingResolver(address1, address2, useAddress1), null)) {
210212
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
211213
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
212214

@@ -255,7 +257,7 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception
255257
int numOfConnections = randomIntBetween(4, 8);
256258
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
257259
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
258-
numOfConnections, address.toString(), addressSupplier, null)) {
260+
Settings.EMPTY, numOfConnections, address.toString(), addressSupplier, null)) {
259261
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
260262
strategy.connect(connectFuture);
261263
connectFuture.actionGet();
@@ -280,7 +282,7 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesOrServe
280282
int numOfConnections = randomIntBetween(4, 8);
281283
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
282284
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
283-
numOfConnections, remoteAddress.toString(), "server-name")) {
285+
Settings.EMPTY, numOfConnections, remoteAddress.toString(), "server-name")) {
284286
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
285287
strategy.connect(connectFuture);
286288
connectFuture.actionGet();
@@ -373,7 +375,7 @@ public void testServerNameAttributes() {
373375
int numOfConnections = randomIntBetween(4, 8);
374376
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
375377
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
376-
numOfConnections, address, "localhost")) {
378+
Settings.EMPTY, numOfConnections, address, "localhost")) {
377379
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
378380

379381
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();

server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private static class FakeConnectionStrategy extends RemoteConnectionStrategy {
9191

9292
FakeConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
9393
RemoteConnectionStrategy.ConnectionStrategy strategy) {
94-
super(clusterAlias, transportService, connectionManager);
94+
super(clusterAlias, transportService, connectionManager, Settings.EMPTY);
9595
this.strategy = strategy;
9696
}
9797

0 commit comments

Comments
 (0)