Skip to content

Commit bf07aee

Browse files
committed
Retry follow task when remote connection queue full (elastic#55314)
If more than 100 shard-follow tasks are trying to connect to the remote cluster, then some of them will abort with "connect listener queue is full". This is because we retry on ESRejectedExecutionException, but not on RejectedExecutionException.
1 parent 66f0e17 commit bf07aee

File tree

5 files changed

+30
-4
lines changed

5 files changed

+30
-4
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.cluster.node.DiscoveryNodes;
4242
import org.elasticsearch.common.collect.Tuple;
4343
import org.elasticsearch.common.io.stream.StreamInput;
44+
import org.elasticsearch.common.settings.Setting;
4445
import org.elasticsearch.common.settings.Settings;
4546
import org.elasticsearch.common.transport.TransportAddress;
4647
import org.elasticsearch.common.unit.TimeValue;
@@ -86,7 +87,7 @@
8687
* {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of
8788
* connections per cluster has been reached.
8889
*/
89-
final class RemoteClusterConnection implements TransportConnectionListener, Closeable {
90+
public final class RemoteClusterConnection implements TransportConnectionListener, Closeable {
9091

9192
private static final Logger logger = LogManager.getLogger(RemoteClusterConnection.class);
9293

@@ -102,8 +103,14 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
102103
private volatile boolean skipUnavailable;
103104
private final ConnectHandler connectHandler;
104105
private final TimeValue initialConnectionTimeout;
106+
private final int maxPendingConnectionListeners;
107+
105108
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
106109

110+
// this setting is intentionally not registered, it is only used in tests
111+
public static final Setting<Integer> REMOTE_MAX_PENDING_CONNECTION_LISTENERS =
112+
Setting.intSetting("cluster.remote.max_pending_connection_listeners", 1000, Setting.Property.NodeScope);
113+
107114
/**
108115
* Creates a new {@link RemoteClusterConnection}
109116
* @param settings the nodes settings object
@@ -142,6 +149,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
142149
connectionManager.addListener(transportService);
143150
this.proxyAddress = proxyAddress;
144151
initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
152+
this.maxPendingConnectionListeners = REMOTE_MAX_PENDING_CONNECTION_LISTENERS.get(settings);
145153
}
146154

147155

@@ -393,14 +401,14 @@ public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
393401
* There is at most one connect job running at any time. If such a connect job is triggered
394402
* while another job is running the provided listeners are queued and batched up until the current running job returns.
395403
*
396-
* The handler has a built-in queue that can hold up to 100 connect attempts and will reject requests once the queue is full.
404+
* The handler has a built-in queue that can hold up to 1000 connect attempts and will reject requests once the queue is full.
397405
* In a scenario when a remote cluster becomes unavailable we will queue requests up but if we can't connect quick enough
398406
* we will just reject the connect trigger which will lead to failing searches.
399407
*/
400408
private class ConnectHandler implements Closeable {
401409
private final Semaphore running = new Semaphore(1);
402410
private final AtomicBoolean closed = new AtomicBoolean(false);
403-
private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(100);
411+
private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(maxPendingConnectionListeners);
404412
private final CancellableThreads cancellableThreads = new CancellableThreads();
405413

406414
/**

test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.index.IndexService;
2727
import org.elasticsearch.index.IndexSettings;
2828
import org.elasticsearch.plugins.Plugin;
29+
import org.elasticsearch.transport.RemoteClusterConnection;
2930

3031
import java.util.Arrays;
3132
import java.util.List;
@@ -50,6 +51,7 @@ public List<Setting<?>> getSettings() {
5051
INDEX_CREATION_DATE_SETTING,
5152
PROVIDED_NAME_SETTING,
5253
TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
54+
RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS,
5355
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
5456
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING,
5557
IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.elasticsearch.test.TestCluster;
7777
import org.elasticsearch.test.discovery.TestZenDiscovery;
7878
import org.elasticsearch.test.transport.MockTransportService;
79+
import org.elasticsearch.transport.RemoteConnectionStrategy;
7980
import org.elasticsearch.transport.TransportService;
8081
import org.elasticsearch.xpack.ccr.CcrSettings;
8182
import org.elasticsearch.xpack.ccr.LocalStateCcr;
@@ -139,7 +140,11 @@ protected Settings leaderClusterSettings() {
139140
}
140141

141142
protected Settings followerClusterSettings() {
142-
return Settings.EMPTY;
143+
final Settings.Builder builder = Settings.builder();
144+
if (randomBoolean()) {
145+
builder.put(RemoteConnectionStrategy.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), randomIntBetween(1, 100));
146+
}
147+
return builder.build();
143148
}
144149

145150
@Before

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
117117
@Override
118118
protected Settings followerClusterSettings() {
119119
return Settings.builder()
120+
.put(super.followerClusterSettings())
120121
.put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
121122
.build();
122123
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java

+10
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.xcontent.XContentType;
1414
import org.elasticsearch.index.IndexSettings;
1515
import org.elasticsearch.transport.RemoteConnectionInfo;
16+
import org.elasticsearch.transport.RemoteConnectionStrategy;
1617
import org.elasticsearch.transport.TransportService;
1718
import org.elasticsearch.xpack.CcrIntegTestCase;
1819
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
@@ -37,6 +38,15 @@ protected boolean configureRemoteClusterViaNodeSettings() {
3738
return false;
3839
}
3940

41+
@Override
42+
protected Settings followerClusterSettings() {
43+
final Settings.Builder settings = Settings.builder().put(super.followerClusterSettings());
44+
if (randomBoolean()) {
45+
settings.put(RemoteConnectionStrategy.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), 1);
46+
}
47+
return settings.build();
48+
}
49+
4050
public void testFollowIndex() throws Exception {
4151
final String leaderIndexSettings = getIndexSettings(1, 0,
4252
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));

0 commit comments

Comments
 (0)