From 2ba57d7b1e899ce96e6a3e6acb337d784712a2ee Mon Sep 17 00:00:00 2001
From: Fabian Fett <fabianfett@apple.com>
Date: Wed, 22 Sep 2021 18:54:31 +0200
Subject: [PATCH 1/2] [HTTP1Connection] Add download streaming backpressure
 test

---
 .../HTTP1ConnectionTests+XCTest.swift         |   1 +
 .../HTTP1ConnectionTests.swift                | 165 ++++++++++++++++++
 2 files changed, 166 insertions(+)

diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift
index 05844a517..35c510e4f 100644
--- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift
+++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift
@@ -32,6 +32,7 @@ extension HTTP1ConnectionTests {
             ("testConnectionClosesOnCloseHeader", testConnectionClosesOnCloseHeader),
             ("testConnectionClosesOnRandomlyAppearingCloseHeader", testConnectionClosesOnRandomlyAppearingCloseHeader),
             ("testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader", testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader),
+            ("testDownloadStreamingBackpressure", testDownloadStreamingBackpressure),
         ]
     }
 }
diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
index bb99454d6..9e27b842d 100644
--- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
@@ -364,6 +364,171 @@ class HTTP1ConnectionTests: XCTestCase {
         XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1)
         XCTAssertEqual(httpBin.activeConnections, 0)
     }
