Skip to content

Commit 5216bd2

Browse files
authored
Retry follow task when remote connection queue full (#55314)
If more than 100 shard-follow tasks are trying to connect to the remote cluster, then some of them will abort with "connect listener queue is full". This is because we retry on ESRejectedExecutionException, but not on RejectedExecutionException.
1 parent 1bb9283 commit 5216bd2

File tree

10 files changed

+72
-46
lines changed

10 files changed

+72
-46
lines changed

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,27 +98,28 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
9898
clusterAlias,
9999
transportService,
100100
connectionManager,
101+
settings,
101102
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
102103
PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings),
103104
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
104105
}
105106

106107
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
107-
int maxNumConnections, String configuredAddress) {
108-
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
108+
Settings settings, int maxNumConnections, String configuredAddress) {
109+
this(clusterAlias, transportService, connectionManager, settings, maxNumConnections, configuredAddress,
109110
() -> resolveAddress(configuredAddress), null);
110111
}
111112

112113
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
113-
int maxNumConnections, String configuredAddress, String configuredServerName) {
114-
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
114+
Settings settings, int maxNumConnections, String configuredAddress, String configuredServerName) {
115+
this(clusterAlias, transportService, connectionManager, settings, maxNumConnections, configuredAddress,
115116
() -> resolveAddress(configuredAddress), configuredServerName);
116117
}
117118

118119
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
119-
int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
120+
Settings settings, int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
120121
String configuredServerName) {
121-
super(clusterAlias, transportService, connectionManager);
122+
super(clusterAlias, transportService, connectionManager, settings);
122123
this.maxNumConnections = maxNumConnections;
123124
this.configuredAddress = configuredAddress;
124125
this.configuredServerName = configuredServerName;

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.settings.Settings;
3333
import org.elasticsearch.common.unit.TimeValue;
3434
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
35+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3536
import org.elasticsearch.threadpool.ThreadPool;
3637

3738
import java.io.Closeable;
@@ -48,7 +49,6 @@
4849
import java.util.Objects;
4950
import java.util.Set;
5051
import java.util.concurrent.ExecutorService;
51-
import java.util.concurrent.RejectedExecutionException;
5252
import java.util.concurrent.atomic.AtomicBoolean;
5353
import java.util.function.Consumer;
5454
import java.util.function.Supplier;
@@ -105,10 +105,14 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
105105
Setting.Property.NodeScope,
106106
Setting.Property.Dynamic));
107107

108+
// this setting is intentionally not registered, it is only used in tests
109+
public static final Setting<Integer> REMOTE_MAX_PENDING_CONNECTION_LISTENERS =
110+
Setting.intSetting("cluster.remote.max_pending_connection_listeners", 1000, Setting.Property.NodeScope);
111+
112+
private final int maxPendingConnectionListeners;
108113

109114
private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class);
110115

111-
private static final int MAX_LISTENERS = 100;
112116
private final AtomicBoolean closed = new AtomicBoolean(false);
113117
private final Object mutex = new Object();
114118
private List<ActionListener<Void>> listeners = new ArrayList<>();
@@ -117,10 +121,12 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
117121
protected final RemoteConnectionManager connectionManager;
118122
protected final String clusterAlias;
119123

120-
RemoteConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager) {
124+
RemoteConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
125+
Settings settings) {
121126
this.clusterAlias = clusterAlias;
122127
this.transportService = transportService;
123128
this.connectionManager = connectionManager;
129+
this.maxPendingConnectionListeners = REMOTE_MAX_PENDING_CONNECTION_LISTENERS.get(settings);
124130
connectionManager.addListener(this);
125131
}
126132

@@ -237,9 +243,9 @@ void connect(ActionListener<Void> connectListener) {
237243
if (closed) {
238244
assert listeners.isEmpty();
239245
} else {
240-
if (listeners.size() >= MAX_LISTENERS) {
241-
assert listeners.size() == MAX_LISTENERS;
242-
listener.onFailure(new RejectedExecutionException("connect listener queue is full"));
246+
if (listeners.size() >= maxPendingConnectionListeners) {
247+
assert listeners.size() == maxPendingConnectionListeners;
248+
listener.onFailure(new EsRejectedExecutionException("connect listener queue is full"));
243249
return;
244250
} else {
245251
listeners.add(listener);

server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,23 +151,24 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
151151
transportService,
152152
connectionManager,
153153
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings),
154+
settings,
154155
REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
155156
getNodePredicate(settings),
156157
REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings));
157158
}
158159

