Skip to content

Commit 01d2339

Browse files
committed
Invoke response handler on failure to send (#53631)
Today it can happen that a transport message fails to send (for example, because a transport interceptor rejects the request). In this case, the response handler is never invoked, which can lead to necessary cleanups not being performed. There are two ways to handle this. One is to expect every callsite that sends a message to try/catch these exceptions and handle them appropriately. The other is merely to invoke the response handler to handle the exception, which is already equipped to handle transport exceptions.
1 parent 881d0bf commit 01d2339

File tree

3 files changed

+141
-26
lines changed

3 files changed

+141
-26
lines changed

server/src/main/java/org/elasticsearch/transport/TransportService.java

+34-22
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.elasticsearch.core.internal.io.IOUtils;
5050
import org.elasticsearch.node.NodeClosedException;
5151
import org.elasticsearch.tasks.Task;
52-
import org.elasticsearch.tasks.TaskCancelledException;
5352
import org.elasticsearch.tasks.TaskManager;
5453
import org.elasticsearch.threadpool.Scheduler;
5554
import org.elasticsearch.threadpool.ThreadPool;
@@ -577,37 +576,57 @@ public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryN
577576
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
578577
final TransportRequest request,
579578
final TransportResponseHandler<T> handler) {
579+
final Transport.Connection connection;
580580
try {
581-
Transport.Connection connection = getConnection(node);
582-
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
583-
} catch (NodeNotConnectedException ex) {
581+
connection = getConnection(node);
582+
} catch (final NodeNotConnectedException ex) {
584583
// the caller might not handle this so we invoke the handler
585584
handler.handleException(ex);
585+
return;
586586
}
587+
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
587588
}
588589

589590
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
590591
final TransportRequest request,
591592
final TransportRequestOptions options,
592593
TransportResponseHandler<T> handler) {
594+
final Transport.Connection connection;
593595
try {
594-
Transport.Connection connection = getConnection(node);
595-
sendRequest(connection, action, request, options, handler);
596-
} catch (NodeNotConnectedException ex) {
596+
connection = getConnection(node);
597+
} catch (final NodeNotConnectedException ex) {
597598
// the caller might not handle this so we invoke the handler
598599
handler.handleException(ex);
600+
return;
599601
}
602+
sendRequest(connection, action, request, options, handler);
600603
}
601604

605+
/**
606+
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
607+
*
608+
* @param connection the connection to send the request on
609+
* @param action the name of the action
610+
* @param request the request
611+
* @param options the options for this request
612+
* @param handler the response handler
613+
* @param <T> the type of the transport response
614+
*/
602615
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
603616
final TransportRequest request,
604617
final TransportRequestOptions options,
605618
TransportResponseHandler<T> handler) {
606619
try {
607620
asyncSender.sendRequest(connection, action, request, options, handler);
608-
} catch (NodeNotConnectedException ex) {
621+
} catch (final Exception ex) {
609622
// the caller might not handle this so we invoke the handler
610-
handler.handleException(ex);
623+
final TransportException te;
624+
if (ex instanceof TransportException) {
625+
te = (TransportException) ex;
626+
} else {
627+
te = new TransportException("failure to send", ex);
628+
}
629+
handler.handleException(te);
611630
}
612631
}
613632

@@ -627,13 +646,15 @@ public final <T extends TransportResponse> void sendChildRequest(final Discovery
627646
final TransportRequest request, final Task parentTask,
628647
final TransportRequestOptions options,
629648
final TransportResponseHandler<T> handler) {
649+
final Transport.Connection connection;
630650
try {
631-
Transport.Connection connection = getConnection(node);
632-
sendChildRequest(connection, action, request, parentTask, options, handler);
633-
} catch (NodeNotConnectedException ex) {
651+
connection = getConnection(node);
652+
} catch (final NodeNotConnectedException ex) {
634653
// the caller might not handle this so we invoke the handler
635654
handler.handleException(ex);
655+
return;
636656
}
657+
sendChildRequest(connection, action, request, parentTask, options, handler);
637658
}
638659

639660
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
@@ -647,16 +668,7 @@ public <T extends TransportResponse> void sendChildRequest(final Transport.Conne
647668
final TransportRequestOptions options,
648669
final TransportResponseHandler<T> handler) {
649670
request.setParentTask(localNode.getId(), parentTask.getId());
650-
try {
651-
sendRequest(connection, action, request, options, handler);
652-
} catch (TaskCancelledException ex) {
653-
// The parent task is already cancelled - just fail the request
654-
handler.handleException(new TransportException(ex));
655-
} catch (NodeNotConnectedException ex) {
656-
// the caller might not handle this so we invoke the handler
657-
handler.handleException(ex);
658-
}
659-
671+
sendRequest(connection, action, request, options, handler);
660672
}
661673

662674
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,13 @@ public static MockNioTransport newMockTransport(Settings settings, Version versi
120120

121121
public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
122122
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
123-
return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
123+
return createNewService(settings, transport, version, threadPool, clusterSettings, taskHeaders, NOOP_TRANSPORT_INTERCEPTOR);
124+
}
125+
126+
public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
127+
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders,
128+
TransportInterceptor interceptor) {
129+
return new MockTransportService(settings, transport, threadPool, interceptor,
124130
boundAddress ->
125131
new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(),
126132
Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version),

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

+100-3
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,15 @@
9393

9494
import static java.util.Collections.emptyMap;
9595
import static java.util.Collections.emptySet;
96-
import static org.elasticsearch.test.ESTestCase.getPortRange;
96+
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
9797
import static org.hamcrest.Matchers.containsString;
9898
import static org.hamcrest.Matchers.empty;
9999
import static org.hamcrest.Matchers.equalTo;
100100
import static org.hamcrest.Matchers.hasToString;
101101
import static org.hamcrest.Matchers.instanceOf;
102+
import static org.hamcrest.Matchers.not;
102103
import static org.hamcrest.Matchers.notNullValue;
104+
import static org.hamcrest.Matchers.nullValue;
103105
import static org.hamcrest.Matchers.startsWith;
104106

105107
public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
@@ -187,7 +189,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
187189
}
188190

