diff --git a/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift b/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift index 96f936197..c20d5e211 100644 --- a/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift +++ b/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift @@ -342,12 +342,9 @@ extension RequestBag.StateMachine { preconditionFailure("If we have received an error or eof before, why did we get another body part? Next: \(next)") } - if buffer.isEmpty, newChunks == nil || newChunks!.isEmpty { - self.state = .finished(error: nil) - return .succeedRequest - } else if buffer.isEmpty, let newChunks = newChunks { + if buffer.isEmpty, let newChunks = newChunks, !newChunks.isEmpty { buffer = newChunks - } else if let newChunks = newChunks { + } else if let newChunks = newChunks, !newChunks.isEmpty { buffer.append(contentsOf: newChunks) } @@ -433,9 +430,11 @@ extension RequestBag.StateMachine { private mutating func consumeMoreBodyData() -> ConsumeAction { switch self.state { case .initialized, .queued: - preconditionFailure("Invalid state") + preconditionFailure("Invalid state: \(self.state)") + case .executing(_, _, .initialized): preconditionFailure("Invalid state: Must have received response head, before this method is called for the first time") + case .executing(let executor, let requestState, .buffering(var buffer, next: .askExecutorForMore)): self.state = .modifying @@ -473,7 +472,7 @@ extension RequestBag.StateMachine { return .doNothing case .finished(error: .none): - preconditionFailure("Invalid state... If no error occured, this must not be called, after the request was finished") + preconditionFailure("Invalid state... If no error occurred, this must not be called, after the request was finished") case .modifying: preconditionFailure() diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift b/Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift index 308c8dd07..b2919081c 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift @@ -32,6 +32,7 @@ extension RequestBagTests { ("testCancelFailsTaskWhenTaskIsQueued", testCancelFailsTaskWhenTaskIsQueued), ("testFailsTaskWhenTaskIsWaitingForMoreFromServer", testFailsTaskWhenTaskIsWaitingForMoreFromServer), ("testHTTPUploadIsCancelledEvenThoughRequestSucceeds", testHTTPUploadIsCancelledEvenThoughRequestSucceeds), + ("testRaceBetweenConnectionCloseAndDemandMoreData", testRaceBetweenConnectionCloseAndDemandMoreData), ] } } diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index 895061e31..d1c6d6c87 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -399,6 +399,7 @@ final class RequestBagTests: XCTestCase { XCTAssertEqual(executor.nextBodyPart(), .body(.byteBuffer(.init(bytes: 0...3)))) // receive a 301 response immediately. bag.receiveResponseHead(.init(version: .http1_1, status: .movedPermanently)) + XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(())) bag.succeedRequest(.init()) // if we now write our second part of the response this should fail the backpressure promise @@ -407,6 +408,62 @@ final class RequestBagTests: XCTestCase { XCTAssertEqual(delegate.receivedHead?.status, .movedPermanently) XCTAssertNoThrow(try bag.task.futureResult.wait()) } + + func testRaceBetweenConnectionCloseAndDemandMoreData() { + let embeddedEventLoop = EmbeddedEventLoop() + defer { XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) } + let logger = Logger(label: "test") + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://swift.org")) + guard let request = maybeRequest else { return XCTFail("Expected to have a request") } + + let delegate = UploadCountingDelegate(eventLoop: embeddedEventLoop) + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: request, + eventLoopPreference: .delegate(on: embeddedEventLoop), + task: .init(eventLoop: embeddedEventLoop, logger: logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + requestOptions: .forTests(), + delegate: delegate + )) + guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") } + + let executor = MockRequestExecutor() + bag.willExecuteRequest(executor) + bag.requestHeadSent() + bag.receiveResponseHead(.init(version: .http1_1, status: .ok)) + XCTAssertFalse(executor.signalledDemandForResponseBody) + XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(())) + XCTAssertTrue(executor.signalledDemandForResponseBody) + executor.resetDemandSignal() + + // "foo" is forwarded for consumption. We expect the RequestBag to consume "foo" with the + // delegate and call demandMoreBody afterwards. + XCTAssertEqual(delegate.hitDidReceiveBodyPart, 0) + bag.receiveResponseBodyParts([ByteBuffer(string: "foo")]) + XCTAssertFalse(executor.signalledDemandForResponseBody) + XCTAssertEqual(delegate.hitDidReceiveBodyPart, 1) + XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(())) + XCTAssertTrue(executor.signalledDemandForResponseBody) + executor.resetDemandSignal() + + bag.receiveResponseBodyParts([ByteBuffer(string: "bar")]) + XCTAssertEqual(delegate.hitDidReceiveBodyPart, 2) + + // the remote closes the connection, which leads to more data and a succeed of the request + bag.succeedRequest([ByteBuffer(string: "baz")]) + XCTAssertEqual(delegate.hitDidReceiveBodyPart, 2) + + XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(())) + XCTAssertEqual(delegate.hitDidReceiveBodyPart, 3) + + XCTAssertEqual(delegate.hitDidReceiveResponse, 0) + XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(())) + XCTAssertEqual(delegate.hitDidReceiveResponse, 1) + } } class MockRequestExecutor: HTTPRequestExecutor {