From 1acdb0c3abdb4db35fbbddd1740073e3e7ff3502 Mon Sep 17 00:00:00 2001
From: Fabian Fett <fabianfett@apple.com>
Date: Wed, 15 Sep 2021 11:42:42 +0200
Subject: [PATCH 1/2] Failing tests

---
 .../HTTPRequestStateMachine+Demand.swift      | 55 ++++++++++++++-----
 .../HTTPRequestStateMachine.swift             | 13 +++--
 .../HTTPRequestStateMachineTests+XCTest.swift |  2 +
 .../HTTPRequestStateMachineTests.swift        | 34 ++++++++++++
 4 files changed, 87 insertions(+), 17 deletions(-)

diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift
index 8a061bfa3..366a375f5 100644
--- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift
+++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift
@@ -50,14 +50,32 @@ extension HTTPRequestStateMachine {
                 self.state = .modifying
                 buffer.append(body)
                 self.state = .waitingForBytes(buffer)
-
-            case .waitingForRead,
-                 .waitingForDemand,
-                 .waitingForReadOrDemand:
-                preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
-
+                
             case .modifying:
                 preconditionFailure("Invalid state: \(self.state)")
+
+            // For all the following cases, please note:
+            // Normally these code paths should never be hit. However there is one way to trigger
+            // this:
+            //
+            // If the server decides to close a connection, NIO will forward all outstanding
+            // `channelRead`s without waiting for a next `context.read` call. For this reason we
+            // might receive further bytes, when we don't expect them here.
+
+            case .waitingForRead(var buffer):
+                self.state = .modifying
+                buffer.append(body)
+                self.state = .waitingForRead(buffer)
+
+            case .waitingForDemand(var buffer):
+                self.state = .modifying
+                buffer.append(body)
+                self.state = .waitingForDemand(buffer)
+
+            case .waitingForReadOrDemand(var buffer):
+                self.state = .modifying
+                buffer.append(body)
+                self.state = .waitingForReadOrDemand(buffer)
             }
         }
 
@@ -134,15 +152,26 @@ extension HTTPRequestStateMachine {
             }
         }
 
