From be4f9aa207c5c102c17165873b36691ecc664f92 Mon Sep 17 00:00:00 2001
From: Artem Redkin <aredkin@apple.com>
Date: Mon, 15 Jun 2020 12:07:07 +0100
Subject: [PATCH 1/6] check body length

---
 Sources/AsyncHTTPClient/HTTPClient.swift      |  9 ++++--
 Sources/AsyncHTTPClient/HTTPHandler.swift     | 11 +++++++
 .../HTTPClientTests.swift                     | 31 +++++++++++++++++++
 3 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift
index ee0f2613e..faa31d06d 100644
--- a/Sources/AsyncHTTPClient/HTTPClient.swift
+++ b/Sources/AsyncHTTPClient/HTTPClient.swift
@@ -925,6 +925,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
         case uncleanShutdown
         case traceRequestWithBody
         case invalidHeaderFieldNames([String])
+        case bodyLengthMismatch
     }
 
     private var code: Code
@@ -969,10 +970,12 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
     public static let redirectLimitReached = HTTPClientError(code: .redirectLimitReached)
     /// Redirect Cycle detected.
     public static let redirectCycleDetected = HTTPClientError(code: .redirectCycleDetected)
-    /// Unclean shutdown
+    /// Unclean shutdown.
     public static let uncleanShutdown = HTTPClientError(code: .uncleanShutdown)
-    /// A body was sent in a request with method TRACE
+    /// A body was sent in a request with method TRACE.
     public static let traceRequestWithBody = HTTPClientError(code: .traceRequestWithBody)
-    /// Header field names contain invalid characters
+    /// Header field names contain invalid characters.
     public static func invalidHeaderFieldNames(_ names: [String]) -> HTTPClientError { return HTTPClientError(code: .invalidHeaderFieldNames(names)) }
+    /// Body length is not equal to `Content-Length`.
+    public static let bodyLengthMismatch = HTTPClientError(code: .bodyLengthMismatch)
 }
diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift
index 3dc32bf84..d86c3ed7e 100644
--- a/Sources/AsyncHTTPClient/HTTPHandler.swift
+++ b/Sources/AsyncHTTPClient/HTTPHandler.swift
@@ -651,6 +651,8 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
     let logger: Logger // We are okay to store the logger here because a TaskHandler is just for one request.
 
     var state: State = .idle
+    var expectedBodyLength: Int? = nil
+    var actualBodyLength: Int = 0
     var pendingRead = false
     var mayRead = true
     var closing = false {
@@ -794,11 +796,19 @@ extension TaskHandler: ChannelDuplexHandler {
         assert(head.version == HTTPVersion(major: 1, minor: 1),
                "Sending a request in HTTP version \(head.version) which is unsupported by the above `if`")
 
+        self.expectedBodyLength = head.headers[canonicalForm: "content-length"].first.flatMap { Int($0) }
+
         context.write(wrapOutboundOut(.head(head))).map {
             self.callOutToDelegateFireAndForget(value: head, self.delegate.didSendRequestHead)
         }.flatMap {
             self.writeBody(request: request, context: context)
         }.flatMap {
+            if let expectedBodyLength = self.expectedBodyLength, expectedBodyLength != self.actualBodyLength {
+                self.state = .end
+                let error = HTTPClientError.bodyLengthMismatch
+                self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
+                return context.eventLoop.makeFailedFuture(error)
+            }
             context.eventLoop.assertInEventLoop()
             return context.writeAndFlush(self.wrapOutboundOut(.end(nil)))
         }.map {
@@ -836,6 +846,7 @@ extension TaskHandler: ChannelDuplexHandler {
                 }
 
                 return promise.futureResult.map {
+                    self.actualBodyLength += part.readableBytes
                     self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart)
                 }
             })
diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
index 4cde4ab9f..7daa72630 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
@@ -2051,4 +2051,35 @@ class HTTPClientTests: XCTestCase {
 
         XCTAssertNoThrow(try future.wait())
     }
