Skip to content

Commit 987ddcb

Browse files
Fix potential race during TcpTransport close (#39031)
Fixed two potential causes for leaked threads during tests: 1. When adding a channel to serverChannels, we add it under a monitor that we do not use when reading from it. This is potentially unsafe if there is no other happens-before relationship ensuring the safety of this. 2. Long-shot but if the thread pool was shutdown before entering this code, we would silently forget about closing server channels so added assert. Strengthened the locking to ensure that once we stop the transport, no new server channels can be made. Relates to CI failure issue: #37543
1 parent 62ed1aa commit 987ddcb

File tree

1 file changed

+19
-18
lines changed

1 file changed

+19
-18
lines changed

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

+19-18
Original file line numberDiff line numberDiff line change
@@ -388,28 +388,28 @@ private InetSocketAddress bindToPort(final String name, final InetAddress hostAd
388388
PortsRange portsRange = new PortsRange(port);
389389
final AtomicReference<Exception> lastException = new AtomicReference<>();
390390
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
391-
boolean success = portsRange.iterate(portNumber -> {
392-
try {
393-
TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
394-
synchronized (serverChannels) {
395-
List<TcpServerChannel> list = serverChannels.get(name);
396-
if (list == null) {
397-
list = new ArrayList<>();
398-
serverChannels.put(name, list);
399-
}
400-
list.add(channel);
391+
closeLock.writeLock().lock();
392+
try {
393+
if (lifecycle.initialized() == false && lifecycle.started() == false) {
394+
throw new IllegalStateException("transport has been stopped");
395+
}
396+
boolean success = portsRange.iterate(portNumber -> {
397+
try {
398+
TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
399+
serverChannels.computeIfAbsent(name, k -> new ArrayList<>()).add(channel);
401400
boundSocket.set(channel.getLocalAddress());
401+
} catch (Exception e) {
402+
lastException.set(e);
403+
return false;
402404
}
403-
} catch (Exception e) {
404-
lastException.set(e);
405-
return false;
405+
return true;
406+
});
407+
if (!success) {
408+
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
406409
}
407-
return true;
408-
});
409-
if (!success) {
410-
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
410+
} finally {
411+
closeLock.writeLock().unlock();
411412
}
412-
413413
if (logger.isDebugEnabled()) {
414414
logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
415415
}
@@ -553,6 +553,7 @@ protected final void doClose() {
553553
protected final void doStop() {
554554
final CountDownLatch latch = new CountDownLatch(1);
555555
// make sure we run it on another thread than a possible IO handler thread
556+
assert threadPool.generic().isShutdown() == false : "Must stop transport before terminating underlying threadpool";
556557
threadPool.generic().execute(() -> {
557558
closeLock.writeLock().lock();
558559
try {

0 commit comments

Comments
 (0)