From 2409335aa72fb2a2f7f91b6999db8cfe56985a98 Mon Sep 17 00:00:00 2001
From: Jason Joo <hblzxsj@gmail.com>
Date: Mon, 10 Mar 2025 10:46:53 +0800
Subject: [PATCH] fix: inappropriate connection reuse when using HTTP proxy
 There is an extra CONNECT request needs to send before the real request to
 the HTTP proxy and the 2nd request only happens if the CONNECT request
 succeeds. When CONNECT failed, the connection should be dropped as it's not
 in connected state.

Signed-off-by: Jason Joo <hblzxsj@gmail.com>
---
 .../netty/handler/HttpHandler.java            | 10 ++-
 .../netty/request/NettyRequestSender.java     | 36 ++++++----
 .../asynchttpclient/proxy/HttpsProxyTest.java | 72 ++++++++++++++++++-
 3 files changed, 101 insertions(+), 17 deletions(-)

diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java
index dddaeb34cb..feef13a59a 100755
--- a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java
+++ b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java
@@ -28,6 +28,7 @@
 import org.asynchttpclient.netty.channel.ChannelManager;
 import org.asynchttpclient.netty.channel.Channels;
 import org.asynchttpclient.netty.request.NettyRequestSender;
+import org.asynchttpclient.util.HttpConstants.ResponseStatusCodes;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -39,9 +40,12 @@ public HttpHandler(AsyncHttpClientConfig config, ChannelManager channelManager,
     super(config, channelManager, requestSender);
   }
 