-        mutating func end() -> CircularBuffer<ByteBuffer> {
+        enum ConnectionAction {
+            case none
+            case close
+        }
+
+        mutating func end() -> (CircularBuffer<ByteBuffer>, ConnectionAction) {
             switch self.state {
             case .waitingForBytes(let buffer):
-                return buffer
-
-            case .waitingForReadOrDemand,
-                 .waitingForRead,
-                 .waitingForDemand:
-                preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
+                return (buffer, .none)
+
+            case .waitingForReadOrDemand(let buffer),
+                 .waitingForRead(let buffer),
+                 .waitingForDemand(let buffer):
+                // Normally this code path should never be hit. However there is one way to trigger
+                // this:
+                //
+                // If the server decides to close a connection, NIO will forward all outstanding
+                // `channelRead`s without waiting for a next `context.read` call. For this reason we
+                // might receive a call to `end()`, when we don't expect it here.
+                return (buffer, .close)
 
             case .modifying:
                 preconditionFailure("Invalid state: \(self.state)")
diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
index be10e9418..553b1a32d 100644
--- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
+++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
@@ -523,7 +523,7 @@ struct HTTPRequestStateMachine {
             where head.status.code < 300:
 
             return self.avoidingStateMachineCoW { state -> Action in
-                let remainingBuffer = responseStreamState.end()
+                let (remainingBuffer, _) = responseStreamState.end()
                 state = .running(
                     .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState),
                     .endReceived
@@ -536,16 +536,21 @@ struct HTTPRequestStateMachine {
             assert(producerState == .paused, "Expected to have paused the request body stream, when the head was received. Invalid state: \(self.state)")
 
             return self.avoidingStateMachineCoW { state -> Action in
-                let remainingBuffer = responseStreamState.end()
+                let (remainingBuffer, _) = responseStreamState.end()
                 state = .finished
                 return .succeedRequest(.close, remainingBuffer)
             }
 
         case .running(.endSent, .receivingBody(_, var responseStreamState)):
             return self.avoidingStateMachineCoW { state -> Action in
-                let remainingBuffer = responseStreamState.end()
+                let (remainingBuffer, action) = responseStreamState.end()
                 state = .finished
-                return .succeedRequest(.none, remainingBuffer)
+                switch action {
+                case .none:
+                    return .succeedRequest(.none, remainingBuffer)
+                case .close:
+                    return .succeedRequest(.close, remainingBuffer)
+                }
             }
 
         case .running(_, .endReceived), .finished:
diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift
index 7fef14658..05d2281e1 100644
--- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift
@@ -50,6 +50,8 @@ extension HTTPRequestStateMachineTests {
             ("testReadTimeoutThatFiresToLateIsIgnored", testReadTimeoutThatFiresToLateIsIgnored),
             ("testCancellationThatIsInvokedToLateIsIgnored", testCancellationThatIsInvokedToLateIsIgnored),
             ("testErrorWhileRunningARequestClosesTheStream", testErrorWhileRunningARequestClosesTheStream),
+            ("testCanReadHTTP1_0ResponseWithoutBody", testCanReadHTTP1_0ResponseWithoutBody),
+            ("testCanReadHTTP1_0ResponseWithBody", testCanReadHTTP1_0ResponseWithBody),
         ]
     }
 }
diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift
index b465c3484..7bf47ea37 100644
--- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift
@@ -451,6 +451,40 @@ class HTTPRequestStateMachineTests: XCTestCase {
         XCTAssertEqual(state.errorHappened(NIOSSLError.uncleanShutdown), .failRequest(NIOSSLError.uncleanShutdown, .close))
         XCTAssertEqual(state.requestCancelled(), .wait, "A cancellation that happens to late is ignored")
     }
+
+    func testCanReadHTTP1_0ResponseWithoutBody() {
+        var state = HTTPRequestStateMachine(isChannelWritable: true)
+        let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/")
+        let metadata = RequestFramingMetadata(connectionClose: false, body: .none)
+        XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false))
+
+        let responseHead = HTTPResponseHead(version: .http1_0, status: .internalServerError)
+        XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false))
+        XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait)
+        XCTAssertEqual(state.channelReadComplete(), .wait)
+        XCTAssertEqual(state.read(), .read)
+        XCTAssertEqual(state.channelReadComplete(), .wait)
+        XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, []))
+        XCTAssertEqual(state.channelInactive(), .wait)
+    }
+
+    func testCanReadHTTP1_0ResponseWithBody() {
+        var state = HTTPRequestStateMachine(isChannelWritable: true)
+        let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/")
+        let metadata = RequestFramingMetadata(connectionClose: false, body: .none)
+        XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false))
+
+        let responseHead = HTTPResponseHead(version: .http1_0, status: .internalServerError)
+        let body = ByteBuffer(string: "foo bar")
+        XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false))
+        XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait)
+        XCTAssertEqual(state.channelReadComplete(), .wait)
+        XCTAssertEqual(state.read(), .read)
+        XCTAssertEqual(state.channelReadComplete(), .wait)
+        XCTAssertEqual(state.channelRead(.body(body)), .wait)
+        XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [body]))
+        XCTAssertEqual(state.channelInactive(), .wait)
+    }
 }
 
 extension HTTPRequestStateMachine.Action: Equatable {

From 2189eb656a7ed73c562ee9da5644616a7b33579d Mon Sep 17 00:00:00 2001
From: Fabian Fett <fabianfett@apple.com>
Date: Mon, 20 Sep 2021 15:39:52 +0200
Subject: [PATCH 2/2] Code review

---
 .../HTTPRequestStateMachine+Demand.swift      |  2 +-
 .../HTTPRequestStateMachine.swift             | 24 ++++++++++++++-----
 .../HTTPRequestStateMachineTests+XCTest.swift |  1 +
 .../HTTPRequestStateMachineTests.swift        | 20 ++++++++++++++++
 4 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift
index 366a375f5..d20c94d02 100644
--- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift
+++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift
@@ -50,7 +50,7 @@ extension HTTPRequestStateMachine {
                 self.state = .modifying
                 buffer.append(body)
                 self.state = .waitingForBytes(buffer)
-                
+
             case .modifying:
                 preconditionFailure("Invalid state: \(self.state)")
 
diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
index 553b1a32d..34d2ec433 100644
--- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
+++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
@@ -523,12 +523,22 @@ struct HTTPRequestStateMachine {
             where head.status.code < 300:
 
             return self.avoidingStateMachineCoW { state -> Action in
-                let (remainingBuffer, _) = responseStreamState.end()
-                state = .running(
-                    .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState),
-                    .endReceived
-                )
-                return .forwardResponseBodyParts(remainingBuffer)
+                let (remainingBuffer, connectionAction) = responseStreamState.end()
+                switch connectionAction {
+                case .none:
+                    state = .running(
+                        .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState),
+                        .endReceived
+                    )
+                    return .forwardResponseBodyParts(remainingBuffer)
+                case .close:
+                    // If we receive a `.close` as a connectionAction from the responseStreamState
+                    // this means, that the response end was signaled by a connection close. Since
+                    // the request is still uploading, we will not be able to finish the upload. For
+                    // this reason we can fail the request here.
+                    state = .failed(HTTPClientError.remoteConnectionClosed)
+                    return .failRequest(HTTPClientError.remoteConnectionClosed, .close)
+                }
             }
 
         case .running(.streaming(_, _, let producerState), .receivingBody(let head, var responseStreamState)):
@@ -536,6 +546,8 @@ struct HTTPRequestStateMachine {
             assert(producerState == .paused, "Expected to have paused the request body stream, when the head was received. Invalid state: \(self.state)")
 
             return self.avoidingStateMachineCoW { state -> Action in
+                // We can ignore the connectionAction from the responseStreamState, since the
+                // connection should be closed anyway.
                 let (remainingBuffer, _) = responseStreamState.end()
                 state = .finished
                 return .succeedRequest(.close, remainingBuffer)
diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift
index 05d2281e1..d600092ff 100644
--- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift
@@ -52,6 +52,7 @@ extension HTTPRequestStateMachineTests {
             ("testErrorWhileRunningARequestClosesTheStream", testErrorWhileRunningARequestClosesTheStream),
             ("testCanReadHTTP1_0ResponseWithoutBody", testCanReadHTTP1_0ResponseWithoutBody),
             ("testCanReadHTTP1_0ResponseWithBody", testCanReadHTTP1_0ResponseWithBody),
+            ("testFailHTTP1_0RequestThatIsStillUploading", testFailHTTP1_0RequestThatIsStillUploading),
         ]
     }
 }
diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift
index 7bf47ea37..3a11cfd43 100644
--- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift
@@ -485,6 +485,26 @@ class HTTPRequestStateMachineTests: XCTestCase {
         XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [body]))
         XCTAssertEqual(state.channelInactive(), .wait)
     }
