Skip to content

Commit eae757b

Browse files
committed
Improve exceptional behavior in reactive streams
* Errors on the request are now propagated to reactive subscribers instead of just to the request's ListenableFuture * Read timeouts can no longer occur if a reactive streams subscriber has no outstanding request. Note that this does not affect request timeouts - only read timeouts.
1 parent f2f5a84 commit eae757b

File tree

7 files changed

+471
-12
lines changed

7 files changed

+471
-12
lines changed

Diff for: client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ public HttpHandler(AsyncHttpClientConfig config, ChannelManager channelManager,
3939
super(config, channelManager, requestSender);
4040
}
4141

42-
private boolean abortAfterHandlingStatus(//
43-
AsyncHandler<?> handler,
42+
private boolean abortAfterHandlingStatus(AsyncHandler<?> handler,
4443
NettyResponseStatus status) throws Exception {
4544
return handler.onStatusReceived(status) == State.ABORT;
4645
}

Diff for: client/src/main/java/org/asynchttpclient/netty/handler/StreamedResponsePublisher.java

+66
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414

1515
import com.typesafe.netty.HandlerPublisher;
1616
import io.netty.channel.Channel;
17+
import io.netty.channel.ChannelHandlerContext;
1718
import io.netty.util.concurrent.EventExecutor;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.concurrent.atomic.AtomicLong;
1821
import org.asynchttpclient.HttpResponseBodyPart;
1922
import org.asynchttpclient.netty.NettyResponseFuture;
2023
import org.asynchttpclient.netty.channel.ChannelManager;
24+
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
2126
import org.slf4j.Logger;
2227
import org.slf4j.LoggerFactory;
2328

@@ -28,6 +33,8 @@ public class StreamedResponsePublisher extends HandlerPublisher<HttpResponseBody
2833
private final ChannelManager channelManager;
2934
private final NettyResponseFuture<?> future;
3035
private final Channel channel;
36+
private final AtomicBoolean hasOutstandingRequest = new AtomicBoolean(false);
37+
private Throwable error;
3138

3239
StreamedResponsePublisher(EventExecutor executor, ChannelManager channelManager, NettyResponseFuture<?> future, Channel channel) {
3340
super(executor, HttpResponseBodyPart.class);
@@ -51,7 +58,66 @@ protected void cancelled() {
5158
channelManager.closeChannel(channel);
5259
}
5360

61+
@Override
62+
protected void requestDemand() {
63+
hasOutstandingRequest.set(true);
64+
super.requestDemand();
65+
}
66+
67+
@Override
68+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
69+
hasOutstandingRequest.set(false);
70+
super.channelReadComplete(ctx);
71+
}
72+
73+
@Override
74+
public void subscribe(Subscriber<? super HttpResponseBodyPart> subscriber) {
75+
super.subscribe(new ErrorReplacingSubscriber(subscriber));
76+
}
77+
78+
public boolean hasOutstandingRequest() {
79+
return hasOutstandingRequest.get();
80+
}
81+
5482
NettyResponseFuture<?> future() {
5583
return future;
5684
}
85+
86+
public void setError(Throwable t) {
87+
this.error = t;
88+
}
89+
90+
private class ErrorReplacingSubscriber implements Subscriber<HttpResponseBodyPart> {
91+
92+
Subscriber<? super HttpResponseBodyPart> subscriber;
93+
94+
ErrorReplacingSubscriber(Subscriber<? super HttpResponseBodyPart> subscriber) {
95+
this.subscriber = subscriber;
96+
}
97+
98+
@Override
99+
public void onSubscribe(Subscription s) {
100+
subscriber.onSubscribe(s);
101+
}
102+
103+
@Override
104+
public void onNext(HttpResponseBodyPart httpResponseBodyPart) {
105+
subscriber.onNext(httpResponseBodyPart);
106+
}
107+
108+
@Override
109+
public void onError(Throwable t) {
110+
subscriber.onError(t);
111+
}
112+
113+
@Override
114+
public void onComplete() {
115+
Throwable replacementError = error;
116+
if (replacementError == null) {
117+
subscriber.onComplete();
118+
} else {
119+
subscriber.onError(replacementError);
120+
}
121+
}
122+
}
57123
}

