Skip to content

[RequestBag] Fix consumption error in state machine #441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions Sources/AsyncHTTPClient/RequestBag+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,9 @@ extension RequestBag.StateMachine {
preconditionFailure("If we have received an error or eof before, why did we get another body part? Next: \(next)")
}

if buffer.isEmpty, newChunks == nil || newChunks!.isEmpty {
self.state = .finished(error: nil)
return .succeedRequest
} else if buffer.isEmpty, let newChunks = newChunks {
if buffer.isEmpty, let newChunks = newChunks, !newChunks.isEmpty {
buffer = newChunks
} else if let newChunks = newChunks {
} else if let newChunks = newChunks, !newChunks.isEmpty {
buffer.append(contentsOf: newChunks)
}

Expand Down Expand Up @@ -433,9 +430,11 @@ extension RequestBag.StateMachine {
private mutating func consumeMoreBodyData() -> ConsumeAction {
switch self.state {
case .initialized, .queued:
preconditionFailure("Invalid state")
preconditionFailure("Invalid state: \(self.state)")

case .executing(_, _, .initialized):
preconditionFailure("Invalid state: Must have received response head, before this method is called for the first time")

case .executing(let executor, let requestState, .buffering(var buffer, next: .askExecutorForMore)):
self.state = .modifying

Expand Down Expand Up @@ -473,7 +472,7 @@ extension RequestBag.StateMachine {
return .doNothing

case .finished(error: .none):
preconditionFailure("Invalid state... If no error occured, this must not be called, after the request was finished")
preconditionFailure("Invalid state... If no error occurred, this must not be called, after the request was finished")

case .modifying:
preconditionFailure()
Expand Down
1 change: 1 addition & 0 deletions Tests/AsyncHTTPClientTests/RequestBagTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extension RequestBagTests {
("testCancelFailsTaskWhenTaskIsQueued", testCancelFailsTaskWhenTaskIsQueued),
("testFailsTaskWhenTaskIsWaitingForMoreFromServer", testFailsTaskWhenTaskIsWaitingForMoreFromServer),
("testHTTPUploadIsCancelledEvenThoughRequestSucceeds", testHTTPUploadIsCancelledEvenThoughRequestSucceeds),
("testRaceBetweenConnectionCloseAndDemandMoreData", testRaceBetweenConnectionCloseAndDemandMoreData),
]
}
}
57 changes: 57 additions & 0 deletions Tests/AsyncHTTPClientTests/RequestBagTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ final class RequestBagTests: XCTestCase {
XCTAssertEqual(executor.nextBodyPart(), .body(.byteBuffer(.init(bytes: 0...3))))
// receive a 301 response immediately.
bag.receiveResponseHead(.init(version: .http1_1, status: .movedPermanently))
XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(()))
bag.succeedRequest(.init())

// if we now write our second part of the response this should fail the backpressure promise
Expand All @@ -407,6 +408,62 @@ final class RequestBagTests: XCTestCase {
XCTAssertEqual(delegate.receivedHead?.status, .movedPermanently)
XCTAssertNoThrow(try bag.task.futureResult.wait())
}

func testRaceBetweenConnectionCloseAndDemandMoreData() {
let embeddedEventLoop = EmbeddedEventLoop()
defer { XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) }
let logger = Logger(label: "test")

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://swift.org"))
guard let request = maybeRequest else { return XCTFail("Expected to have a request") }

let delegate = UploadCountingDelegate(eventLoop: embeddedEventLoop)
var maybeRequestBag: RequestBag<UploadCountingDelegate>?
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
request: request,
eventLoopPreference: .delegate(on: embeddedEventLoop),
task: .init(eventLoop: embeddedEventLoop, logger: logger),
redirectHandler: nil,
connectionDeadline: .now() + .seconds(30),
requestOptions: .forTests(),
delegate: delegate
))
guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") }

let executor = MockRequestExecutor()
bag.willExecuteRequest(executor)
bag.requestHeadSent()
bag.receiveResponseHead(.init(version: .http1_1, status: .ok))
XCTAssertFalse(executor.signalledDemandForResponseBody)
XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(()))
XCTAssertTrue(executor.signalledDemandForResponseBody)
executor.resetDemandSignal()

// "foo" is forwarded for consumption. We expect the RequestBag to consume "foo" with the
// delegate and call demandMoreBody afterwards.
XCTAssertEqual(delegate.hitDidReceiveBodyPart, 0)
bag.receiveResponseBodyParts([ByteBuffer(string: "foo")])
XCTAssertFalse(executor.signalledDemandForResponseBody)
XCTAssertEqual(delegate.hitDidReceiveBodyPart, 1)
XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(()))
XCTAssertTrue(executor.signalledDemandForResponseBody)
executor.resetDemandSignal()

bag.receiveResponseBodyParts([ByteBuffer(string: "bar")])
XCTAssertEqual(delegate.hitDidReceiveBodyPart, 2)

// the remote closes the connection, which leads to more data and a succeed of the request
bag.succeedRequest([ByteBuffer(string: "baz")])
XCTAssertEqual(delegate.hitDidReceiveBodyPart, 2)

XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(()))
XCTAssertEqual(delegate.hitDidReceiveBodyPart, 3)

XCTAssertEqual(delegate.hitDidReceiveResponse, 0)
XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(()))
XCTAssertEqual(delegate.hitDidReceiveResponse, 1)
}
}

class MockRequestExecutor: HTTPRequestExecutor {
Expand Down