Skip to content

Commit 41e3b4a

Browse files
authored
Invoke response handler on failure to send (#53631)
Today it can happen that a transport message fails to send (for example, because a transport interceptor rejects the request). In this case, the response handler is never invoked, which can lead to necessary cleanups not being performed. There are two ways to handle this. One is to expect every callsite that sends a message to try/catch these exceptions and handle them appropriately. The other is merely to invoke the response handler to handle the exception, which is already equipped to handle transport exceptions.
1 parent 87dc720 commit 41e3b4a

File tree

3 files changed

+141
-25
lines changed

3 files changed

+141
-25
lines changed

server/src/main/java/org/elasticsearch/transport/TransportService.java

+34-22
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.elasticsearch.core.internal.io.IOUtils;
4747
import org.elasticsearch.node.NodeClosedException;
4848
import org.elasticsearch.tasks.Task;
49-
import org.elasticsearch.tasks.TaskCancelledException;
5049
import org.elasticsearch.tasks.TaskManager;
5150
import org.elasticsearch.threadpool.Scheduler;
5251
import org.elasticsearch.threadpool.ThreadPool;
@@ -519,37 +518,57 @@ public void removeConnectionListener(TransportConnectionListener listener) {
519518
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
520519
final TransportRequest request,
521520
final TransportResponseHandler<T> handler) {
521+
final Transport.Connection connection;
522522
try {
523-
Transport.Connection connection = getConnection(node);
524-
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
525-
} catch (NodeNotConnectedException ex) {
523+
connection = getConnection(node);
524+
} catch (final NodeNotConnectedException ex) {
526525
// the caller might not handle this so we invoke the handler
527526
handler.handleException(ex);
527+
return;
528528
}
529+
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
529530
}
530531

531532
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
532533
final TransportRequest request,
533534
final TransportRequestOptions options,
534535
TransportResponseHandler<T> handler) {
536+
final Transport.Connection connection;
535537
try {
536-
Transport.Connection connection = getConnection(node);
537-
sendRequest(connection, action, request, options, handler);
538-
} catch (NodeNotConnectedException ex) {
538+
connection = getConnection(node);
539+
} catch (final NodeNotConnectedException ex) {
539540
// the caller might not handle this so we invoke the handler
540541
handler.handleException(ex);
542+
return;
541543
}
544+
sendRequest(connection, action, request, options, handler);
542545
}
543546

547+
/**
548+
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
549+
*
550+
* @param connection the connection to send the request on
551+
* @param action the name of the action
552+
* @param request the request
553+
* @param options the options for this request
554+
* @param handler the response handler
555+
* @param <T> the type of the transport response
556+
*/
544557
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
545558
final TransportRequest request,
546559
final TransportRequestOptions options,
547560
TransportResponseHandler<T> handler) {
548561
try {
549562
asyncSender.sendRequest(connection, action, request, options, handler);
550-
} catch (NodeNotConnectedException ex) {
563+
} catch (final Exception ex) {
551564
// the caller might not handle this so we invoke the handler
552-
handler.handleException(ex);
565+
final TransportException te;
566+
if (ex instanceof TransportException) {
567+
te = (TransportException) ex;
568+
} else {
569+
te = new TransportException("failure to send", ex);
570+
}
571+
handler.handleException(te);
553572
}
554573
}
555574

@@ -569,13 +588,15 @@ public final <T extends TransportResponse> void sendChildRequest(final Discovery
569588
final TransportRequest request, final Task parentTask,
570589
final TransportRequestOptions options,
571590
final TransportResponseHandler<T> handler) {
591+
final Transport.Connection connection;
572592
try {
573-
Transport.Connection connection = getConnection(node);
574-
sendChildRequest(connection, action, request, parentTask, options, handler);
575-
} catch (NodeNotConnectedException ex) {
593+
connection = getConnection(node);
594+
} catch (final NodeNotConnectedException ex) {
576595
// the caller might not handle this so we invoke the handler
577596
handler.handleException(ex);
597+
return;
578598
}
599+
sendChildRequest(connection, action, request, parentTask, options, handler);
579600
}
580601

581602
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
@@ -589,16 +610,7 @@ public <T extends TransportResponse> void sendChildRequest(final Transport.Conne
589610
final TransportRequestOptions options,
590611
final TransportResponseHandler<T> handler) {
591612
request.setParentTask(localNode.getId(), parentTask.getId());
592-
try {
593-
sendRequest(connection, action, request, options, handler);
594-
} catch (TaskCancelledException ex) {
595-
// The parent task is already cancelled - just fail the request
596-
handler.handleException(new TransportException(ex));
597-
} catch (NodeNotConnectedException ex) {
598-
// the caller might not handle this so we invoke the handler
599-
handler.handleException(ex);
600-
}
601-
613+
sendRequest(connection, action, request, options, handler);
602614
}
603615

