Skip to content

Commit 49825cf

Browse files
authored
Close connection manager on current thread in RemoteClusterConnection (#44805)
The problem is that RemoteClusterConnection closes the connection manager asynchronously, which races with the threadpool being shutdown at the end of the test. Closes #44339 Closes #44610
1 parent e5cc3eb commit 49825cf

File tree

4 files changed

+24
-29
lines changed

4 files changed

+24
-29
lines changed

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ public class ConnectionManager implements Closeable {
5858
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
5959
@Override
6060
protected void closeInternal() {
61+
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
62+
while (iterator.hasNext()) {
63+
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
64+
try {
65+
IOUtils.closeWhileHandlingException(next.getValue());
66+
} finally {
67+
iterator.remove();
68+
}
69+
}
6170
closeLatch.countDown();
6271
}
6372
};
@@ -249,22 +258,23 @@ public Set<DiscoveryNode> connectedNodes() {
249258

250259
@Override
251260
public void close() {
261+
internalClose(true);
262+
}
263+
264+
public void closeNoBlock() {
265+
internalClose(false);
266+
}
267+
268+
private void internalClose(boolean waitForPendingConnections) {
252269
assert Transports.assertNotTransportThread("Closing ConnectionManager");
253270
if (closing.compareAndSet(false, true)) {
254271
connectingRefCounter.decRef();
255-
try {
256-
closeLatch.await();
257-
} catch (InterruptedException e) {
258-
Thread.currentThread().interrupt();
259-
throw new IllegalStateException(e);
260-
}
261-
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
262-
while (iterator.hasNext()) {
263-
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
272+
if (waitForPendingConnections) {
264273
try {
265-
IOUtils.closeWhileHandlingException(next.getValue());
266-
} finally {
267-
iterator.remove();
274+
closeLatch.await();
275+
} catch (InterruptedException e) {
276+
Thread.currentThread().interrupt();
277+
throw new IllegalStateException(e);
268278
}
269279
}
270280
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,7 @@ Transport.Connection getConnection() {
332332
@Override
333333
public void close() throws IOException {
334334
IOUtils.close(connectHandler);
335-
// In the ConnectionManager we wait on connections being closed.
336-
threadPool.generic().execute(connectionManager::close);
335+
connectionManager.closeNoBlock();
337336
}
338337

339338
public boolean isClosed() {

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import org.elasticsearch.test.transport.StubbableTransport;
6565
import org.elasticsearch.threadpool.TestThreadPool;
6666
import org.elasticsearch.threadpool.ThreadPool;
67-
import org.junit.Before;
6867

6968
import java.io.IOException;
7069
import java.net.InetAddress;
@@ -94,14 +93,14 @@
9493
import static java.util.Collections.emptySet;
9594
import static org.hamcrest.Matchers.allOf;
9695
import static org.hamcrest.Matchers.containsString;
96+
import static org.hamcrest.Matchers.endsWith;
9797
import static org.hamcrest.Matchers.equalTo;
9898
import static org.hamcrest.Matchers.instanceOf;
9999
import static org.hamcrest.Matchers.iterableWithSize;
100100
import static org.hamcrest.Matchers.not;
101101
import static org.hamcrest.Matchers.notNullValue;
102102
import static org.hamcrest.Matchers.sameInstance;
103103
import static org.hamcrest.Matchers.startsWith;
104-
import static org.hamcrest.Matchers.endsWith;
105104

106105
public class RemoteClusterConnectionTests extends ESTestCase {
107106

@@ -114,13 +113,6 @@ public void tearDown() throws Exception {
114113
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
115114
}
116115

117-
@Override
118-
@Before
119-
public void setUp() throws Exception {
120-
super.setUp();
121-
assumeFalse("https://github.com/elastic/elasticsearch/issues/44339", System.getProperty("os.name").contains("Win"));
122-
}
123-
124116
private MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes, Version version) {
125117
return startTransport(id, knownNodes, version, threadPool);
126118
}

x-pack/plugin/ccr/build.gradle

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import org.elasticsearch.gradle.OS
2-
31
evaluationDependsOn(xpackModule('core'))
42

53
apply plugin: 'elasticsearch.esplugin'
@@ -24,8 +22,6 @@ task internalClusterTestNoSecurityManager(type: Test) {
2422
include noSecurityManagerITClasses
2523
systemProperty 'es.set.netty.runtime.available.processors', 'false'
2624
systemProperty 'tests.security.manager', 'false'
27-
// Disable tests on windows https://github.com/elastic/elasticsearch/issues/44610
28-
onlyIf { OS.WINDOWS.equals(OS.current()) == false }
2925
}
3026

3127
// Instead we create a separate task to run the
@@ -38,8 +34,6 @@ task internalClusterTest(type: Test) {
3834
include '**/*IT.class'
3935
exclude noSecurityManagerITClasses
4036
systemProperty 'es.set.netty.runtime.available.processors', 'false'
41-
// Disable tests on windows https://github.com/elastic/elasticsearch/issues/44610
42-
onlyIf { OS.WINDOWS.equals(OS.current()) == false }
4337
}
4438

4539
check.dependsOn internalClusterTest

0 commit comments

Comments
 (0)