-  private boolean abortAfterHandlingStatus(AsyncHandler<?> handler,
+  private boolean abortAfterHandlingStatus(AsyncHandler<?> handler, HttpMethod httpMethod,
                                            NettyResponseStatus status) throws Exception {
-    return handler.onStatusReceived(status) == State.ABORT;
+    // For non-200 response of a CONNECT request, it's still unconnected.
+    // We need to either close the connection or reuse it but send CONNECT request again.
+    // The former one is easier or we have to attach more state to Channel.
+    return handler.onStatusReceived(status) == State.ABORT || httpMethod == HttpMethod.CONNECT && status.getStatusCode() != ResponseStatusCodes.OK_200;
   }
 
   private boolean abortAfterHandlingHeaders(AsyncHandler<?> handler,
@@ -75,7 +79,7 @@ private void handleHttpResponse(final HttpResponse response, final Channel chann
     HttpHeaders responseHeaders = response.headers();
 
     if (!interceptors.exitAfterIntercept(channel, future, handler, response, status, responseHeaders)) {
-      boolean abort = abortAfterHandlingStatus(handler, status) || //
+      boolean abort = abortAfterHandlingStatus(handler, httpRequest.method(), status) || //
               abortAfterHandlingHeaders(handler, responseHeaders) || //
               abortAfterHandlingReactiveStreams(channel, future, handler);
 
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
index aed08b7a70..48e493fa67 100755
--- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
@@ -83,6 +83,13 @@ public NettyRequestSender(AsyncHttpClientConfig config,
     this.clientState = clientState;
     requestFactory = new NettyRequestFactory(config);
   }
+  
+  // needConnect returns true if the request is secure/websocket and a HTTP proxy is set
+  private boolean needConnect(final Request request, final ProxyServer proxyServer) {
+      return proxyServer != null
+              && proxyServer.getProxyType().isHttp()
+              && (request.getUri().isSecured() || request.getUri().isWebSocket());
+  }
 
   public <T> ListenableFuture<T> sendRequest(final Request request,
                                              final AsyncHandler<T> asyncHandler,
@@ -97,10 +104,7 @@ public <T> ListenableFuture<T> sendRequest(final Request request,
     ProxyServer proxyServer = getProxyServer(config, request);
 
     // WebSockets use connect tunneling to work with proxies
-    if (proxyServer != null
-            && proxyServer.getProxyType().isHttp()
-            && (request.getUri().isSecured() || request.getUri().isWebSocket())
-            && !isConnectAlreadyDone(request, future)) {
+    if (needConnect(request, proxyServer) && !isConnectAlreadyDone(request, future)) {
       // Proxy with HTTPS or WebSocket: CONNECT for sure
       if (future != null && future.isConnectAllowed()) {
         // Perform CONNECT
@@ -117,6 +121,8 @@ public <T> ListenableFuture<T> sendRequest(final Request request,
 
   private boolean isConnectAlreadyDone(Request request, NettyResponseFuture<?> future) {
     return future != null
+            // If the channel can't be reused or closed, a CONNECT is still required
+            && future.isReuseChannel() && Channels.isChannelActive(future.channel())
             && future.getNettyRequest() != null
             && future.getNettyRequest().getHttpRequest().method() == HttpMethod.CONNECT
             && !request.getMethod().equals(CONNECT);
@@ -132,15 +138,19 @@ private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request reque
                                                                      NettyResponseFuture<T> future,
                                                                      ProxyServer proxyServer,
                                                                      boolean performConnectRequest) {
-
-    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
-            performConnectRequest);
-
-    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
-
-    return Channels.isChannelActive(channel)
-            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
-            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
+      Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
+      if (Channels.isChannelActive(channel)) {
+          NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future,
+                  proxyServer, performConnectRequest);
+          return sendRequestWithOpenChannel(newFuture, asyncHandler, channel);
+      } else {
+          // A new channel is not expected when performConnectRequest is false. We need to
+          // revisit the condition of sending
+          // the CONNECT request to the new channel.
+          NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future,
+                  proxyServer, needConnect(request, proxyServer));
+          return sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
+      }
   }
 
   /**
diff --git a/client/src/test/java/org/asynchttpclient/proxy/HttpsProxyTest.java b/client/src/test/java/org/asynchttpclient/proxy/HttpsProxyTest.java
index a8a1e8d3d3..fce69d1fe1 100644
--- a/client/src/test/java/org/asynchttpclient/proxy/HttpsProxyTest.java
+++ b/client/src/test/java/org/asynchttpclient/proxy/HttpsProxyTest.java
@@ -13,8 +13,10 @@
 package org.asynchttpclient.proxy;
 
 import org.asynchttpclient.*;
+import org.asynchttpclient.proxy.ProxyServer.Builder;
 import org.asynchttpclient.request.body.generator.ByteArrayBodyGenerator;
 import org.asynchttpclient.test.EchoHandler;
+import org.asynchttpclient.util.HttpConstants;
 import org.eclipse.jetty.proxy.ConnectHandler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -23,11 +25,21 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import io.netty.handler.codec.http.DefaultHttpHeaders;
+
 import static org.asynchttpclient.Dsl.*;
 import static org.asynchttpclient.test.TestUtils.LARGE_IMAGE_BYTES;
 import static org.asynchttpclient.test.TestUtils.addHttpConnector;
 import static org.asynchttpclient.test.TestUtils.addHttpsConnector;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
 /**
  * Proxy usage tests.
@@ -37,7 +49,7 @@ public class HttpsProxyTest extends AbstractBasicTest {
   private Server server2;
 
   public AbstractHandler configureHandler() throws Exception {
-    return new ConnectHandler();
+    return new ProxyHandler();
   }
 
   @BeforeClass(alwaysRun = true)
@@ -129,4 +141,62 @@ public void testPooledConnectionsWithProxy() throws Exception {
       assertEquals(r2.getStatusCode(), 200);
     }
   }
+  
+  @Test
+  public void testFailedConnectWithProxy() throws Exception {
+      try (AsyncHttpClient asyncHttpClient = asyncHttpClient(config().setFollowRedirect(true).setUseInsecureTrustManager(true).setKeepAlive(true))) {
+          Builder proxyServer = proxyServer("localhost", port1);
+          proxyServer.setCustomHeaders(r -> new DefaultHttpHeaders().set(ProxyHandler.HEADER_FORBIDDEN, "1"));
+          RequestBuilder rb = get(getTargetUrl2()).setProxyServer(proxyServer);
+
+          Response response1 = asyncHttpClient.executeRequest(rb.build()).get();
+          assertEquals(403, response1.getStatusCode());
+
+          Response response2 = asyncHttpClient.executeRequest(rb.build()).get();
+          assertEquals(403, response2.getStatusCode());
+
+          Response response3 = asyncHttpClient.executeRequest(rb.build()).get();
+          assertEquals(403, response3.getStatusCode());
+      }
+  }
+
+  @Test
+  public void testClosedConnectionWithProxy() throws Exception {
+      try (AsyncHttpClient asyncHttpClient = asyncHttpClient(
+              config().setFollowRedirect(true).setUseInsecureTrustManager(true).setKeepAlive(true))) {
+          Builder proxyServer = proxyServer("localhost", port1);
+          proxyServer.setCustomHeaders(r -> new DefaultHttpHeaders().set(ProxyHandler.HEADER_FORBIDDEN, "2"));
+          RequestBuilder rb = get(getTargetUrl2()).setProxyServer(proxyServer);
+
+          assertThrows(ExecutionException.class, () -> asyncHttpClient.executeRequest(rb.build()).get());
+          assertThrows(ExecutionException.class, () -> asyncHttpClient.executeRequest(rb.build()).get());
+          assertThrows(ExecutionException.class, () -> asyncHttpClient.executeRequest(rb.build()).get());
+      }
+  }
+  
+  public static class ProxyHandler extends ConnectHandler {
+      final static String HEADER_FORBIDDEN = "X-REJECT-REQUEST";
+
+      @Override
+      public void handle(String s, org.eclipse.jetty.server.Request r, HttpServletRequest request,
+              HttpServletResponse response) throws ServletException, IOException {
+          if (HttpConstants.Methods.CONNECT.equalsIgnoreCase(request.getMethod())) {
+              String headerValue = request.getHeader(HEADER_FORBIDDEN);
+              if (headerValue == null) {
+                  headerValue = "";
+              }
+              switch (headerValue) {
+              case "1":
+                  response.setStatus(HttpServletResponse.SC_FORBIDDEN);
+                  r.setHandled(true);
+                  return;
+              case "2":
+                  r.getHttpChannel().getConnection().close();
+                  r.setHandled(true);
+                  return;
+              }
+          }
+          super.handle(s, r, request, response);
+      }
+  }
 }