+
+    func testContentLengthTooLongFails() {
+        let url = self.defaultHTTPBinURLPrefix + "/post"
+        XCTAssertThrowsError(
+            try self.defaultClient.execute(request:
+                                            Request(url: url,
+                                                    body: .stream(length: 10) { streamWriter in
+                                                        streamWriter.write(.byteBuffer(ByteBuffer(string: "1")))
+                                                    })).wait()) { error in
+            XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch)
+        }
+        // Quickly try another request and check that it works.
+        XCTAssertNoThrow(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "/get").wait())
+    }
+
+    // currently gets stuck because of #250 the server just never replies
+    func testContentLengthTooShortFails() {
+        let url = self.defaultHTTPBinURLPrefix + "/post"
+        let tooLong = "XBAD BAD BAD NOT HTTP/1.1\r\n\r\n"
+        XCTAssertThrowsError(
+            try self.defaultClient.execute(request:
+                                            Request(url: url,
+                                                    body: .stream(length: 1) { streamWriter in
+                                                        streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong)))
+                                                    })).wait()) { error in
+            XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch)
+        }
+        // Quickly try another request and check that it works. If we by accident wrote some extra bytes into the
+        // stream (and reuse the connection) that could cause problems.
+        XCTAssertNoThrow(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "/get").wait())
+    }
 }

From a7c49776adde0ac15d660d4f700f7b523df67daa Mon Sep 17 00:00:00 2001
From: Artem Redkin <aredkin@apple.com>
Date: Mon, 15 Jun 2020 12:11:55 +0100
Subject: [PATCH 2/6] swiftformat and linux tests

---
 Sources/AsyncHTTPClient/HTTPHandler.swift            |  2 +-
 .../HTTPClientTests+XCTest.swift                     |  2 ++
 Tests/AsyncHTTPClientTests/HTTPClientTests.swift     | 12 ++++++------
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift
index d86c3ed7e..7a7299073 100644
--- a/Sources/AsyncHTTPClient/HTTPHandler.swift
+++ b/Sources/AsyncHTTPClient/HTTPHandler.swift
@@ -651,7 +651,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
     let logger: Logger // We are okay to store the logger here because a TaskHandler is just for one request.
 
     var state: State = .idle
-    var expectedBodyLength: Int? = nil
+    var expectedBodyLength: Int?
     var actualBodyLength: Int = 0
     var pendingRead = false
     var mayRead = true
diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
index 3e95a5879..5d8ba873f 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
@@ -110,6 +110,8 @@ extension HTTPClientTests {
             ("testAllMethodsLog", testAllMethodsLog),
             ("testClosingIdleConnectionsInPoolLogsInTheBackground", testClosingIdleConnectionsInPoolLogsInTheBackground),
             ("testDelegateCallinsTolerateRandomEL", testDelegateCallinsTolerateRandomEL),
+            ("testContentLengthTooLongFails", testContentLengthTooLongFails),
+            ("testContentLengthTooShortFails", testContentLengthTooShortFails),
         ]
     }
 }
diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
index 7daa72630..3e4f3a0e7 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
@@ -2056,9 +2056,9 @@ class HTTPClientTests: XCTestCase {
         let url = self.defaultHTTPBinURLPrefix + "/post"
         XCTAssertThrowsError(
             try self.defaultClient.execute(request:
-                                            Request(url: url,
-                                                    body: .stream(length: 10) { streamWriter in
-                                                        streamWriter.write(.byteBuffer(ByteBuffer(string: "1")))
+                Request(url: url,
+                        body: .stream(length: 10) { streamWriter in
+                            streamWriter.write(.byteBuffer(ByteBuffer(string: "1")))
                                                     })).wait()) { error in
             XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch)
         }
@@ -2072,9 +2072,9 @@ class HTTPClientTests: XCTestCase {
         let tooLong = "XBAD BAD BAD NOT HTTP/1.1\r\n\r\n"
         XCTAssertThrowsError(
             try self.defaultClient.execute(request:
-                                            Request(url: url,
-                                                    body: .stream(length: 1) { streamWriter in
-                                                        streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong)))
+                Request(url: url,
+                        body: .stream(length: 1) { streamWriter in
+                            streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong)))
                                                     })).wait()) { error in
             XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch)
         }