Diff for: client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.asynchttpclient.netty.OnLastHttpContentCallback;
3636
import org.asynchttpclient.netty.SimpleFutureListener;
3737
import org.asynchttpclient.netty.channel.*;
38+
import org.asynchttpclient.netty.handler.StreamedResponsePublisher;
3839
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
3940
import org.asynchttpclient.proxy.ProxyServer;
4041
import org.asynchttpclient.resolver.RequestHostnameResolver;
@@ -462,8 +463,15 @@ private void scheduleReadTimeout(NettyResponseFuture<?> nettyResponseFuture) {
462463

463464
public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
464465

465-
if (channel != null && channel.isActive()) {
466-
channelManager.closeChannel(channel);
466+
if (channel != null) {
467+
Object attribute = Channels.getAttribute(future.channel());
468+
if (attribute instanceof StreamedResponsePublisher) {
469+
((StreamedResponsePublisher) attribute).setError(t);
470+
}
471+
472+
if (channel.isActive()) {
473+
channelManager.closeChannel(channel);
474+
}
467475
}
468476

469477
if (!future.isDone()) {

Diff for: client/src/main/java/org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import io.netty.util.Timeout;
1717
import org.asynchttpclient.netty.NettyResponseFuture;
18+
import org.asynchttpclient.netty.channel.Channels;
19+
import org.asynchttpclient.netty.handler.StreamedResponsePublisher;
1820
import org.asynchttpclient.netty.request.NettyRequestSender;
1921
import org.asynchttpclient.util.StringBuilderPool;
2022

@@ -47,7 +49,7 @@ public void run(Timeout timeout) {
4749
long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
4850
long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;
4951

50-
if (durationBeforeCurrentReadTimeout <= 0L) {
52+
if (durationBeforeCurrentReadTimeout <= 0L && !isReactiveWithNoOutstandingRequest()) {
5153
// idleConnectTimeout reached
5254
StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
5355
appendRemoteAddress(sb);
@@ -62,4 +64,10 @@ public void run(Timeout timeout) {
6264
timeoutsHolder.startReadTimeout(this);
6365
}
6466
}
67+
68+
private boolean isReactiveWithNoOutstandingRequest() {
69+
Object attribute = Channels.getAttribute(nettyResponseFuture.channel());
70+
return attribute instanceof StreamedResponsePublisher &&
71+
!((StreamedResponsePublisher) attribute).hasOutstandingRequest();
72+
}
6573
}

Diff for: client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java renamed to client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownloadTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@
3939
import static org.asynchttpclient.Dsl.asyncHttpClient;
4040
import static org.testng.Assert.assertEquals;
4141

42-
public class ReactiveStreamsDownLoadTest {
42+
public class ReactiveStreamsDownloadTest {
4343

44-
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsDownLoadTest.class);
44+
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsDownloadTest.class);
4545

46-
private int serverPort = 8080;
46+
private final int serverPort = 8080;
4747
private File largeFile;
4848
private File smallFile;
4949

@@ -104,7 +104,7 @@ public void onThrowable(Throwable t) {
104104
}
105105

106106
@Override
107-
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
107+
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) {
108108
LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onBodyPartReceived");
109109
throw new AssertionError("Should not have received body part");
110110
}
@@ -115,12 +115,12 @@ public State onStatusReceived(HttpResponseStatus responseStatus) {
115115
}
116116

117117
@Override
118-
public State onHeadersReceived(HttpHeaders headers) throws Exception {
118+
public State onHeadersReceived(HttpHeaders headers) {
119119
return State.CONTINUE;
120120
}
121121

122122
@Override
123-
public SimpleStreamedAsyncHandler onCompleted() throws Exception {
123+
public SimpleStreamedAsyncHandler onCompleted() {
124124
LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onSubscribe");
125125
return this;
126126
}

0 commit comments

Comments
 (0)