604616
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,13 @@ public static MockNioTransport newMockTransport(Settings settings, Version versi
120120

121121
public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
122122
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
123-
return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
123+
return createNewService(settings, transport, version, threadPool, clusterSettings, taskHeaders, NOOP_TRANSPORT_INTERCEPTOR);
124+
}
125+
126+
public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
127+
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders,
128+
TransportInterceptor interceptor) {
129+
return new MockTransportService(settings, transport, threadPool, interceptor,
124130
boundAddress ->
125131
new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(),
126132
Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version),

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

+100-2
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,15 @@
9494

9595
import static java.util.Collections.emptyMap;
9696
import static java.util.Collections.emptySet;
97+
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
9798
import static org.hamcrest.Matchers.containsString;
9899
import static org.hamcrest.Matchers.empty;
99100
import static org.hamcrest.Matchers.equalTo;
100101
import static org.hamcrest.Matchers.hasToString;
101102
import static org.hamcrest.Matchers.instanceOf;
103+
import static org.hamcrest.Matchers.not;
102104
import static org.hamcrest.Matchers.notNullValue;
105+
import static org.hamcrest.Matchers.nullValue;
103106
import static org.hamcrest.Matchers.startsWith;
104107

105108
public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
@@ -187,7 +190,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
187190
}
188191

189192
private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings,
190-
Settings settings, boolean acceptRequests, boolean doHandshake) {
193+
Settings settings, boolean acceptRequests, boolean doHandshake,
194+
TransportInterceptor interceptor) {
191195
Settings updatedSettings = Settings.builder()
192196
.put(TransportSettings.PORT.getKey(), getPortRange())
193197
.put(settings)
@@ -198,14 +202,19 @@ private MockTransportService buildService(final String name, final Version versi
198202
}
199203
Transport transport = build(updatedSettings, version, clusterSettings, doHandshake);
200204
MockTransportService service = MockTransportService.createNewService(updatedSettings, transport, version, threadPool,
201-
clusterSettings, Collections.emptySet());
205+
clusterSettings, Collections.emptySet(), interceptor);
202206
service.start();
203207
if (acceptRequests) {
204208
service.acceptIncomingRequests();
205209
}
206210
return service;
207211
}
208212

213+
private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings,
214+
Settings settings, boolean acceptRequests, boolean doHandshake) {
215+
return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR);
216+
}
217+
209218
protected MockTransportService buildService(final String name, final Version version, Settings settings) {
210219
return buildService(name, version, null, settings);
211220
}
@@ -2744,6 +2753,95 @@ public void onConnectionClosed(Transport.Connection connection) {
27442753
}
27452754
}
27462755

2756+
// test that the response handler is invoked on a failure to send
2757+
public void testFailToSend() throws InterruptedException {
2758+
final RuntimeException failToSendException;
2759+
if (randomBoolean()) {
2760+
failToSendException = new IllegalStateException("fail to send");
2761+
} else {
2762+
failToSendException = new TransportException("fail to send");
2763+
}
2764+
final TransportInterceptor interceptor = new TransportInterceptor() {
2765+
@Override
2766+
public AsyncSender interceptSender(final AsyncSender sender) {
2767+
return new AsyncSender() {
2768+
@Override
2769+
public <T extends TransportResponse> void sendRequest(
2770+
final Transport.Connection connection,
2771+
final String action,
2772+
final TransportRequest request,
2773+
final TransportRequestOptions options,
2774+
final TransportResponseHandler<T> handler) {
2775+
if ("fail-to-send-action".equals(action)) {
2776+
throw failToSendException;
2777+
} else {
2778+
sender.sendRequest(connection, action, request, options, handler);
2779+
}
2780+
}
2781+
};
2782+
}
2783+
};
2784+
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) {
2785+
serviceC.start();
2786+
serviceC.acceptIncomingRequests();
2787+
final CountDownLatch latch = new CountDownLatch(1);
2788+
serviceC.connectToNode(
2789+
serviceA.getLocalDiscoNode(),
2790+
ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY),
2791+
new ActionListener<>() {
2792+
@Override
2793+
public void onResponse(final Void v) {
2794+
latch.countDown();
2795+
}
2796+
2797+
@Override
2798+
public void onFailure(final Exception e) {
2799+
fail(e.getMessage());
2800+
}
2801+
});
2802+
latch.await();
2803+
final AtomicReference<TransportException> te = new AtomicReference<>();
2804+
final Transport.Connection connection = serviceC.getConnection(nodeA);
2805+
serviceC.sendRequest(
2806+
connection,
2807+
"fail-to-send-action",
2808+
TransportRequest.Empty.INSTANCE,
2809+
TransportRequestOptions.EMPTY,
2810+
new TransportResponseHandler<TransportResponse>() {
2811+
@Override
2812+
public void handleResponse(final TransportResponse response) {
2813+
fail("handle response should not be invoked");
2814+
}
2815+
2816+
@Override
2817+
public void handleException(final TransportException exp) {
2818+
te.set(exp);
2819+
}
2820+
2821+
@Override
2822+
public String executor() {
2823+
return ThreadPool.Names.SAME;
2824+
}
2825+
2826+
@Override
2827+
public TransportResponse read(final StreamInput in) {
2828+
return TransportResponse.Empty.INSTANCE;
2829+
}
2830+
});
2831+
assertThat(te.get(), not(nullValue()));
2832+
2833+
if (failToSendException instanceof IllegalStateException) {
2834+
assertThat(te.get().getMessage(), equalTo("failure to send"));
2835+
assertThat(te.get().getCause(), instanceOf(IllegalStateException.class));
2836+
assertThat(te.get().getCause().getMessage(), equalTo("fail to send"));
2837+
} else {
2838+
assertThat(te.get().getMessage(), equalTo("fail to send"));
2839+
assertThat(te.get().getCause(), nullValue());
2840+
}
2841+
}
2842+
2843+
}
2844+
27472845
private void closeConnectionChannel(Transport.Connection connection) {
27482846
StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection;
27492847
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection();

0 commit comments

Comments
 (0)