189191
private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings,
190-
Settings settings, boolean acceptRequests, boolean doHandshake) {
192+
Settings settings, boolean acceptRequests, boolean doHandshake,
193+
TransportInterceptor interceptor) {
191194
Settings updatedSettings = Settings.builder()
192195
.put(TransportSettings.PORT.getKey(), getPortRange())
193196
.put(settings)
@@ -198,14 +201,19 @@ private MockTransportService buildService(final String name, final Version versi
198201
}
199202
Transport transport = build(updatedSettings, version, clusterSettings, doHandshake);
200203
MockTransportService service = MockTransportService.createNewService(updatedSettings, transport, version, threadPool,
201-
clusterSettings, Collections.emptySet());
204+
clusterSettings, Collections.emptySet(), interceptor);
202205
service.start();
203206
if (acceptRequests) {
204207
service.acceptIncomingRequests();
205208
}
206209
return service;
207210
}
208211

212+
private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings,
213+
Settings settings, boolean acceptRequests, boolean doHandshake) {
214+
return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR);
215+
}
216+
209217
protected MockTransportService buildService(final String name, final Version version, Settings settings) {
210218
return buildService(name, version, null, settings);
211219
}
@@ -2747,6 +2755,95 @@ public void onConnectionClosed(Transport.Connection connection) {
27472755
}
27482756
}
27492757

2758+
// test that the response handler is invoked on a failure to send
2759+
public void testFailToSend() throws InterruptedException {
2760+
final RuntimeException failToSendException;
2761+
if (randomBoolean()) {
2762+
failToSendException = new IllegalStateException("fail to send");
2763+
} else {
2764+
failToSendException = new TransportException("fail to send");
2765+
}
2766+
final TransportInterceptor interceptor = new TransportInterceptor() {
2767+
@Override
2768+
public AsyncSender interceptSender(final AsyncSender sender) {
2769+
return new AsyncSender() {
2770+
@Override
2771+
public <T extends TransportResponse> void sendRequest(
2772+
final Transport.Connection connection,
2773+
final String action,
2774+
final TransportRequest request,
2775+
final TransportRequestOptions options,
2776+
final TransportResponseHandler<T> handler) {
2777+
if ("fail-to-send-action".equals(action)) {
2778+
throw failToSendException;
2779+
} else {
2780+
sender.sendRequest(connection, action, request, options, handler);
2781+
}
2782+
}
2783+
};
2784+
}
2785+
};
2786+
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) {
2787+
serviceC.start();
2788+
serviceC.acceptIncomingRequests();
2789+
final CountDownLatch latch = new CountDownLatch(1);
2790+
serviceC.connectToNode(
2791+
serviceA.getLocalDiscoNode(),
2792+
ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY),
2793+
new ActionListener<>() {
2794+
@Override
2795+
public void onResponse(final Void v) {
2796+
latch.countDown();
2797+
}
2798+
2799+
@Override
2800+
public void onFailure(final Exception e) {
2801+
fail(e.getMessage());
2802+
}
2803+
});
2804+
latch.await();
2805+
final AtomicReference<TransportException> te = new AtomicReference<>();
2806+
final Transport.Connection connection = serviceC.getConnection(nodeA);
2807+
serviceC.sendRequest(
2808+
connection,
2809+
"fail-to-send-action",
2810+
TransportRequest.Empty.INSTANCE,
2811+
TransportRequestOptions.EMPTY,
2812+
new TransportResponseHandler<TransportResponse>() {
2813+
@Override
2814+
public void handleResponse(final TransportResponse response) {
2815+
fail("handle response should not be invoked");
2816+
}
2817+
2818+
@Override
2819+
public void handleException(final TransportException exp) {
2820+
te.set(exp);
2821+
}
2822+
2823+
@Override
2824+
public String executor() {
2825+
return ThreadPool.Names.SAME;
2826+
}
2827+
2828+
@Override
2829+
public TransportResponse read(final StreamInput in) {
2830+
return TransportResponse.Empty.INSTANCE;
2831+
}
2832+
});
2833+
assertThat(te.get(), not(nullValue()));
2834+
2835+
if (failToSendException instanceof IllegalStateException) {
2836+
assertThat(te.get().getMessage(), equalTo("failure to send"));
2837+
assertThat(te.get().getCause(), instanceOf(IllegalStateException.class));
2838+
assertThat(te.get().getCause().getMessage(), equalTo("fail to send"));
2839+
} else {
2840+
assertThat(te.get().getMessage(), equalTo("fail to send"));
2841+
assertThat(te.get().getCause(), nullValue());
2842+
}
2843+
}
2844+
2845+
}
2846+
27502847
private void closeConnectionChannel(Transport.Connection connection) {
27512848
StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection;
27522849
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection();

0 commit comments

Comments
 (0)