Skip to content

Commit a5e1f9f

Browse files
committed
Reduce channels in AbstractSimpleTransportTestCase (elastic#34863) (elastic#34880)
This is related to elastic#30876. The AbstractSimpleTransportTestCase initiates many tcp connections. There are normally over 1,000 connections in TIME_WAIT at the end of the test. This is because every test opens at least two different transports that connect to each other with 13 channel connection profiles. This commit modifies the default connection profile used by this test to 6. One connection for each type, except for REG which gets 2 connections.
1 parent 2ae0d1b commit a5e1f9f

File tree

4 files changed

+73
-7
lines changed

4 files changed

+73
-7
lines changed

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ protected Version getCurrentVersion() {
7373
}
7474
};
7575
MockTransportService mockTransportService =
76-
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
76+
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
7777
mockTransportService.start();
7878
return mockTransportService;
7979
}

server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.common.Nullable;
23+
import org.elasticsearch.common.settings.Settings;
2224
import org.elasticsearch.common.unit.TimeValue;
2325

2426
import java.util.ArrayList;
2527
import java.util.Arrays;
2628
import java.util.Collections;
2729
import java.util.EnumSet;
2830
import java.util.List;
31+
import java.util.Objects;
2932
import java.util.Set;
3033
import java.util.concurrent.atomic.AtomicInteger;
3134

@@ -70,6 +73,55 @@ private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections
7073
}
7174

7275
/**
76+
<<<<<<< HEAD
77+
=======
78+
* takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile
79+
*/
80+
public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) {
81+
Objects.requireNonNull(fallbackProfile);
82+
if (profile == null) {
83+
return fallbackProfile;
84+
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) {
85+
return profile;
86+
} else {
87+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
88+
if (profile.getConnectTimeout() == null) {
89+
builder.setConnectTimeout(fallbackProfile.getConnectTimeout());
90+
}
91+
if (profile.getHandshakeTimeout() == null) {
92+
builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout());
93+
}
94+
return builder.build();
95+
}
96+
}
97+
98+
/**
99+
* Builds a default connection profile based on the provided settings.
100+
*
101+
* @param settings to build the connection profile from
102+
* @return the connection profile
103+
*/
104+
public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
105+
int connectionsPerNodeRecovery = TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
106+
int connectionsPerNodeBulk = TcpTransport.CONNECTIONS_PER_NODE_BULK.get(settings);
107+
int connectionsPerNodeReg = TcpTransport.CONNECTIONS_PER_NODE_REG.get(settings);
108+
int connectionsPerNodeState = TcpTransport.CONNECTIONS_PER_NODE_STATE.get(settings);
109+
int connectionsPerNodePing = TcpTransport.CONNECTIONS_PER_NODE_PING.get(settings);
110+
Builder builder = new Builder();
111+
builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
112+
builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
113+
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
114+
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
115+
// if we are not master eligible we don't need a dedicated channel to publish the state
116+
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
117+
// if we are not a data-node we don't need any dedicated channels for recovery
118+
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
119+
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
120+
return builder.build();
121+
}
122+
123+
/**
124+
>>>>>>> 419ad4569dc... Reduce channels in AbstractSimpleTransportTestCase (#34863) (#34880)
73125
* A builder to build a new {@link ConnectionProfile}
74126
*/
75127
public static class Builder {

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
110110
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);
111111

112112
protected int channelsPerNodeConnection() {
113-
return 13;
113+
// This is a customized profile for this test case.
114+
return 6;
114115
}
115116

116117
@Override
@@ -119,9 +120,17 @@ public void setUp() throws Exception {
119120
super.setUp();
120121
threadPool = new TestThreadPool(getClass().getName());
121122
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
122-
serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates
123+
Settings connectionSettings = Settings.builder()
124+
.put(TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1)
125+
.put(TcpTransport.CONNECTIONS_PER_NODE_BULK.getKey(), 1)
126+
.put(TcpTransport.CONNECTIONS_PER_NODE_REG.getKey(), 2)
127+
.put(TcpTransport.CONNECTIONS_PER_NODE_STATE.getKey(), 1)
128+
.put(TcpTransport.CONNECTIONS_PER_NODE_PING.getKey(), 1)
129+
.build();
130+
131+
serviceA = buildService("TS_A", version0, clusterSettings, connectionSettings); // this one supports dynamic tracer updates
123132
nodeA = serviceA.getLocalNode();
124-
serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates
133+
serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates
125134
nodeB = serviceB.getLocalNode();
126135
// wait till all nodes are properly connected and the event has been sent, so tests in this class
127136
// will not get this callback called on the connections done in this setup
@@ -167,8 +176,13 @@ private MockTransportService buildService(final String name, final Version versi
167176
return service;
168177
}
169178

170-
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
171-
return buildService(name, version, clusterSettings, Settings.EMPTY, true, true);
179+
protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
180+
return buildService(name, version, clusterSettings, Settings.EMPTY);
181+
}
182+
183+
protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
184+
Settings settings) {
185+
return buildService(name, version, clusterSettings, settings, true, true);
172186
}
173187

174188
@Override

test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ protected Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, T
5050
}
5151
};
5252
MockTransportService mockTransportService =
53-
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
53+
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
5454
mockTransportService.start();
5555
return mockTransportService;
5656
}

0 commit comments

Comments
 (0)