Skip to content

Commit 211d31b

Browse files
committed
Client: Wrap synchronous exceptions
In the past the Low Level REST Client was super careful not to wrap any exceptions that it throws from synchronous calls so that callers can catch the exceptions and work with them. The trouble with that is that the exceptions are originally thrown on the async thread pool and then transfered back into calling thread. That means that the stack trace of the exception doesn't have the calling method which is *super* *ultra* confusing. This change always wraps exceptions transfered from the async thread pool so that the stack trace of the thrown exception contains the caller's stack. It tries to preserve the type of the throw exception but this is quite a fiddly thing to get right. We have to catch every type of exception that we want to preserve, wrap with the same type and rethrow. I've preserved the types of all exceptions that we had tests mentioning but no other exceptions. The other exceptions are either wrapped in `IOException` or `RuntimeException`. Closes elastic#28399
1 parent 861d80f commit 211d31b

File tree

6 files changed

+292
-99
lines changed

6 files changed

+292
-99
lines changed

client/rest/src/main/java/org/elasticsearch/client/ResponseException.java

+10
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ public ResponseException(Response response) throws IOException {
3939
this.response = response;
4040
}
4141

42+
/**
43+
* Wrap a {@linkplain ResponseException} with another one with the current
44+
* stack trace. This is used during synchronous calls so that the caller
45+
* ends up in the stack trace of the exception thrown.
46+
*/
47+
ResponseException(ResponseException e) throws IOException {
48+
super(e.getMessage(), e);
49+
this.response = e.getResponse();
50+
}
51+
4252
private static String buildMessage(Response response) throws IOException {
4353
String message = String.format(Locale.ROOT,
4454
"method [%s], host [%s], URI [%s], status line [%s]",

client/rest/src/main/java/org/elasticsearch/client/RestClient.java

+65-37
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.http.client.protocol.HttpClientContext;
3939
import org.apache.http.client.utils.URIBuilder;
4040
import org.apache.http.concurrent.FutureCallback;
41+
import org.apache.http.conn.ConnectTimeoutException;
4142
import org.apache.http.impl.auth.BasicScheme;
4243
import org.apache.http.impl.client.BasicAuthCache;
4344
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
@@ -47,6 +48,7 @@
4748

4849
import java.io.Closeable;
4950
import java.io.IOException;
51+
import java.net.SocketTimeoutException;
5052
import java.net.URI;
5153
import java.net.URISyntaxException;
5254
import java.util.ArrayList;
@@ -218,7 +220,8 @@ public Response performRequest(String method, String endpoint, Map<String, Strin
218220
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
219221
Header... headers) throws IOException {
220222
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
221-
performRequestAsync(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, listener, headers);
223+
performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
224+
listener, headers);
222225
return listener.get();
223226
}
224227

@@ -293,43 +296,50 @@ public void performRequestAsync(String method, String endpoint, Map<String, Stri
293296
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
294297
ResponseListener responseListener, Header... headers) {
295298
try {
296-
Objects.requireNonNull(params, "params must not be null");
297-
Map<String, String> requestParams = new HashMap<>(params);
298-
//ignore is a special parameter supported by the clients, shouldn't be sent to es
299-
String ignoreString = requestParams.remove("ignore");
300-
Set<Integer> ignoreErrorCodes;
301-
if (ignoreString == null) {
302-
if (HttpHead.METHOD_NAME.equals(method)) {
303-
//404 never causes error if returned for a HEAD request
304-
ignoreErrorCodes = Collections.singleton(404);
305-
} else {
306-
ignoreErrorCodes = Collections.emptySet();
307-
}
299+
performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
300+
responseListener, headers);
301+
} catch (Exception e) {
302+
responseListener.onFailure(e);
303+
}
304+
}
305+
306+
void performRequestAsyncNoCatch(String method, String endpoint, Map<String, String> params,
307+
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
308+
ResponseListener responseListener, Header... headers) {
309+
Objects.requireNonNull(params, "params must not be null");
310+
Map<String, String> requestParams = new HashMap<>(params);
311+
//ignore is a special parameter supported by the clients, shouldn't be sent to es
312+
String ignoreString = requestParams.remove("ignore");
313+
Set<Integer> ignoreErrorCodes;
314+
if (ignoreString == null) {
315+
if (HttpHead.METHOD_NAME.equals(method)) {
316+
//404 never causes error if returned for a HEAD request
317+
ignoreErrorCodes = Collections.singleton(404);
308318
} else {
309-
String[] ignoresArray = ignoreString.split(",");
310-
ignoreErrorCodes = new HashSet<>();
311-
if (HttpHead.METHOD_NAME.equals(method)) {
312-
//404 never causes error if returned for a HEAD request
313-
ignoreErrorCodes.add(404);
314-
}
315-
for (String ignoreCode : ignoresArray) {
316-
try {
317-
ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
318-
} catch (NumberFormatException e) {
319-
throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
320-
}
319+
ignoreErrorCodes = Collections.emptySet();
320+
}
321+
} else {
322+
String[] ignoresArray = ignoreString.split(",");
323+
ignoreErrorCodes = new HashSet<>();
324+
if (HttpHead.METHOD_NAME.equals(method)) {
325+
//404 never causes error if returned for a HEAD request
326+
ignoreErrorCodes.add(404);
327+
}
328+
for (String ignoreCode : ignoresArray) {
329+
try {
330+
ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
331+
} catch (NumberFormatException e) {
332+
throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
321333
}
322334
}
323-
URI uri = buildUri(pathPrefix, endpoint, requestParams);
324-
HttpRequestBase request = createHttpRequest(method, uri, entity);
325-
setHeaders(request, headers);
326-
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
327-
long startTime = System.nanoTime();
328-
performRequestAsync(startTime, nextHost(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory,
329-
failureTrackingResponseListener);
330-
} catch (Exception e) {
331-
responseListener.onFailure(e);
332335
}
336+
URI uri = buildUri(pathPrefix, endpoint, requestParams);
337+
HttpRequestBase request = createHttpRequest(method, uri, entity);
338+
setHeaders(request, headers);
339+
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
340+
long startTime = System.nanoTime();
341+
performRequestAsync(startTime, nextHost(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory,
342+
failureTrackingResponseListener);
333343
}
334344

335345
private void performRequestAsync(final long startTime, final HostTuple<Iterator<HttpHost>> hostTuple, final HttpRequestBase request,
@@ -674,12 +684,30 @@ Response get() throws IOException {
674684
e.addSuppressed(exception);
675685
throw e;
676686
}
677-
//try and leave the exception untouched as much as possible but we don't want to just add throws Exception clause everywhere
687+
/*
688+
* Wrap and rethrow whatever exception we received, copying the type
689+
* where possible so the synchronous API looks as much as possible
690+
* like the asynchronous API. We wrap the exception so that the caller's
691+
* signature shows up in any exception we throw.
692+
*/
693+
if (exception instanceof ResponseException) {
694+
throw new ResponseException((ResponseException) exception);
695+
}
696+
if (exception instanceof ConnectTimeoutException) {
697+
ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage());
698+
e.initCause(exception);
699+
throw e;
700+
}
701+
if (exception instanceof SocketTimeoutException) {
702+
SocketTimeoutException e = new SocketTimeoutException(exception.getMessage());
703+
e.initCause(exception);
704+
throw e;
705+
}
678706
if (exception instanceof IOException) {
679-
throw (IOException) exception;
707+
throw new IOException(exception.getMessage(), exception);
680708
}
681709
if (exception instanceof RuntimeException){
682-
throw (RuntimeException) exception;
710+
throw new RuntimeException(exception.getMessage(), exception);
683711
}
684712
throw new RuntimeException("error while performing request", exception);
685713
}

client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java

+51-14
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.http.message.BasicStatusLine;
3636
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
3737
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
38+
import org.junit.After;
3839
import org.junit.Before;
3940
import org.mockito.invocation.InvocationOnMock;
4041
import org.mockito.stubbing.Answer;
@@ -44,6 +45,8 @@
4445
import java.util.Collections;
4546
import java.util.HashSet;
4647
import java.util.Set;
48+
import java.util.concurrent.ExecutorService;
49+
import java.util.concurrent.Executors;
4750
import java.util.concurrent.Future;
4851

4952
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
@@ -66,6 +69,7 @@
6669
*/
6770
public class RestClientMultipleHostsTests extends RestClientTestCase {
6871

72+
private ExecutorService exec = Executors.newFixedThreadPool(1);
6973
private RestClient restClient;
7074
private HttpHost[] httpHosts;
7175
private HostsTrackingFailureListener failureListener;
@@ -79,23 +83,28 @@ public void createRestClient() throws IOException {
7983
@Override
8084
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
8185
HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0];
82-
HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest();
83-
HttpHost httpHost = requestProducer.getTarget();
86+
final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest();
87+
final HttpHost httpHost = requestProducer.getTarget();
8488
HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2];
8589
assertThat(context.getAuthCache().get(httpHost), instanceOf(BasicScheme.class));
86-
FutureCallback<HttpResponse> futureCallback = (FutureCallback<HttpResponse>) invocationOnMock.getArguments()[3];
90+
final FutureCallback<HttpResponse> futureCallback = (FutureCallback<HttpResponse>) invocationOnMock.getArguments()[3];
8791
//return the desired status code or exception depending on the path
88-
if (request.getURI().getPath().equals("/soe")) {
89-
futureCallback.failed(new SocketTimeoutException(httpHost.toString()));
90-
} else if (request.getURI().getPath().equals("/coe")) {
91-
futureCallback.failed(new ConnectTimeoutException(httpHost.toString()));
92-
} else if (request.getURI().getPath().equals("/ioe")) {
93-
futureCallback.failed(new IOException(httpHost.toString()));
94-
} else {
95-
int statusCode = Integer.parseInt(request.getURI().getPath().substring(1));
96-
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, "");
97-
futureCallback.completed(new BasicHttpResponse(statusLine));
98-
}
92+
exec.execute(new Runnable() {
93+
@Override
94+
public void run() {
95+
if (request.getURI().getPath().equals("/soe")) {
96+
futureCallback.failed(new SocketTimeoutException(httpHost.toString()));
97+
} else if (request.getURI().getPath().equals("/coe")) {
98+
futureCallback.failed(new ConnectTimeoutException(httpHost.toString()));
99+
} else if (request.getURI().getPath().equals("/ioe")) {
100+
futureCallback.failed(new IOException(httpHost.toString()));
101+
} else {
102+
int statusCode = Integer.parseInt(request.getURI().getPath().substring(1));
103+
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, "");
104+
futureCallback.completed(new BasicHttpResponse(statusLine));
105+
}
106+
}
107+
});
99108
return null;
100109
}
101110
});
@@ -108,6 +117,14 @@ public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Thr
108117
restClient = new RestClient(httpClient, 10000, new Header[0], httpHosts, null, failureListener);
109118
}
110119