From 34ad4a6fbf157ae209400bb76fa8e8cc919c0a5c Mon Sep 17 00:00:00 2001
From: Artem Redkin <aredkin@apple.com>
Date: Mon, 15 Jun 2020 13:35:49 +0100
Subject: [PATCH 3/6] review fixes

---
 Sources/AsyncHTTPClient/HTTPHandler.swift     | 41 +++++++++++--------
 .../HTTPClientTestUtils.swift                 | 33 ++++++++++-----
 .../HTTPClientTests.swift                     | 32 ++++++++++-----
 .../RequestValidationTests.swift              | 10 +++++
 4 files changed, 77 insertions(+), 39 deletions(-)

diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift
index 7a7299073..6c816b8fb 100644
--- a/Sources/AsyncHTTPClient/HTTPHandler.swift
+++ b/Sources/AsyncHTTPClient/HTTPHandler.swift
@@ -641,7 +641,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
         case head
         case redirected(HTTPResponseHead, URL)
         case body
-        case end
+        case endOrError
     }
 
     let task: HTTPClient.Task<Delegate.Response>
@@ -782,7 +782,7 @@ extension TaskHandler: ChannelDuplexHandler {
         } catch {
             promise?.fail(error)
             self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
-            self.state = .end
+            self.state = .endOrError
             return
         }
 
