From 2222b03f08144975daa8e1f9c59de62ffa2aeef8 Mon Sep 17 00:00:00 2001 From: Rachid Ben Moussa Date: Tue, 9 Feb 2021 20:42:44 +0100 Subject: [PATCH 1/3] Unit test that reproduces the NPE when connection is reset by peer --- .../netty/NettyConnectionResetByPeerTest.java | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java diff --git a/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java b/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java new file mode 100644 index 0000000000..72106d980e --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java @@ -0,0 +1,107 @@ +package org.asynchttpclient.netty; + +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.RequestBuilder; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.Arrays; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.not; +import static org.testng.Assert.assertTrue; + +public class NettyConnectionResetByPeerTest { + + private String resettingServerAddress; + + @BeforeTest + public void setUp() { + resettingServerAddress = createResettingServer(); + } + + @Test + public void testAsyncHttpClientConnectionResetByPeer() throws InterruptedException { + try { + DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder() + .setRequestTimeout(500) + .build(); + new DefaultAsyncHttpClient(config).executeRequest( + new RequestBuilder("GET").setUrl(resettingServerAddress) + ).get(); + } catch (ExecutionException e) { + Throwable ex = e.getCause(); + assertThat(ex, is(not(instanceOf(TimeoutException.class)))); + assertThat(ex, is(instanceOf(IOException.class))); +// assertTrue(ex.getMessage().equalsIgnoreCase("Connection reset by peer")); + } + } + + private static String createResettingServer() { + return createServer(sock -> { + try (Socket socket = sock) { + socket.setSoLinger(true, 0); + InputStream inputStream = socket.getInputStream(); + //to not eliminate read + OutputStream os = new OutputStream() { + @Override + public void write(int b) { + // Do nothing + } + }; + os.write(startRead(inputStream)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + private static String createServer(Consumer handler) { + Exchanger portHolder = new Exchanger<>(); + Thread t = new Thread(() -> { + try (ServerSocket ss = new ServerSocket(0)) { + portHolder.exchange(ss.getLocalPort()); + while (true) { + handler.accept(ss.accept()); + } + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException(e); + } + }); + t.setDaemon(true); + t.start(); + return tryGetAddress(portHolder); + } + + private static String tryGetAddress(Exchanger portHolder) { + try { + return "http://localhost:" + portHolder.exchange(0); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + private static byte[] startRead(InputStream inputStream) throws IOException { + byte[] buffer = new byte[4]; + int length = inputStream.read(buffer); + return Arrays.copyOf(buffer, length); + } + +} From 37adafe2c38f59d913b9ddfcf2309085c0ea6322 Mon Sep 17 00:00:00 2001 From: Rachid Ben Moussa Date: Tue, 9 Feb 2021 21:34:40 +0100 Subject: [PATCH 2/3] Changed future.channel() to channel which is guarded to be non null --- .../netty/request/NettyRequestSender.java | 2 +- .../netty/NettyConnectionResetByPeerTest.java | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 4fa0589a84..cd184fdb92 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -464,7 +464,7 @@ private void scheduleReadTimeout(NettyResponseFuture nettyResponseFuture) { public void abort(Channel channel, NettyResponseFuture future, Throwable t) { if (channel != null) { - Object attribute = Channels.getAttribute(future.channel()); + Object attribute = Channels.getAttribute(channel); if (attribute instanceof StreamedResponsePublisher) { ((StreamedResponsePublisher) attribute).setError(t); } diff --git a/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java b/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java index 72106d980e..ada452fb27 100644 --- a/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java +++ b/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java @@ -41,12 +41,11 @@ public void testAsyncHttpClientConnectionResetByPeer() throws InterruptedExcepti .build(); new DefaultAsyncHttpClient(config).executeRequest( new RequestBuilder("GET").setUrl(resettingServerAddress) - ).get(); + ) + .get(); } catch (ExecutionException e) { Throwable ex = e.getCause(); - assertThat(ex, is(not(instanceOf(TimeoutException.class)))); assertThat(ex, is(instanceOf(IOException.class))); -// assertTrue(ex.getMessage().equalsIgnoreCase("Connection reset by peer")); } } @@ -79,7 +78,8 @@ private static String createServer(Consumer handler) { } } catch (Exception e) { if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + Thread.currentThread() + .interrupt(); } throw new RuntimeException(e); } @@ -93,7 +93,8 @@ private static String tryGetAddress(Exchanger portHolder) { try { return "http://localhost:" + portHolder.exchange(0); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + Thread.currentThread() + .interrupt(); throw new RuntimeException(e); } } From 57fc8b24694986c19ec4ab4e991ee51e387ef0c5 Mon Sep 17 00:00:00 2001 From: Rachid Ben Moussa Date: Wed, 10 Feb 2021 21:14:25 +0100 Subject: [PATCH 3/3] Higher request timeout because the test seems to fail regularly on Github Also the duration of running the build in Github varies --- .../asynchttpclient/netty/NettyConnectionResetByPeerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java b/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java index ada452fb27..6a3dcc9ce1 100644 --- a/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java +++ b/client/src/test/java/org/asynchttpclient/netty/NettyConnectionResetByPeerTest.java @@ -37,7 +37,7 @@ public void setUp() { public void testAsyncHttpClientConnectionResetByPeer() throws InterruptedException { try { DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder() - .setRequestTimeout(500) + .setRequestTimeout(1500) .build(); new DefaultAsyncHttpClient(config).executeRequest( new RequestBuilder("GET").setUrl(resettingServerAddress)