120+
/**
121+
* Shutdown the executor so we don't leak threads into other test runs.
122+
*/
123+
@After
124+
public void shutdownExec() {
125+
exec.shutdown();
126+
}
127+
111128
public void testRoundRobinOkStatusCodes() throws IOException {
112129
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
113130
for (int i = 0; i < numIters; i++) {
@@ -163,6 +180,11 @@ public void testRoundRobinRetryErrors() throws IOException {
163180
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
164181
fail("request should have failed");
165182
} catch(ResponseException e) {
183+
/*
184+
* Unwrap the top level failure that was added so the stack trace contains
185+
* the caller. It wraps the exception that contains the failed hosts.
186+
*/
187+
e = (ResponseException) e.getCause();
166188
Set<HttpHost> hostsSet = new HashSet<>();
167189
Collections.addAll(hostsSet, httpHosts);
168190
//first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each
@@ -183,6 +205,11 @@ public void testRoundRobinRetryErrors() throws IOException {
183205
} while(e != null);
184206
assertEquals("every host should have been used but some weren't: " + hostsSet, 0, hostsSet.size());
185207
} catch(IOException e) {
208+
/*
209+
* Unwrap the top level failure that was added so the stack trace contains
210+
* the caller. It wraps the exception that contains the failed hosts.
211+
*/
212+
e = (IOException) e.getCause();
186213
Set<HttpHost> hostsSet = new HashSet<>();
187214
Collections.addAll(hostsSet, httpHosts);
188215
//first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each
@@ -221,6 +248,11 @@ public void testRoundRobinRetryErrors() throws IOException {
221248
failureListener.assertCalled(response.getHost());
222249
assertEquals(0, e.getSuppressed().length);
223250
} catch(IOException e) {
251+
/*
252+
* Unwrap the top level failure that was added so the stack trace contains
253+
* the caller. It wraps the exception that contains the failed hosts.
254+
*/
255+
e = (IOException) e.getCause();
224256
HttpHost httpHost = HttpHost.create(e.getMessage());
225257
assertTrue("host [" + httpHost + "] not found, most likely used multiple times", hostsSet.remove(httpHost));
226258
//after the first request, all hosts are blacklisted, a single one gets resurrected each time
@@ -263,6 +295,11 @@ public void testRoundRobinRetryErrors() throws IOException {
263295
assertThat(response.getHost(), equalTo(selectedHost));
264296
failureListener.assertCalled(selectedHost);
265297
} catch(IOException e) {
298+
/*
299+
* Unwrap the top level failure that was added so the stack trace contains
300+
* the caller. It wraps the exception that contains the failed hosts.
301+
*/
302+
e = (IOException) e.getCause();
266303
HttpHost httpHost = HttpHost.create(e.getMessage());
267304
assertThat(httpHost, equalTo(selectedHost));
268305
failureListener.assertCalled(selectedHost);

0 commit comments

Comments
 (0)