+
+    // In order to test backpressure we need to make sure that reads will not happen
+    // until the backpressure promise is succeeded. Since we cannot guarantee when
+    // messages will be delivered to a client pipeline and we need this test to be
+    // fast (no waiting for arbitrary amounts of time), we do the following.
+    // First, we enforce NIO to send us only 1 byte at a time. Then we send a message
+    // of 4 bytes. This will guarantee that if we see first byte of the message, other
+    // bytes a ready to be read as well. This will allow us to test if subsequent reads
+    // are waiting for backpressure promise.
+    func testDownloadStreamingBackpressure() {
+        class BackpressureTestDelegate: HTTPClientResponseDelegate {
+            typealias Response = Void
+
+            var _reads = 0
+            var _channel: Channel?
+
+            let lock: Lock
+            let backpressurePromise: EventLoopPromise<Void>
+            let optionsApplied: EventLoopPromise<Void>
+            let messageReceived: EventLoopPromise<Void>
+
+            init(eventLoop: EventLoop) {
+                self.lock = Lock()
+                self.backpressurePromise = eventLoop.makePromise()
+                self.optionsApplied = eventLoop.makePromise()
+                self.messageReceived = eventLoop.makePromise()
+            }
+
+            var reads: Int {
+                return self.lock.withLock {
+                    self._reads
+                }
+            }
+
+            func willExecuteOnChannel(_ channel: Channel) {
+                self.lock.withLockVoid {
+                    self._channel = channel
+                }
+            }
+
+            func didReceiveHead(task: HTTPClient.Task<Void>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
+                // This is to force NIO to send only 1 byte at a time.
+                guard let channel = self.lock.withLock({ self._channel }) else {
+                    preconditionFailure("Expected to have a channel at this point")
+                }
+
+                let future = channel.setOption(ChannelOptions.maxMessagesPerRead, value: 1).flatMap {
+                    channel.setOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 1))
+                }
+                future.cascade(to: self.optionsApplied)
+                return future
+            }
+
+            func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
+                // We count a number of reads received.
+                self.lock.withLockVoid {
+                    self._reads += 1
+                }
+                // We need to notify the test when first byte of the message is arrived.
+                self.messageReceived.succeed(())
+                return self.backpressurePromise.futureResult
+            }
+
+            func didFinishRequest(task: HTTPClient.Task<Response>) throws {}
+        }
+
+        final class WriteAfterFutureSucceedsHandler: ChannelInboundHandler {
+            typealias InboundIn = HTTPServerRequestPart
+            typealias OutboundOut = HTTPServerResponsePart
+
+            let bodyFuture: EventLoopFuture<Void>
+            let endFuture: EventLoopFuture<Void>
+
+            init(bodyFuture: EventLoopFuture<Void>, endFuture: EventLoopFuture<Void>) {
+                self.bodyFuture = bodyFuture
+                self.endFuture = endFuture
+            }
+
+            func channelRead(context: ChannelHandlerContext, data: NIOAny) {
+                switch self.unwrapInboundIn(data) {
+                case .head:
+                    let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok)
+                    context.writeAndFlush(wrapOutboundOut(.head(head)), promise: nil)
+                case .body:
+                    // ignore
+                    break
+                case .end:
+                    self.bodyFuture.hop(to: context.eventLoop).whenSuccess {
+                        let buffer = context.channel.allocator.buffer(string: "1234")
+                        context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
+                    }
+
+                    self.endFuture.hop(to: context.eventLoop).whenSuccess {
+                        context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
+                    }
+                }
+            }
+        }
+
+        let logger = Logger(label: "test")
+
+        // cannot test with NIOTS as `maxMessagesPerRead` is not supported
+        let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
+        defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
+        let requestEventLoop = eventLoopGroup.next()
+        let backpressureDelegate = BackpressureTestDelegate(eventLoop: requestEventLoop)
+
+        let httpBin = HTTPBin { _ in
+            WriteAfterFutureSucceedsHandler(
+                bodyFuture: backpressureDelegate.optionsApplied.futureResult,
+                endFuture: backpressureDelegate.backpressurePromise.futureResult
+            )
+        }
+        defer { XCTAssertNoThrow(try httpBin.shutdown()) }
+
+        var maybeChannel: Channel?
+        XCTAssertNoThrow(maybeChannel = try ClientBootstrap(group: eventLoopGroup)
+            .connect(host: "localhost", port: httpBin.port)
+            .wait())
+        guard let channel = maybeChannel else { return XCTFail("Expected to have a channel at this point") }
+        let connectionDelegate = MockConnectionDelegate()
+        var maybeConnection: HTTP1Connection?
+        XCTAssertNoThrow(maybeConnection = try channel.eventLoop.submit { try HTTP1Connection.start(
+            channel: channel,
+            connectionID: 0,
+            delegate: connectionDelegate,
+            configuration: .init(),
+            logger: logger
+        ) }.wait())
+        guard let connection = maybeConnection else { return XCTFail("Expected to have a connection at this point") }
+
+        var maybeRequestBag: RequestBag<BackpressureTestDelegate>?
+
+        XCTAssertNoThrow(maybeRequestBag = try RequestBag(
+            request: HTTPClient.Request(url: "http://localhost:\(httpBin.port)/custom"),
+            eventLoopPreference: .delegate(on: requestEventLoop),
+            task: .init(eventLoop: requestEventLoop, logger: logger),
+            redirectHandler: nil,
+            connectionDeadline: .now() + .seconds(30),
+            requestOptions: .forTests(),
+            delegate: backpressureDelegate
+        ))
+        guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
+        backpressureDelegate.willExecuteOnChannel(connection.channel)
+
+        connection.executeRequest(requestBag)
+
+        let requestFuture = requestBag.task.futureResult
+
+        // We need to wait for channel options that limit NIO to sending only one byte at a time.
+        XCTAssertNoThrow(try backpressureDelegate.optionsApplied.futureResult.wait())
+
+        // Send 4 bytes, but only one should be received until the backpressure promise is succeeded.
+
+        // Now we wait until message is delivered to client channel pipeline
+        XCTAssertNoThrow(try backpressureDelegate.messageReceived.futureResult.wait())
+        XCTAssertEqual(backpressureDelegate.reads, 1)
+
+        // Succeed the backpressure promise.
+        backpressureDelegate.backpressurePromise.succeed(())
+        XCTAssertNoThrow(try requestFuture.wait())
+
+        // At this point all other bytes should be delivered.
+        XCTAssertEqual(backpressureDelegate.reads, 4)
+    }
 }
 
 class MockHTTP1ConnectionDelegate: HTTP1ConnectionDelegate {

From 4653c80715540cf9f75cdfab9a2996fb7698906e Mon Sep 17 00:00:00 2001
From: Fabian Fett <fabianfett@apple.com>
Date: Thu, 23 Sep 2021 17:31:14 +0200
Subject: [PATCH 2/2] Make test easier.

---
 .../HTTP1ConnectionTests.swift                | 29 ++++---------------
 1 file changed, 6 insertions(+), 23 deletions(-)

diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
index 9e27b842d..6fc8f3e94 100644
--- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
@@ -382,13 +382,11 @@ class HTTP1ConnectionTests: XCTestCase {
 
             let lock: Lock
             let backpressurePromise: EventLoopPromise<Void>
-            let optionsApplied: EventLoopPromise<Void>
             let messageReceived: EventLoopPromise<Void>
 
             init(eventLoop: EventLoop) {
                 self.lock = Lock()
                 self.backpressurePromise = eventLoop.makePromise()
-                self.optionsApplied = eventLoop.makePromise()
                 self.messageReceived = eventLoop.makePromise()
             }
 
@@ -405,16 +403,7 @@ class HTTP1ConnectionTests: XCTestCase {
             }
 
             func didReceiveHead(task: HTTPClient.Task<Void>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
-                // This is to force NIO to send only 1 byte at a time.
-                guard let channel = self.lock.withLock({ self._channel }) else {
-                    preconditionFailure("Expected to have a channel at this point")
-                }
-
-                let future = channel.setOption(ChannelOptions.maxMessagesPerRead, value: 1).flatMap {
-                    channel.setOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 1))
-                }
-                future.cascade(to: self.optionsApplied)
-                return future
+                return task.futureResult.eventLoop.makeSucceededVoidFuture()
             }
 
             func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