+
+    func testFailHTTP1_0RequestThatIsStillUploading() {
+        var state = HTTPRequestStateMachine(isChannelWritable: true)
+        let requestHead = HTTPRequestHead(version: .http1_1, method: .POST, uri: "/")
+        let metadata = RequestFramingMetadata(connectionClose: false, body: .stream)
+        XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: true))
+
+        let part1: ByteBuffer = .init(string: "foo")
+        XCTAssertEqual(state.requestStreamPartReceived(.byteBuffer(part1)), .sendBodyPart(.byteBuffer(part1)))
+        let responseHead = HTTPResponseHead(version: .http1_0, status: .ok)
+        let body = ByteBuffer(string: "foo bar")
+        XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false))
+        XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait)
+        XCTAssertEqual(state.channelReadComplete(), .wait)
+        XCTAssertEqual(state.read(), .read)
+        XCTAssertEqual(state.channelReadComplete(), .wait)
+        XCTAssertEqual(state.channelRead(.body(body)), .wait)
+        XCTAssertEqual(state.channelRead(.end(nil)), .failRequest(HTTPClientError.remoteConnectionClosed, .close))
+        XCTAssertEqual(state.channelInactive(), .wait)
+    }
 }
 
 extension HTTPRequestStateMachine.Action: Equatable {