@@ -796,20 +796,24 @@ extension TaskHandler: ChannelDuplexHandler {
         assert(head.version == HTTPVersion(major: 1, minor: 1),
                "Sending a request in HTTP version \(head.version) which is unsupported by the above `if`")
 
-        self.expectedBodyLength = head.headers[canonicalForm: "content-length"].first.flatMap { Int($0) }
+
+        let contentLengths = head.headers[canonicalForm: "content-length"]
+        assert(contentLengths.count <= 1)
+
+        self.expectedBodyLength = contentLengths.first.flatMap { Int($0) }
 
         context.write(wrapOutboundOut(.head(head))).map {
             self.callOutToDelegateFireAndForget(value: head, self.delegate.didSendRequestHead)
         }.flatMap {
             self.writeBody(request: request, context: context)
         }.flatMap {
+            context.eventLoop.assertInEventLoop()
             if let expectedBodyLength = self.expectedBodyLength, expectedBodyLength != self.actualBodyLength {
-                self.state = .end
+                self.state = .endOrError
                 let error = HTTPClientError.bodyLengthMismatch
                 self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
                 return context.eventLoop.makeFailedFuture(error)
             }
-            context.eventLoop.assertInEventLoop()
             return context.writeAndFlush(self.wrapOutboundOut(.end(nil)))
         }.map {
             context.eventLoop.assertInEventLoop()
@@ -818,10 +822,10 @@ extension TaskHandler: ChannelDuplexHandler {
         }.flatMapErrorThrowing { error in
             context.eventLoop.assertInEventLoop()
             switch self.state {
-            case .end:
+            case .endOrError:
                 break
             default:
-                self.state = .end
+                self.state = .endOrError
                 self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
             }
             throw error
@@ -839,14 +843,15 @@ extension TaskHandler: ChannelDuplexHandler {
                 // All writes have to be switched to the channel EL if channel and task ELs differ
                 if context.eventLoop.inEventLoop {
                     context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
+                    self.actualBodyLength += part.readableBytes
                 } else {
                     context.eventLoop.execute {
                         context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
+                        self.actualBodyLength += part.readableBytes
                     }
                 }
 
                 return promise.futureResult.map {
-                    self.actualBodyLength += part.readableBytes
                     self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart)
                 }
             })
@@ -904,12 +909,12 @@ extension TaskHandler: ChannelDuplexHandler {
         case .end:
             switch self.state {
             case .redirected(let head, let redirectURL):
-                self.state = .end
+                self.state = .endOrError
                 self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess {
                     self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
                 }
             default:
-                self.state = .end
+                self.state = .endOrError
                 self.callOutToDelegate(promise: self.task.promise, self.delegate.didFinishRequest)
             }
         }
@@ -924,14 +929,14 @@ extension TaskHandler: ChannelDuplexHandler {
                 context.read()
             }
         case .failure(let error):
-            self.state = .end
+            self.state = .endOrError
             self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
         }
     }
 
     func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
         if (event as? IdleStateHandler.IdleStateEvent) == .read {
-            self.state = .end
+            self.state = .endOrError
             let error = HTTPClientError.readTimeout
             self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
         } else {
@@ -941,7 +946,7 @@ extension TaskHandler: ChannelDuplexHandler {
 
     func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
         if (event as? TaskCancelEvent) != nil {
-            self.state = .end
+            self.state = .endOrError
             let error = HTTPClientError.cancelled
             self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
             promise?.succeed(())
@@ -952,10 +957,10 @@ extension TaskHandler: ChannelDuplexHandler {
 
     func channelInactive(context: ChannelHandlerContext) {
         switch self.state {
-        case .end:
+        case .endOrError:
             break
         case .body, .head, .idle, .redirected, .sent:
-            self.state = .end
+            self.state = .endOrError
             let error = HTTPClientError.remoteConnectionClosed
             self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
         }
@@ -966,7 +971,7 @@ extension TaskHandler: ChannelDuplexHandler {
         switch error {
         case NIOSSLError.uncleanShutdown:
             switch self.state {
-            case .end:
+            case .endOrError:
                 /// Some HTTP Servers can 'forget' to respond with CloseNotify when client is closing connection,
                 /// this could lead to incomplete SSL shutdown. But since request is already processed, we can ignore this error.
                 break
@@ -975,11 +980,11 @@ extension TaskHandler: ChannelDuplexHandler {
                 /// We can also ignore this error like `.end`.
                 break
             default:
-                self.state = .end
+                self.state = .endOrError
                 self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
             }
         default:
-            self.state = .end
+            self.state = .endOrError
             self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
         }
     }
diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
index a7eee57b5..cffad6291 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
@@ -188,6 +188,7 @@ internal final class HTTPBin {
     let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
     let serverChannel: Channel
     let isShutdown: NIOAtomic<Bool> = .makeAtomic(value: false)
+    var connections: NIOAtomic<Int>
     var connectionCount: NIOAtomic<Int> = .makeAtomic(value: 0)
     private let activeConnCounterHandler: CountActiveConnectionsHandler
     var activeConnections: Int {
@@ -233,6 +234,9 @@ internal final class HTTPBin {
         let activeConnCounterHandler = CountActiveConnectionsHandler()
         self.activeConnCounterHandler = activeConnCounterHandler
 
+        let connections = NIOAtomic.makeAtomic(value: 0)
+        self.connections = connections
+
         self.serverChannel = try! ServerBootstrap(group: self.group)
             .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
             .serverChannelInitializer { channel in
@@ -261,10 +265,10 @@ internal final class HTTPBin {
                     }.flatMap {
                         if ssl {
                             return HTTPBin.configureTLS(channel: channel).flatMap {
-                                channel.pipeline.addHandler(HttpBinHandler(channelPromise: channelPromise, maxChannelAge: maxChannelAge))
+                                channel.pipeline.addHandler(HttpBinHandler(channelPromise: channelPromise, maxChannelAge: maxChannelAge, connectionId: connections.add(1)))
                             }
                         } else {
-                            return channel.pipeline.addHandler(HttpBinHandler(channelPromise: channelPromise, maxChannelAge: maxChannelAge))
+                            return channel.pipeline.addHandler(HttpBinHandler(channelPromise: channelPromise, maxChannelAge: maxChannelAge, connectionId: connections.add(1)))
                         }
                     }
                 }
@@ -357,8 +361,8 @@ internal struct HTTPResponseBuilder {
     }
 }
 
-let globalRequestCounter = NIOAtomic<Int>.makeAtomic(value: 0)
-let globalConnectionCounter = NIOAtomic<Int>.makeAtomic(value: 0)
+//let globalRequestCounter = NIOAtomic<Int>.makeAtomic(value: 0)
+//let globalConnectionCounter = NIOAtomic<Int>.makeAtomic(value: 0)
 
 internal struct RequestInfo: Codable {
     var data: String
@@ -378,13 +382,13 @@ internal final class HttpBinHandler: ChannelInboundHandler {
     let maxChannelAge: TimeAmount?
     var shouldClose = false
     var isServingRequest = false
-    let myConnectionNumber: Int
-    var currentRequestNumber: Int = -1
+    let connectionId: Int
+    var requestId: Int = 0
 
-    init(channelPromise: EventLoopPromise<Channel>? = nil, maxChannelAge: TimeAmount? = nil) {
+    init(channelPromise: EventLoopPromise<Channel>? = nil, maxChannelAge: TimeAmount? = nil, connectionId: Int) {
         self.channelPromise = channelPromise
         self.maxChannelAge = maxChannelAge
-        self.myConnectionNumber = globalConnectionCounter.add(1)
+        self.connectionId = connectionId
     }
 
     func handlerAdded(context: ChannelHandlerContext) {
@@ -424,7 +428,7 @@ internal final class HttpBinHandler: ChannelInboundHandler {
         switch self.unwrapInboundIn(data) {
         case .head(let req):
             self.responseHeaders = HTTPHeaders()
-            self.currentRequestNumber = globalRequestCounter.add(1)
+            self.requestId += 1
             self.parseAndSetOptions(from: req)
             let urlComponents = URLComponents(string: req.uri)!
             switch urlComponents.percentEncodedPath {
@@ -552,8 +556,15 @@ internal final class HttpBinHandler: ChannelInboundHandler {
             context.write(wrapOutboundOut(.head(response.head)), promise: nil)
             if let body = response.body {
                 let requestInfo = RequestInfo(data: String(buffer: body),
-                                              requestNumber: self.currentRequestNumber,
-                                              connectionNumber: self.myConnectionNumber)
+                                              requestNumber: self.requestId,
+                                              connectionNumber: self.connectionId)
+                let responseBody = try! JSONEncoder().encodeAsByteBuffer(requestInfo,
+                                                                         allocator: context.channel.allocator)
+                context.write(wrapOutboundOut(.body(.byteBuffer(responseBody))), promise: nil)
+            } else {
+                let requestInfo = RequestInfo(data: "",
+                                              requestNumber: self.requestId,
+                                              connectionNumber: self.connectionId)
                 let responseBody = try! JSONEncoder().encodeAsByteBuffer(requestInfo,
                                                                          allocator: context.channel.allocator)
                 context.write(wrapOutboundOut(.body(.byteBuffer(responseBody))), promise: nil)
diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
index 3e4f3a0e7..6f80c9ea9 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
@@ -1713,7 +1713,8 @@ class HTTPClientTests: XCTestCase {
 
         // req 1 and 2 cannot share the same connection (close header)
         XCTAssertEqual(stats1.connectionNumber + 1, stats2.connectionNumber)
-        XCTAssertEqual(stats1.requestNumber + 1, stats2.requestNumber)
+        XCTAssertEqual(stats1.requestNumber, 1)
+        XCTAssertEqual(stats2.requestNumber, 1)
 
         // req 2 and 3 should share the same connection (keep-alive is default)
         XCTAssertEqual(stats2.requestNumber + 1, stats3.requestNumber)
@@ -1742,7 +1743,8 @@ class HTTPClientTests: XCTestCase {
 
         // req 1 and 2 cannot share the same connection (close header)
         XCTAssertEqual(stats1.connectionNumber + 1, stats2.connectionNumber)
-        XCTAssertEqual(stats1.requestNumber + 1, stats2.requestNumber)
+        XCTAssertEqual(stats1.requestNumber, 1)
+        XCTAssertEqual(stats2.requestNumber, 1)
 
         // req 2 and 3 should share the same connection (keep-alive is default)
         XCTAssertEqual(stats2.requestNumber + 1, stats3.requestNumber)
@@ -1773,7 +1775,7 @@ class HTTPClientTests: XCTestCase {
 
             // req 1 and 2 cannot share the same connection (close header)
             XCTAssertEqual(stats1.connectionNumber + 1, stats2.connectionNumber)
-            XCTAssertEqual(stats1.requestNumber + 1, stats2.requestNumber)
+            XCTAssertEqual(stats2.requestNumber, 1)
 
             // req 2 and 3 should share the same connection (keep-alive is default)
             XCTAssertEqual(stats2.requestNumber + 1, stats3.requestNumber)
@@ -1805,7 +1807,7 @@ class HTTPClientTests: XCTestCase {
 
             // req 1 and 2 cannot share the same connection (close header)
             XCTAssertEqual(stats1.connectionNumber + 1, stats2.connectionNumber)
-            XCTAssertEqual(stats1.requestNumber + 1, stats2.requestNumber)
+            XCTAssertEqual(stats2.requestNumber, 1)
 
             // req 2 and 3 should share the same connection (keep-alive is default)
             XCTAssertEqual(stats2.requestNumber + 1, stats3.requestNumber)
@@ -2052,22 +2054,29 @@ class HTTPClientTests: XCTestCase {
         XCTAssertNoThrow(try future.wait())
     }
 
-    func testContentLengthTooLongFails() {
+    func testContentLengthTooLongFails() throws {
         let url = self.defaultHTTPBinURLPrefix + "/post"
         XCTAssertThrowsError(
             try self.defaultClient.execute(request:
                 Request(url: url,
                         body: .stream(length: 10) { streamWriter in
-                            streamWriter.write(.byteBuffer(ByteBuffer(string: "1")))
-                                                    })).wait()) { error in
+                            let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self)
+                            DispatchQueue(label: "content-length-test").async {
+                                streamWriter.write(.byteBuffer(ByteBuffer(string: "1"))).cascade(to: promise)
+                            }
+                            return promise.futureResult
+                        })).wait()) { error in
             XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch)
         }
         // Quickly try another request and check that it works.
-        XCTAssertNoThrow(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "/get").wait())
+        var response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
+        let info = try response.body!.readJSONDecodable(RequestInfo.self, length: response.body!.readableBytes)
+        XCTAssertEqual(info!.connectionNumber, 1)
+        XCTAssertEqual(info!.requestNumber, 1)
     }
 
     // currently gets stuck because of #250 the server just never replies
-    func testContentLengthTooShortFails() {
+    func testContentLengthTooShortFails() throws {
         let url = self.defaultHTTPBinURLPrefix + "/post"
         let tooLong = "XBAD BAD BAD NOT HTTP/1.1\r\n\r\n"
         XCTAssertThrowsError(
@@ -2080,6 +2089,9 @@ class HTTPClientTests: XCTestCase {
         }
         // Quickly try another request and check that it works. If we by accident wrote some extra bytes into the
         // stream (and reuse the connection) that could cause problems.
-        XCTAssertNoThrow(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "/get").wait())
+        var response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
+        let info = try response.body!.readJSONDecodable(RequestInfo.self, length: response.body!.readableBytes)
+        XCTAssertEqual(info!.connectionNumber, 1)
+        XCTAssertEqual(info!.requestNumber, 1)
     }
 }
diff --git a/Tests/AsyncHTTPClientTests/RequestValidationTests.swift b/Tests/AsyncHTTPClientTests/RequestValidationTests.swift
index 5f4db05b7..6433e6b92 100644
--- a/Tests/AsyncHTTPClientTests/RequestValidationTests.swift
+++ b/Tests/AsyncHTTPClientTests/RequestValidationTests.swift
@@ -104,4 +104,14 @@ class RequestValidationTests: XCTestCase {
 
         XCTAssertNoThrow(try headers.validate(method: .GET, body: nil))
     }
+
+    func testMultipleContentLengthOnNilStreamLength() {
+        var headers = HTTPHeaders([("Content-Length", "1"), ("Content-Length", "2")])
+        var buffer = ByteBufferAllocator().buffer(capacity: 10)
+        buffer.writeBytes([UInt8](repeating: 12, count: 10))
+        let body: HTTPClient.Body = .stream() { writer in
+            writer.write(.byteBuffer(buffer))
+        }
+        XCTAssertThrowsError(try headers.validate(method: .PUT, body: body))
+    }
 }

From 044765e30bdd01fbfdf1e6ad5f1fec4db3620dcd Mon Sep 17 00:00:00 2001
From: Artem Redkin <aredkin@apple.com>
Date: Mon, 15 Jun 2020 13:40:06 +0100
Subject: [PATCH 4/6] swiftformat and linux tests

---
 Sources/AsyncHTTPClient/HTTPHandler.swift                     | 1 -
 Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift          | 4 ++--
 .../AsyncHTTPClientTests/RequestValidationTests+XCTest.swift  | 1 +
 Tests/AsyncHTTPClientTests/RequestValidationTests.swift       | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift
index 6c816b8fb..1d224e653 100644
--- a/Sources/AsyncHTTPClient/HTTPHandler.swift
+++ b/Sources/AsyncHTTPClient/HTTPHandler.swift
@@ -796,7 +796,6 @@ extension TaskHandler: ChannelDuplexHandler {
         assert(head.version == HTTPVersion(major: 1, minor: 1),
                "Sending a request in HTTP version \(head.version) which is unsupported by the above `if`")
 
-
         let contentLengths = head.headers[canonicalForm: "content-length"]
         assert(contentLengths.count <= 1)
 
diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
index cffad6291..83657e2d1 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
@@ -361,8 +361,8 @@ internal struct HTTPResponseBuilder {
     }
 }
 
-//let globalRequestCounter = NIOAtomic<Int>.makeAtomic(value: 0)
-//let globalConnectionCounter = NIOAtomic<Int>.makeAtomic(value: 0)
+// let globalRequestCounter = NIOAtomic<Int>.makeAtomic(value: 0)
+// let globalConnectionCounter = NIOAtomic<Int>.makeAtomic(value: 0)
 
 internal struct RequestInfo: Codable {
     var data: String
diff --git a/Tests/AsyncHTTPClientTests/RequestValidationTests+XCTest.swift b/Tests/AsyncHTTPClientTests/RequestValidationTests+XCTest.swift
index b4d88344c..51e6ff016 100644
--- a/Tests/AsyncHTTPClientTests/RequestValidationTests+XCTest.swift
+++ b/Tests/AsyncHTTPClientTests/RequestValidationTests+XCTest.swift
@@ -33,6 +33,7 @@ extension RequestValidationTests {
             ("testGET_HEAD_DELETE_CONNECTRequestCanHaveBody", testGET_HEAD_DELETE_CONNECTRequestCanHaveBody),
             ("testInvalidHeaderFieldNames", testInvalidHeaderFieldNames),
             ("testValidHeaderFieldNames", testValidHeaderFieldNames),
+            ("testMultipleContentLengthOnNilStreamLength", testMultipleContentLengthOnNilStreamLength),
         ]
     }
 }
diff --git a/Tests/AsyncHTTPClientTests/RequestValidationTests.swift b/Tests/AsyncHTTPClientTests/RequestValidationTests.swift
index 6433e6b92..fe3cb9330 100644
--- a/Tests/AsyncHTTPClientTests/RequestValidationTests.swift
+++ b/Tests/AsyncHTTPClientTests/RequestValidationTests.swift
@@ -109,7 +109,7 @@ class RequestValidationTests: XCTestCase {
         var headers = HTTPHeaders([("Content-Length", "1"), ("Content-Length", "2")])
         var buffer = ByteBufferAllocator().buffer(capacity: 10)
         buffer.writeBytes([UInt8](repeating: 12, count: 10))
-        let body: HTTPClient.Body = .stream() { writer in
+        let body: HTTPClient.Body = .stream { writer in
             writer.write(.byteBuffer(buffer))
         }
         XCTAssertThrowsError(try headers.validate(method: .PUT, body: body))

From 97a8e96299db08e8cd88ba336888602756f52fbc Mon Sep 17 00:00:00 2001
From: Artem Redkin <aredkin@apple.com>
Date: Mon, 15 Jun 2020 13:47:43 +0100
Subject: [PATCH 5/6] review fixes

---
 Sources/AsyncHTTPClient/HTTPHandler.swift            | 4 ++--
 Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift | 3 ---
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift
index 1d224e653..c79d1e99d 100644
--- a/Sources/AsyncHTTPClient/HTTPHandler.swift
+++ b/Sources/AsyncHTTPClient/HTTPHandler.swift
@@ -841,12 +841,12 @@ extension TaskHandler: ChannelDuplexHandler {
                 let promise = self.task.eventLoop.makePromise(of: Void.self)
                 // All writes have to be switched to the channel EL if channel and task ELs differ
                 if context.eventLoop.inEventLoop {
-                    context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
                     self.actualBodyLength += part.readableBytes
+                    context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
                 } else {
                     context.eventLoop.execute {
-                        context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
                         self.actualBodyLength += part.readableBytes
+                        context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
                     }
                 }
 
diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
index 83657e2d1..3d0e279d9 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
@@ -361,9 +361,6 @@ internal struct HTTPResponseBuilder {
     }
 }
 
-// let globalRequestCounter = NIOAtomic<Int>.makeAtomic(value: 0)
-// let globalConnectionCounter = NIOAtomic<Int>.makeAtomic(value: 0)
-
 internal struct RequestInfo: Codable {
     var data: String
     var requestNumber: Int

From 715ab1429919860fe34c1d729b33bf8f62baebbe Mon Sep 17 00:00:00 2001
From: Artem Redkin <aredkin@apple.com>
Date: Tue, 16 Jun 2020 08:14:57 +0100
Subject: [PATCH 6/6] fix missing ELG close in test

---
 .../HTTPClientInternalTests.swift             |  2 +-
 .../HTTPClientTests.swift                     | 31 ++++++++++++++-----
 2 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
index f0b841fdc..f8b0bd59c 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
@@ -949,8 +949,8 @@ class HTTPClientInternalTests: XCTestCase {
 
         defer {
             XCTAssertNoThrow(try client.syncShutdown())
-            XCTAssertNoThrow(try elg.syncShutdownGracefully())
             XCTAssertNoThrow(try httpBin.shutdown())
+            XCTAssertNoThrow(try elg.syncShutdownGracefully())
         }
 
         let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)//get")
diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
index bfabed2fe..faf3099e0 100644
--- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
+++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift
@@ -2038,6 +2038,7 @@ class HTTPClientTests: XCTestCase {
         defer {
             XCTAssertNoThrow(try httpClient.syncShutdown())
             XCTAssertNoThrow(try httpServer.stop())
+            XCTAssertNoThrow(try elg.syncShutdownGracefully())
         }
 
         let delegate = TestDelegate(eventLoop: second)
@@ -2069,10 +2070,17 @@ class HTTPClientTests: XCTestCase {
             XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch)
         }
         // Quickly try another request and check that it works.
-        var response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
-        let info = try response.body!.readJSONDecodable(RequestInfo.self, length: response.body!.readableBytes)
-        XCTAssertEqual(info!.connectionNumber, 1)
-        XCTAssertEqual(info!.requestNumber, 1)
+        let response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
+        guard var body = response.body else {
+            XCTFail("Body missing: \(response)")
+            return
+        }
+        guard let info = try body.readJSONDecodable(RequestInfo.self, length: body.readableBytes) else {
+            XCTFail("Cannot parse body: \(body.readableBytesView.map { $0 })")
+            return
+        }
+        XCTAssertEqual(info.connectionNumber, 1)
+        XCTAssertEqual(info.requestNumber, 1)
     }
 
     // currently gets stuck because of #250 the server just never replies
@@ -2089,9 +2097,16 @@ class HTTPClientTests: XCTestCase {
         }
         // Quickly try another request and check that it works. If we by accident wrote some extra bytes into the
         // stream (and reuse the connection) that could cause problems.
-        var response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
-        let info = try response.body!.readJSONDecodable(RequestInfo.self, length: response.body!.readableBytes)
-        XCTAssertEqual(info!.connectionNumber, 1)
-        XCTAssertEqual(info!.requestNumber, 1)
+        let response = try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "get").wait()
+        guard var body = response.body else {
+            XCTFail("Body missing: \(response)")
+            return
+        }
+        guard let info = try body.readJSONDecodable(RequestInfo.self, length: body.readableBytes) else {
+            XCTFail("Cannot parse body: \(body.readableBytesView.map { $0 })")
+            return
+        }
+        XCTAssertEqual(info.connectionNumber, 1)
+        XCTAssertEqual(info.requestNumber, 1)
     }
 }