159160
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
160-
String proxyAddress, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
161+
String proxyAddress, Settings settings, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
161162
List<String> configuredSeedNodes) {
162-
this(clusterAlias, transportService, connectionManager, proxyAddress, maxNumRemoteConnections, nodePredicate, configuredSeedNodes,
163-
configuredSeedNodes.stream().map(seedAddress ->
163+
this(clusterAlias, transportService, connectionManager, proxyAddress, settings, maxNumRemoteConnections, nodePredicate,
164+
configuredSeedNodes, configuredSeedNodes.stream().map(seedAddress ->
164165
(Supplier<DiscoveryNode>) () -> resolveSeedNode(clusterAlias, seedAddress, proxyAddress)).collect(Collectors.toList()));
165166
}
166167

167168
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
168-
String proxyAddress, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
169+
String proxyAddress, Settings settings, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
169170
List<String> configuredSeedNodes, List<Supplier<DiscoveryNode>> seedNodes) {
170-
super(clusterAlias, transportService, connectionManager);
171+
super(clusterAlias, transportService, connectionManager, settings);
171172
this.proxyAddress = proxyAddress;
172173
this.maxNumRemoteConnections = maxNumRemoteConnections;
173174
this.nodePredicate = nodePredicate;

server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddress() {
9494
int numOfConnections = randomIntBetween(4, 8);
9595
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
9696
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
97-
numOfConnections, address1.toString())) {
97+
Settings.EMPTY, numOfConnections, address1.toString())) {
9898
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
9999

100100
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
@@ -126,7 +126,8 @@ public void testProxyStrategyWillOpenNewConnectionsOnDisconnect() throws Excepti
126126

127127
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
128128
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
129-
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
129+
Settings.EMPTY, numOfConnections, address1.toString(),
130+
alternatingResolver(address1, address2, useAddress1), null)) {
130131
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
131132
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
132133

@@ -173,7 +174,7 @@ public void testConnectFailsWithIncompatibleNodes() {
173174
int numOfConnections = randomIntBetween(4, 8);
174175
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
175176
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
176-
numOfConnections, address1.toString())) {
177+
Settings.EMPTY, numOfConnections, address1.toString())) {
177178

178179
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
179180
strategy.connect(connectFuture);
@@ -206,7 +207,8 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro
206207

207208
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
208209
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
209-
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
210+
Settings.EMPTY, numOfConnections, address1.toString(),
211+
alternatingResolver(address1, address2, useAddress1), null)) {
210212
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
211213
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
212214

@@ -255,7 +257,7 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception
255257
int numOfConnections = randomIntBetween(4, 8);
256258
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
257259
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
258-
numOfConnections, address.toString(), addressSupplier, null)) {
260+
Settings.EMPTY, numOfConnections, address.toString(), addressSupplier, null)) {
259261
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
260262
strategy.connect(connectFuture);
261263
connectFuture.actionGet();
@@ -280,7 +282,7 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesOrServe
280282
int numOfConnections = randomIntBetween(4, 8);
281283
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
282284
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
283-
numOfConnections, remoteAddress.toString(), "server-name")) {
285+
Settings.EMPTY, numOfConnections, remoteAddress.toString(), "server-name")) {
284286
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
285287
strategy.connect(connectFuture);
286288
connectFuture.actionGet();
@@ -373,7 +375,7 @@ public void testServerNameAttributes() {
373375
int numOfConnections = randomIntBetween(4, 8);
374376
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
375377
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
376-
numOfConnections, address, "localhost")) {
378+
Settings.EMPTY, numOfConnections, address, "localhost")) {
377379
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
378380

379381
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();

server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private static class FakeConnectionStrategy extends RemoteConnectionStrategy {
9191

9292
FakeConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
9393
RemoteConnectionStrategy.ConnectionStrategy strategy) {
94-
super(clusterAlias, transportService, connectionManager);
94+
super(clusterAlias, transportService, connectionManager, Settings.EMPTY);
9595
this.strategy = strategy;
9696
}
9797

0 commit comments

Comments
 (0)