diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index f8ee85728ee29..e1d2fce51aafa 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -156,9 +156,9 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i final long requestId = message.getRequestId(); final StreamInput stream = message.getStreamInput(); final Version version = message.getVersion(); - messageListener.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { + messageListener.onRequestReceived(requestId, action); if (message.isHandshake()) { handshaker.handleHandshake(version, channel, requestId, stream); } else { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 0dcef3b7e2575..ad63ae518e767 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -61,8 +61,8 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -73,7 +73,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran public static final String DIRECT_RESPONSE_PROFILE = ".direct"; public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; - private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1); + private final AtomicBoolean handleIncomingRequests = new AtomicBoolean(); private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); protected final Transport transport; protected final ConnectionManager connectionManager; @@ -285,7 +285,7 @@ protected void doClose() throws IOException { * this method is called */ public final void acceptIncomingRequests() { - blockIncomingRequestsLatch.countDown(); + handleIncomingRequests.set(true); } public TransportInfo info() { @@ -877,11 +877,8 @@ public void registerRequestHandler(String act */ @Override public void onRequestReceived(long requestId, String action) { - try { - blockIncomingRequestsLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("interrupted while waiting for incoming requests block to be removed"); + if (handleIncomingRequests.get() == false) { + throw new IllegalStateException("transport not ready yet to handle incoming requests"); } if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 492de65de8a3c..df84e977976e8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1995,8 +1995,8 @@ public void onRequestReceived(long requestId, String action) { DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); ConnectTransportException exception = expectThrows(ConnectTransportException.class, () -> serviceA.connectToNode(node)); - assertTrue(exception.getCause() instanceof TransportException); - assertEquals("handshake failed because connection reset", exception.getCause().getMessage()); + assertThat(exception.getCause(), instanceOf(IllegalStateException.class)); + assertEquals("handshake failed", exception.getCause().getMessage()); } ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);