From cef8a26f78e1bd3b58c49cddd25ab690cf1c9e87 Mon Sep 17 00:00:00 2001 From: Nathan Miles Date: Thu, 7 May 2020 22:04:24 -0400 Subject: [PATCH] Improve exceptional behavior in reactive streams * Errors on the request are now propagated to reactive subscribers instead of just to the request's ListenableFuture * Read timeouts can no longer occur if a reactive streams subscriber has no outstanding request. Note that this does not affect request timeouts - only read timeouts. --- .../netty/handler/HttpHandler.java | 3 +- .../handler/StreamedResponsePublisher.java | 64 +++ .../netty/request/NettyRequestSender.java | 12 +- .../netty/timeout/ReadTimeoutTimerTask.java | 10 +- ....java => ReactiveStreamsDownloadTest.java} | 12 +- .../ReactiveStreamsErrorTest.java | 378 ++++++++++++++++++ ...est.java => ReactiveStreamsRetryTest.java} | 2 +- 7 files changed, 469 insertions(+), 12 deletions(-) rename client/src/test/java/org/asynchttpclient/reactivestreams/{ReactiveStreamsDownLoadTest.java => ReactiveStreamsDownloadTest.java} (95%) create mode 100644 client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsErrorTest.java rename client/src/test/java/org/asynchttpclient/reactivestreams/{FailingReactiveStreamsTest.java => ReactiveStreamsRetryTest.java} (98%) diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java index a52f75fc83..dddaeb34cb 100755 --- a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java @@ -39,8 +39,7 @@ public HttpHandler(AsyncHttpClientConfig config, ChannelManager channelManager, super(config, channelManager, requestSender); } - private boolean abortAfterHandlingStatus(// - AsyncHandler handler, + private boolean abortAfterHandlingStatus(AsyncHandler handler, NettyResponseStatus status) throws Exception { return handler.onStatusReceived(status) == State.ABORT; } diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/StreamedResponsePublisher.java b/client/src/main/java/org/asynchttpclient/netty/handler/StreamedResponsePublisher.java index f4565f91b6..4fb24dbd1a 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/StreamedResponsePublisher.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/StreamedResponsePublisher.java @@ -14,10 +14,13 @@ import com.typesafe.netty.HandlerPublisher; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.EventExecutor; import org.asynchttpclient.HttpResponseBodyPart; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.channel.ChannelManager; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +31,8 @@ public class StreamedResponsePublisher extends HandlerPublisher future; private final Channel channel; + private volatile boolean hasOutstandingRequest = false; + private Throwable error; StreamedResponsePublisher(EventExecutor executor, ChannelManager channelManager, NettyResponseFuture future, Channel channel) { super(executor, HttpResponseBodyPart.class); @@ -51,7 +56,66 @@ protected void cancelled() { channelManager.closeChannel(channel); } + @Override + protected void requestDemand() { + hasOutstandingRequest = true; + super.requestDemand(); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + hasOutstandingRequest = false; + super.channelReadComplete(ctx); + } + + @Override + public void subscribe(Subscriber subscriber) { + super.subscribe(new ErrorReplacingSubscriber(subscriber)); + } + + public boolean hasOutstandingRequest() { + return hasOutstandingRequest; + } + NettyResponseFuture future() { return future; } + + public void setError(Throwable t) { + this.error = t; + } + + private class ErrorReplacingSubscriber implements Subscriber { + + private final Subscriber subscriber; + + ErrorReplacingSubscriber(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void onSubscribe(Subscription s) { + subscriber.onSubscribe(s); + } + + @Override + public void onNext(HttpResponseBodyPart httpResponseBodyPart) { + subscriber.onNext(httpResponseBodyPart); + } + + @Override + public void onError(Throwable t) { + subscriber.onError(t); + } + + @Override + public void onComplete() { + Throwable replacementError = error; + if (replacementError == null) { + subscriber.onComplete(); + } else { + subscriber.onError(replacementError); + } + } + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 32720acc10..4fa0589a84 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -35,6 +35,7 @@ import org.asynchttpclient.netty.OnLastHttpContentCallback; import org.asynchttpclient.netty.SimpleFutureListener; import org.asynchttpclient.netty.channel.*; +import org.asynchttpclient.netty.handler.StreamedResponsePublisher; import org.asynchttpclient.netty.timeout.TimeoutsHolder; import org.asynchttpclient.proxy.ProxyServer; import org.asynchttpclient.resolver.RequestHostnameResolver; @@ -462,8 +463,15 @@ private void scheduleReadTimeout(NettyResponseFuture nettyResponseFuture) { public void abort(Channel channel, NettyResponseFuture future, Throwable t) { - if (channel != null && channel.isActive()) { - channelManager.closeChannel(channel); + if (channel != null) { + Object attribute = Channels.getAttribute(future.channel()); + if (attribute instanceof StreamedResponsePublisher) { + ((StreamedResponsePublisher) attribute).setError(t); + } + + if (channel.isActive()) { + channelManager.closeChannel(channel); + } } if (!future.isDone()) { diff --git a/client/src/main/java/org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java b/client/src/main/java/org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java index 5aebed9f80..0af2d153e0 100755 --- a/client/src/main/java/org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java +++ b/client/src/main/java/org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java @@ -15,6 +15,8 @@ import io.netty.util.Timeout; import org.asynchttpclient.netty.NettyResponseFuture; +import org.asynchttpclient.netty.channel.Channels; +import org.asynchttpclient.netty.handler.StreamedResponsePublisher; import org.asynchttpclient.netty.request.NettyRequestSender; import org.asynchttpclient.util.StringBuilderPool; @@ -47,7 +49,7 @@ public void run(Timeout timeout) { long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch(); long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now; - if (durationBeforeCurrentReadTimeout <= 0L) { + if (durationBeforeCurrentReadTimeout <= 0L && !isReactiveWithNoOutstandingRequest()) { // idleConnectTimeout reached StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to "); appendRemoteAddress(sb); @@ -62,4 +64,10 @@ public void run(Timeout timeout) { timeoutsHolder.startReadTimeout(this); } } + + private boolean isReactiveWithNoOutstandingRequest() { + Object attribute = Channels.getAttribute(nettyResponseFuture.channel()); + return attribute instanceof StreamedResponsePublisher && + !((StreamedResponsePublisher) attribute).hasOutstandingRequest(); + } } diff --git a/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java b/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownloadTest.java similarity index 95% rename from client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java rename to client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownloadTest.java index 9a782bfcfb..b6f2b2fc65 100644 --- a/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java +++ b/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownloadTest.java @@ -39,11 +39,11 @@ import static org.asynchttpclient.Dsl.asyncHttpClient; import static org.testng.Assert.assertEquals; -public class ReactiveStreamsDownLoadTest { +public class ReactiveStreamsDownloadTest { - private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsDownLoadTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsDownloadTest.class); - private int serverPort = 8080; + private final int serverPort = 8080; private File largeFile; private File smallFile; @@ -104,7 +104,7 @@ public void onThrowable(Throwable t) { } @Override - public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { + public State onBodyPartReceived(HttpResponseBodyPart bodyPart) { LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onBodyPartReceived"); throw new AssertionError("Should not have received body part"); } @@ -115,12 +115,12 @@ public State onStatusReceived(HttpResponseStatus responseStatus) { } @Override - public State onHeadersReceived(HttpHeaders headers) throws Exception { + public State onHeadersReceived(HttpHeaders headers) { return State.CONTINUE; } @Override - public SimpleStreamedAsyncHandler onCompleted() throws Exception { + public SimpleStreamedAsyncHandler onCompleted() { LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onSubscribe"); return this; } diff --git a/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsErrorTest.java b/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsErrorTest.java new file mode 100644 index 0000000000..d95973a0eb --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsErrorTest.java @@ -0,0 +1,378 @@ +package org.asynchttpclient.reactivestreams; + +import io.netty.handler.codec.http.HttpHeaders; +import org.asynchttpclient.AbstractBasicTest; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.HttpResponseBodyPart; +import org.asynchttpclient.HttpResponseStatus; +import org.asynchttpclient.exception.RemotelyClosedException; +import org.asynchttpclient.handler.StreamedAsyncHandler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; +import static org.testng.Assert.*; + +public class ReactiveStreamsErrorTest extends AbstractBasicTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsErrorTest.class); + + private static final byte[] BODY_CHUNK = "someBytes".getBytes(); + + private AsyncHttpClient client; + private ServletResponseHandler servletResponseHandler; + + @BeforeTest + public void initClient() { + client = asyncHttpClient(config() + .setMaxRequestRetry(0) + .setRequestTimeout(3_000) + .setReadTimeout(1_000)); + } + + @AfterTest + public void closeClient() throws Throwable { + client.close(); + } + + @Override + public AbstractHandler configureHandler() throws Exception { + return new AbstractHandler() { + @Override + public void handle(String target, Request r, HttpServletRequest request, HttpServletResponse response) { + try { + servletResponseHandler.handle(response); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + + @Test + public void timeoutWithNoStatusLineSent() throws Throwable { + try { + execute(response -> Thread.sleep(5_000), bodyPublisher -> {}); + fail("Request should have timed out"); + } catch (ExecutionException e) { + expectReadTimeout(e.getCause()); + } + } + + @Test + public void neverSubscribingToResponseBodyHitsRequestTimeout() throws Throwable { + try { + execute(response -> { + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(500); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + + response.getOutputStream().close(); + }, bodyPublisher -> {}); + + fail("Request should have timed out"); + } catch (ExecutionException e) { + expectRequestTimeout(e.getCause()); + } + } + + @Test + public void readTimeoutInMiddleOfBody() throws Throwable { + ServletResponseHandler responseHandler = response -> { + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(500); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(5_000); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + response.getOutputStream().close(); + }; + + try { + execute(responseHandler, bodyPublisher -> bodyPublisher.subscribe(new ManualRequestSubscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + })); + fail("Request should have timed out"); + } catch (ExecutionException e) { + expectReadTimeout(e.getCause()); + } + } + + @Test + public void notRequestingForLongerThanReadTimeoutDoesNotCauseTimeout() throws Throwable { + ServletResponseHandler responseHandler = response -> { + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(100); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + response.getOutputStream().close(); + }; + + ManualRequestSubscriber subscriber = new ManualRequestSubscriber() { + @Override + public void onSubscribe(Subscription s) { + super.onSubscribe(s); + new Thread(() -> { + try { + // chunk 1 + s.request(1); + + // there will be no read for longer than the read timeout + Thread.sleep(1_500); + + // read the rest + s.request(Long.MAX_VALUE); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + } + }; + + execute(responseHandler, bodyPublisher -> bodyPublisher.subscribe(subscriber)); + + subscriber.await(); + + assertEquals(subscriber.elements.size(), 2); + } + + @Test + public void readTimeoutCancelsBodyStream() throws Throwable { + ServletResponseHandler responseHandler = response -> { + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(2_000); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + response.getOutputStream().close(); + }; + + ManualRequestSubscriber subscriber = new ManualRequestSubscriber() { + @Override + public void onSubscribe(Subscription s) { + super.onSubscribe(s); + s.request(Long.MAX_VALUE); + } + }; + + try { + execute(responseHandler, bodyPublisher -> bodyPublisher.subscribe(subscriber)); + fail("Request should have timed out"); + } catch (ExecutionException e) { + expectReadTimeout(e.getCause()); + } + + subscriber.await(); + + assertEquals(subscriber.elements.size(), 1); + } + + @Test + public void requestTimeoutCancelsBodyStream() throws Throwable { + ServletResponseHandler responseHandler = response -> { + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(900); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(900); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(900); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + Thread.sleep(900); + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + response.getOutputStream().close(); + }; + + ManualRequestSubscriber subscriber = new ManualRequestSubscriber() { + @Override + public void onSubscribe(Subscription subscription) { + super.onSubscribe(subscription); + subscription.request(Long.MAX_VALUE); + } + }; + + try { + execute(responseHandler, bodyPublisher -> bodyPublisher.subscribe(subscriber)); + fail("Request should have timed out"); + } catch (ExecutionException e) { + expectRequestTimeout(e.getCause()); + } + + subscriber.await(); + + expectRequestTimeout(subscriber.error); + assertEquals(subscriber.elements.size(), 4); + } + + @Test + public void ioErrorsArePropagatedToSubscriber() throws Throwable { + ServletResponseHandler responseHandler = response -> { + response.setContentLength(100); + + response.getOutputStream().write(BODY_CHUNK); + response.getOutputStream().flush(); + + response.getOutputStream().close(); + }; + + ManualRequestSubscriber subscriber = new ManualRequestSubscriber() { + @Override + public void onSubscribe(Subscription subscription) { + super.onSubscribe(subscription); + subscription.request(Long.MAX_VALUE); + } + }; + + Throwable error = null; + try { + execute(responseHandler, bodyPublisher -> bodyPublisher.subscribe(subscriber)); + fail("Request should have failed"); + } catch (ExecutionException e) { + error = e.getCause(); + assertTrue(error instanceof RemotelyClosedException, "Unexpected error: " + e); + } + + subscriber.await(); + + assertEquals(subscriber.error, error); + assertEquals(subscriber.elements.size(), 1); + } + + private void expectReadTimeout(Throwable e) { + assertTrue(e instanceof TimeoutException, + "Expected a read timeout, but got " + e); + assertTrue(e.getMessage().contains("Read timeout"), + "Expected read timeout, but was " + e); + } + + private void expectRequestTimeout(Throwable e) { + assertTrue(e instanceof TimeoutException, + "Expected a request timeout, but got " + e); + assertTrue(e.getMessage().contains("Request timeout"), + "Expected request timeout, but was " + e); + } + + private void execute(ServletResponseHandler responseHandler, + Consumer> bodyConsumer) throws Exception { + this.servletResponseHandler = responseHandler; + client.prepareGet(getTargetUrl()) + .execute(new SimpleStreamer(bodyConsumer)) + .get(3_500, TimeUnit.MILLISECONDS); + } + + private interface ServletResponseHandler { + void handle(HttpServletResponse response) throws Exception; + } + + private static class SimpleStreamer implements StreamedAsyncHandler { + + final Consumer> bodyStreamHandler; + + private SimpleStreamer(Consumer> bodyStreamHandler) { + this.bodyStreamHandler = bodyStreamHandler; + } + + @Override + public State onStream(Publisher publisher) { + LOGGER.debug("Got stream"); + bodyStreamHandler.accept(publisher); + return State.CONTINUE; + } + + @Override + public State onStatusReceived(HttpResponseStatus responseStatus) { + LOGGER.debug("Got status line"); + return State.CONTINUE; + } + + @Override + public State onHeadersReceived(HttpHeaders headers) { + LOGGER.debug("Got headers"); + return State.CONTINUE; + } + + @Override + public State onBodyPartReceived(HttpResponseBodyPart bodyPart) { + throw new IllegalStateException(); + } + + @Override + public void onThrowable(Throwable t) { + LOGGER.debug("Caught error", t); + } + + @Override + public Void onCompleted() { + LOGGER.debug("Completed request"); + return null; + } + } + + private static class ManualRequestSubscriber implements Subscriber { + private final List elements = Collections.synchronizedList(new ArrayList<>()); + private final CountDownLatch latch = new CountDownLatch(1); + private volatile Throwable error; + + @Override + public void onSubscribe(Subscription subscription) { + LOGGER.debug("SimpleSubscriber onSubscribe"); + } + + @Override + public void onNext(HttpResponseBodyPart t) { + LOGGER.debug("SimpleSubscriber onNext"); + elements.add(t); + } + + @Override + public void onError(Throwable error) { + LOGGER.debug("SimpleSubscriber onError"); + this.error = error; + latch.countDown(); + } + + @Override + public void onComplete() { + LOGGER.debug("SimpleSubscriber onComplete"); + latch.countDown(); + } + + void await() throws InterruptedException { + if (!latch.await(3_500, TimeUnit.MILLISECONDS)) { + fail("Request should have finished"); + } + } + } +} diff --git a/client/src/test/java/org/asynchttpclient/reactivestreams/FailingReactiveStreamsTest.java b/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsRetryTest.java similarity index 98% rename from client/src/test/java/org/asynchttpclient/reactivestreams/FailingReactiveStreamsTest.java rename to client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsRetryTest.java index 860678b35a..d09b16d037 100644 --- a/client/src/test/java/org/asynchttpclient/reactivestreams/FailingReactiveStreamsTest.java +++ b/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsRetryTest.java @@ -32,7 +32,7 @@ import static org.asynchttpclient.test.TestUtils.LARGE_IMAGE_BYTES; import static org.testng.Assert.assertTrue; -public class FailingReactiveStreamsTest extends AbstractBasicTest { +public class ReactiveStreamsRetryTest extends AbstractBasicTest { @Test public void testRetryingOnFailingStream() throws Exception {