Skip to content

Commit d2fc371

Browse files
rachid-oRachid Ben Moussa
and
Rachid Ben Moussa
authoredMar 27, 2021
Fix for NPE when connection is reset by peer (#1771)
* Unit test that reproduces the NPE when connection is reset by peer * Changed future.channel() to channel which is guarded to be non null * Higher request timeout because the test seems to fail regularly on Github Also the duration of running the build in Github varies Co-authored-by: Rachid Ben Moussa <[email protected]>
1 parent bd7b5bd commit d2fc371

File tree

2 files changed

+109
-1
lines changed

2 files changed

+109
-1
lines changed
 

‎client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ private void scheduleReadTimeout(NettyResponseFuture<?> nettyResponseFuture) {
464464
public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
465465

466466
if (channel != null) {
467-
Object attribute = Channels.getAttribute(future.channel());
467+
Object attribute = Channels.getAttribute(channel);
468468
if (attribute instanceof StreamedResponsePublisher) {
469469
((StreamedResponsePublisher) attribute).setError(t);
470470
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package org.asynchttpclient.netty;
2+
3+
import org.asynchttpclient.DefaultAsyncHttpClient;
4+
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
5+
import org.asynchttpclient.RequestBuilder;
6+
import org.testng.annotations.BeforeTest;
7+
import org.testng.annotations.Test;
8+
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.OutputStream;
12+
import java.net.ServerSocket;
13+
import java.net.Socket;
14+
import java.net.SocketException;
15+
import java.util.Arrays;
16+
import java.util.concurrent.Exchanger;
17+
import java.util.concurrent.ExecutionException;
18+
import java.util.concurrent.TimeoutException;
19+
import java.util.function.Consumer;
20+
21+
import static org.hamcrest.CoreMatchers.instanceOf;
22+
import static org.hamcrest.CoreMatchers.is;
23+
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.hamcrest.Matchers.not;
25+
import static org.testng.Assert.assertTrue;
26+
27+
public class NettyConnectionResetByPeerTest {
28+
29+
private String resettingServerAddress;
30+
31+
@BeforeTest
32+
public void setUp() {
33+
resettingServerAddress = createResettingServer();
34+
}
35+
36+
@Test
37+
public void testAsyncHttpClientConnectionResetByPeer() throws InterruptedException {
38+
try {
39+
DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
40+
.setRequestTimeout(1500)
41+
.build();
42+
new DefaultAsyncHttpClient(config).executeRequest(
43+
new RequestBuilder("GET").setUrl(resettingServerAddress)
44+
)
45+
.get();
46+
} catch (ExecutionException e) {
47+
Throwable ex = e.getCause();
48+
assertThat(ex, is(instanceOf(IOException.class)));
49+
}
50+
}
51+
52+
private static String createResettingServer() {
53+
return createServer(sock -> {
54+
try (Socket socket = sock) {
55+
socket.setSoLinger(true, 0);
56+
InputStream inputStream = socket.getInputStream();
57+
//to not eliminate read
58+
OutputStream os = new OutputStream() {
59+
@Override
60+
public void write(int b) {
61+
// Do nothing
62+
}
63+
};
64+
os.write(startRead(inputStream));
65+
} catch (IOException e) {
66+
throw new RuntimeException(e);
67+
}
68+
});
69+
}
70+
71+
private static String createServer(Consumer<Socket> handler) {
72+
Exchanger<Integer> portHolder = new Exchanger<>();
73+
Thread t = new Thread(() -> {
74+
try (ServerSocket ss = new ServerSocket(0)) {
75+
portHolder.exchange(ss.getLocalPort());
76+
while (true) {
77+
handler.accept(ss.accept());
78+
}
79+
} catch (Exception e) {
80+
if (e instanceof InterruptedException) {
81+
Thread.currentThread()
82+
.interrupt();
83+
}
84+
throw new RuntimeException(e);
85+
}
86+
});
87+
t.setDaemon(true);
88+
t.start();
89+
return tryGetAddress(portHolder);
90+
}
91+
92+
private static String tryGetAddress(Exchanger<Integer> portHolder) {
93+
try {
94+
return "http://localhost:" + portHolder.exchange(0);
95+
} catch (InterruptedException e) {
96+
Thread.currentThread()
97+
.interrupt();
98+
throw new RuntimeException(e);
99+
}
100+
}
101+
102+
private static byte[] startRead(InputStream inputStream) throws IOException {
103+
byte[] buffer = new byte[4];
104+
int length = inputStream.read(buffer);
105+
return Arrays.copyOf(buffer, length);
106+
}
107+
108+
}

0 commit comments

Comments
 (0)
Please sign in to comment.