Skip to content

Do not block transport thread on startup #44939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
final long requestId = message.getRequestId();
final StreamInput stream = message.getStreamInput();
final Version version = message.getVersion();
messageListener.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
messageListener.onRequestReceived(requestId, action);
if (message.isHandshake()) {
handshaker.handleHandshake(version, channel, requestId, stream);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand All @@ -73,7 +73,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
public static final String DIRECT_RESPONSE_PROFILE = ".direct";
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";

private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1);
private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
protected final Transport transport;
protected final ConnectionManager connectionManager;
Expand Down Expand Up @@ -285,7 +285,7 @@ protected void doClose() throws IOException {
* this method is called
*/
public final void acceptIncomingRequests() {
blockIncomingRequestsLatch.countDown();
handleIncomingRequests.set(true);
}

public TransportInfo info() {
Expand Down Expand Up @@ -877,11 +877,8 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
*/
@Override
public void onRequestReceived(long requestId, String action) {
try {
blockIncomingRequestsLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("interrupted while waiting for incoming requests block to be removed");
if (handleIncomingRequests.get() == false) {
throw new IllegalStateException("transport not ready yet to handle incoming requests");
}
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] received request", requestId, action);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1995,8 +1995,8 @@ public void onRequestReceived(long requestId, String action) {
DiscoveryNode node =
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
ConnectTransportException exception = expectThrows(ConnectTransportException.class, () -> serviceA.connectToNode(node));
assertTrue(exception.getCause() instanceof TransportException);
assertEquals("handshake failed because connection reset", exception.getCause().getMessage());
assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
assertEquals("handshake failed", exception.getCause().getMessage());
}

ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
Expand Down