@@ -434,11 +423,9 @@ class HTTP1ConnectionTests: XCTestCase {
             typealias InboundIn = HTTPServerRequestPart
             typealias OutboundOut = HTTPServerResponsePart
 
-            let bodyFuture: EventLoopFuture<Void>
             let endFuture: EventLoopFuture<Void>
 
-            init(bodyFuture: EventLoopFuture<Void>, endFuture: EventLoopFuture<Void>) {
-                self.bodyFuture = bodyFuture
+            init(endFuture: EventLoopFuture<Void>) {
                 self.endFuture = endFuture
             }
 
@@ -451,10 +438,8 @@ class HTTP1ConnectionTests: XCTestCase {
                     // ignore
                     break
                 case .end:
-                    self.bodyFuture.hop(to: context.eventLoop).whenSuccess {
-                        let buffer = context.channel.allocator.buffer(string: "1234")
-                        context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
-                    }
+                    let buffer = context.channel.allocator.buffer(string: "1234")
+                    context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
 
                     self.endFuture.hop(to: context.eventLoop).whenSuccess {
                         context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
@@ -473,7 +458,6 @@ class HTTP1ConnectionTests: XCTestCase {
 
         let httpBin = HTTPBin { _ in
             WriteAfterFutureSucceedsHandler(
-                bodyFuture: backpressureDelegate.optionsApplied.futureResult,
                 endFuture: backpressureDelegate.backpressurePromise.futureResult
             )
         }
@@ -481,6 +465,8 @@ class HTTP1ConnectionTests: XCTestCase {
 
         var maybeChannel: Channel?
         XCTAssertNoThrow(maybeChannel = try ClientBootstrap(group: eventLoopGroup)
+            .channelOption(ChannelOptions.maxMessagesPerRead, value: 1)
+            .channelOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 1))
             .connect(host: "localhost", port: httpBin.port)
             .wait())
         guard let channel = maybeChannel else { return XCTFail("Expected to have a channel at this point") }
@@ -513,9 +499,6 @@ class HTTP1ConnectionTests: XCTestCase {
 
         let requestFuture = requestBag.task.futureResult
 
-        // We need to wait for channel options that limit NIO to sending only one byte at a time.
-        XCTAssertNoThrow(try backpressureDelegate.optionsApplied.futureResult.wait())
-
         // Send 4 bytes, but only one should be received until the backpressure promise is succeeded.
 
         // Now we wait until message is delivered to client channel pipeline