Skip to content

Report last connection error if request deadline is exceeded #601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,10 @@ extension HTTPConnectionPool {

mutating func cancelRequest(_ requestID: Request.ID) -> Action {
// 1. check requests in queue
if self.requests.remove(requestID) != nil {
if let request = self.requests.remove(requestID) {
let error = self.lastConnectFailure ?? HTTPClientError.cancelled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here, why we look into the lastConnectFailure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory we could check here if we are at max connections... In those cases we could issue a more explicit: deadlineExceededBecauseToQueueDepthToLong error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how useful this kind of error message would be because deadlineExceededBecauseToQueueDepthToLong may only be a symptom as other requests in front could not be scheduled because there was a connection failure.

return .init(
request: .cancelRequestTimeout(requestID),
request: .failRequest(request, error, cancelTimeout: true),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since, we remove .cancelRequestTimeout here and in the http2 state machine, I think we can remove this action completely.

connection: .none
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,10 @@ extension HTTPConnectionPool {

mutating func cancelRequest(_ requestID: Request.ID) -> Action {
// 1. check requests in queue
if self.requests.remove(requestID) != nil {
if let request = self.requests.remove(requestID) {
let error = self.lastConnectFailure ?? HTTPClientError.cancelled
return .init(
request: .cancelRequestTimeout(requestID),
request: .failRequest(request, error, cancelTimeout: true),
connection: .none
)
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public class HTTPClient {
var deadlineSchedule: Scheduled<Void>?
if let deadline = deadline {
deadlineSchedule = taskEL.scheduleTask(deadline: deadline) {
requestBag.fail(HTTPClientError.deadlineExceeded)
requestBag.deadlineExceeded()
}

task.promise.futureResult.whenComplete { _ in
Expand Down
59 changes: 42 additions & 17 deletions Sources/AsyncHTTPClient/RequestBag+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ extension RequestBag {
fileprivate enum State {
case initialized
case queued(HTTPRequestScheduler)
/// if the deadline was exceeded while in the `.queued(_:)` state, we wait until the request pool fails the request with a potential more descriptive error message if a connection failure has occured while the request was queued.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: can we wrap the comment here? I think normally I wrapped at about 100 characters.

case deadlineExceededWhileQueued
case executing(HTTPRequestExecutor, RequestStreamState, ResponseStreamState)
case finished(error: Error?)
case redirected(HTTPRequestExecutor, Int, HTTPResponseHead, URL)
Expand Down Expand Up @@ -95,7 +97,7 @@ extension RequestBag.StateMachine {
case .initialized, .queued:
self.state = .executing(executor, .initialized, .initialized)
return true
case .finished(error: .some):
case .finished(error: .some), .deadlineExceededWhileQueued:
return false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to fail the request here explicitly...

case .executing, .redirected, .finished(error: .none), .modifying:
preconditionFailure("Invalid state: \(self.state)")
Expand All @@ -110,7 +112,7 @@ extension RequestBag.StateMachine {

mutating func resumeRequestBodyStream() -> ResumeProducingAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("A request stream can only be resumed, if the request was started")

case .executing(let executor, .initialized, .initialized):
Expand Down Expand Up @@ -150,7 +152,7 @@ extension RequestBag.StateMachine {

mutating func pauseRequestBodyStream() {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("A request stream can only be paused, if the request was started")
case .executing(let executor, let requestState, let responseState):
switch requestState {
Expand Down Expand Up @@ -185,7 +187,7 @@ extension RequestBag.StateMachine {

mutating func writeNextRequestPart(_ part: IOData, taskEventLoop: EventLoop) -> WriteAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Invalid state: \(self.state)")
case .executing(let executor, let requestState, let responseState):
switch requestState {
Expand Down Expand Up @@ -231,7 +233,7 @@ extension RequestBag.StateMachine {

mutating func finishRequestBodyStream(_ result: Result<Void, Error>) -> FinishAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Invalid state: \(self.state)")
case .executing(let executor, let requestState, let responseState):
switch requestState {
Expand Down Expand Up @@ -282,7 +284,7 @@ extension RequestBag.StateMachine {
/// - Returns: Whether the response should be forwarded to the delegate. Will be `false` if the request follows a redirect.
mutating func receiveResponseHead(_ head: HTTPResponseHead) -> ReceiveResponseHeadAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("How can we receive a response, if the request hasn't started yet.")
case .executing(let executor, let requestState, let responseState):
guard case .initialized = responseState else {
Expand Down Expand Up @@ -328,7 +330,7 @@ extension RequestBag.StateMachine {

mutating func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) -> ReceiveResponseBodyAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("How can we receive a response body part, if the request hasn't started yet.")
case .executing(_, _, .initialized):
preconditionFailure("If we receive a response body, we must have received a head before")
Expand Down Expand Up @@ -385,7 +387,7 @@ extension RequestBag.StateMachine {

mutating func succeedRequest(_ newChunks: CircularBuffer<ByteBuffer>?) -> ReceiveResponseEndAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("How can we receive a response body part, if the request hasn't started yet.")
case .executing(_, _, .initialized):
preconditionFailure("If we receive a response body, we must have received a head before")
Expand Down Expand Up @@ -447,7 +449,7 @@ extension RequestBag.StateMachine {

private mutating func failWithConsumptionError(_ error: Error) -> ConsumeAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Invalid state: \(self.state)")
case .executing(_, _, .initialized):
preconditionFailure("Invalid state: Must have received response head, before this method is called for the first time")
Expand Down Expand Up @@ -482,7 +484,7 @@ extension RequestBag.StateMachine {

private mutating func consumeMoreBodyData() -> ConsumeAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Invalid state: \(self.state)")

case .executing(_, _, .initialized):
Expand Down Expand Up @@ -532,8 +534,23 @@ extension RequestBag.StateMachine {
}
}

enum DeadlineExceededAction {
case cancelScheduler(HTTPRequestScheduler?)
case fail(FailAction)
}

mutating func deadlineExceeded() -> DeadlineExceededAction {
switch self.state {
case .queued(let queuer):
self.state = .deadlineExceededWhileQueued
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love a comment here, why we do this dance. Please also make explicit that we depend on the scheduler to fail this request. What happens, however if we think we are still queued, but we aren't actually? There might be a race here:

  1. request hit's request's deadline (thread a)
  2. connection becomes available (thread b)
  3. request is moved from pool queue to connection (thread b)
  4. request depends on pool to be cancelled (thread a), but pool ignores request since it is on the connection already

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this we def. need a test case as well.

return .cancelScheduler(queuer)
default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make all the other cases explicit here?

return .fail(self.fail(HTTPClientError.deadlineExceeded))
}
}

enum FailAction {
case failTask(HTTPRequestScheduler?, HTTPRequestExecutor?)
case failTask(Error, HTTPRequestScheduler?, HTTPRequestExecutor?)
case cancelExecutor(HTTPRequestExecutor)
case none
}
Expand All @@ -542,31 +559,39 @@ extension RequestBag.StateMachine {
switch self.state {
case .initialized:
self.state = .finished(error: error)
return .failTask(nil, nil)
return .failTask(error, nil, nil)
case .queued(let queuer):
self.state = .finished(error: error)
return .failTask(queuer, nil)
return .failTask(error, queuer, nil)
case .executing(let executor, let requestState, .buffering(_, next: .eof)):
self.state = .executing(executor, requestState, .buffering(.init(), next: .error(error)))
return .cancelExecutor(executor)
case .executing(let executor, _, .buffering(_, next: .askExecutorForMore)):
self.state = .finished(error: error)
return .failTask(nil, executor)
return .failTask(error, nil, executor)
case .executing(let executor, _, .buffering(_, next: .error(_))):
// this would override another error, let's keep the first one
return .cancelExecutor(executor)
case .executing(let executor, _, .initialized):
self.state = .finished(error: error)
return .failTask(nil, executor)
return .failTask(error, nil, executor)
case .executing(let executor, _, .waitingForRemote):
self.state = .finished(error: error)
return .failTask(nil, executor)
return .failTask(error, nil, executor)
case .redirected:
self.state = .finished(error: error)
return .failTask(nil, nil)
return .failTask(error, nil, nil)
case .finished(.none):
// An error occurred after the request has finished. Ignore...
return .none
case .deadlineExceededWhileQueued:
// if we just get a `HTTPClientError.cancelled` we can use the orignal cancelation reason
// to give a more descriptive error to the user.
if (error as? HTTPClientError) == .cancelled {
return .failTask(HTTPClientError.deadlineExceeded, nil, nil)
}
// otherwise we already had an intermidate connection error which we should present to the user instead
return .failTask(error, nil, nil)
case .finished(.some(_)):
// this might happen, if the stream consumer has failed... let's just drop the data
return .none
Expand Down
36 changes: 28 additions & 8 deletions Sources/AsyncHTTPClient/RequestBag.swift
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,12 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {

let action = self.state.fail(error)

self.executeFailAction0(action)
}

private func executeFailAction0(_ action: RequestBag<Delegate>.StateMachine.FailAction) {
switch action {
case .failTask(let scheduler, let executor):
case .failTask(let error, let scheduler, let executor):
scheduler?.cancelRequest(self)
executor?.cancelRequest(self)
self.failTask0(error)
Expand All @@ -331,6 +335,28 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
break
}
}

func deadlineExceeded0() {
self.task.eventLoop.assertInEventLoop()
let action = self.state.deadlineExceeded()

switch action {
case .cancelScheduler(let scheduler):
scheduler?.cancelRequest(self)
case .fail(let failAction):
self.executeFailAction0(failAction)
}
}

func deadlineExceeded() {
if self.task.eventLoop.inEventLoop {
self.deadlineExceeded0()
} else {
self.task.eventLoop.execute {
self.deadlineExceeded0()
}
}
}
}

extension RequestBag: HTTPSchedulableRequest {
Expand Down Expand Up @@ -457,12 +483,6 @@ extension RequestBag: HTTPExecutableRequest {

extension RequestBag: HTTPClientTaskDelegate {
func cancel() {
if self.task.eventLoop.inEventLoop {
self.fail0(HTTPClientError.cancelled)
} else {
self.task.eventLoop.execute {
self.fail0(HTTPClientError.cancelled)
}
}
self.fail(HTTPClientError.cancelled)
}
}
1 change: 1 addition & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ extension HTTPClientTests {
("testStressGetHttps", testStressGetHttps),
("testStressGetHttpsSSLError", testStressGetHttpsSSLError),
("testSelfSignedCertificateIsRejectedWithCorrectError", testSelfSignedCertificateIsRejectedWithCorrectError),
("testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded", testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded),
("testFailingConnectionIsReleased", testFailingConnectionIsReleased),
("testResponseDelayGet", testResponseDelayGet),
("testIdleTimeoutNoReuse", testIdleTimeoutNoReuse),
Expand Down
41 changes: 41 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,47 @@ class HTTPClientTests: XCTestCase {
}
}

func testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded() throws {
/// key + cert was created with the follwing command:
/// openssl req -x509 -newkey rsa:4096 -keyout self_signed_key.pem -out self_signed_cert.pem -sha256 -days 99999 -nodes -subj '/CN=localhost'
let certPath = Bundle.module.path(forResource: "self_signed_cert", ofType: "pem")!
let keyPath = Bundle.module.path(forResource: "self_signed_key", ofType: "pem")!
let configuration = TLSConfiguration.makeServerConfiguration(
certificateChain: try NIOSSLCertificate.fromPEMFile(certPath).map { .certificate($0) },
privateKey: .file(keyPath)
)
let sslContext = try NIOSSLContext(configuration: configuration)

let server = ServerBootstrap(group: serverGroup)
.childChannelInitializer { channel in
channel.pipeline.addHandler(NIOSSLServerHandler(context: sslContext))
}
let serverChannel = try server.bind(host: "localhost", port: 0).wait()
defer { XCTAssertNoThrow(try serverChannel.close().wait()) }
let port = serverChannel.localAddress!.port!

var config = HTTPClient.Configuration()
config.timeout.connect = .seconds(3)
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup), configuration: config)
defer { XCTAssertNoThrow(try localClient.syncShutdown()) }

XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(port)", deadline: .now() + .seconds(2)).wait()) { error in
#if canImport(Network)
guard let nwTLSError = error as? HTTPClient.NWTLSError else {
XCTFail("could not cast \(error) of type \(type(of: error)) to \(HTTPClient.NWTLSError.self)")
return
}
XCTAssertEqual(nwTLSError.status, errSSLBadCert, "unexpected tls error: \(nwTLSError)")
#else
guard let sslError = error as? NIOSSLError,
case .handshakeFailed(.sslError) = sslError else {
XCTFail("unexpected error \(error)")
return
}
#endif
}
}

func testFailingConnectionIsReleased() {
let localHTTPBin = HTTPBin(.refuse)
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import XCTest

class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase {
func testCreatingAndFailingConnections() {
struct SomeError: Error, Equatable {}
let elg = EmbeddedEventLoopGroup(loops: 4)
defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) }

Expand Down Expand Up @@ -65,8 +66,6 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase {

// fail all connection attempts
while let randomConnectionID = connections.randomStartingConnection() {
struct SomeError: Error, Equatable {}

XCTAssertNoThrow(try connections.failConnectionCreation(randomConnectionID))
let action = state.failedToCreateNewConnection(SomeError(), connectionID: randomConnectionID)

Expand All @@ -86,9 +85,9 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase {

// cancel all queued requests
while let request = queuer.timeoutRandomRequest() {
let cancelAction = state.cancelRequest(request)
let cancelAction = state.cancelRequest(request.0)
XCTAssertEqual(cancelAction.connection, .none)
XCTAssertEqual(cancelAction.request, .cancelRequestTimeout(request))
XCTAssertEqual(cancelAction.request, .failRequest(.init(request.1), SomeError(), cancelTimeout: true))
}

// connection backoff done
Expand Down Expand Up @@ -184,7 +183,7 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase {
// 2. cancel request

let cancelAction = state.cancelRequest(request.id)
XCTAssertEqual(cancelAction.request, .cancelRequestTimeout(request.id))
XCTAssertEqual(cancelAction.request, .failRequest(request, HTTPClientError.cancelled, cancelTimeout: true))
XCTAssertEqual(cancelAction.connection, .none)

// 3. request timeout triggers to late
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {

// 2. cancel request
let cancelAction = state.cancelRequest(request.id)
XCTAssertEqual(cancelAction.request, .cancelRequestTimeout(request.id))
XCTAssertEqual(cancelAction.request, .failRequest(request, HTTPClientError.cancelled, cancelTimeout: true))
XCTAssertEqual(cancelAction.connection, .none)

// 3. request timeout triggers to late
Expand Down Expand Up @@ -1242,9 +1242,9 @@ func XCTAssertEqualTypeAndValue<Left, Right: Equatable>(
let lhs = try lhs()
let rhs = try rhs()
guard let lhsAsRhs = lhs as? Right else {
XCTFail("could not cast \(lhs) of type \(type(of: lhs)) to \(type(of: rhs))")
XCTFail("could not cast \(lhs) of type \(type(of: lhs)) to \(type(of: rhs))", file: file, line: line)
return
}
XCTAssertEqual(lhsAsRhs, rhs)
XCTAssertEqual(lhsAsRhs, rhs, file: file, line: line)
}(), file: file, line: line)
}
8 changes: 4 additions & 4 deletions Tests/AsyncHTTPClientTests/Mocks/MockRequestQueuer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ struct MockRequestQueuer {
return waiter.request
}

mutating func timeoutRandomRequest() -> RequestID? {
guard let waiterID = self.waiters.randomElement().map(\.0) else {
mutating func timeoutRandomRequest() -> (RequestID, HTTPSchedulableRequest)? {
guard let waiter = self.waiters.randomElement() else {
return nil
}
self.waiters.removeValue(forKey: waiterID)
return waiterID
self.waiters.removeValue(forKey: waiter.key)
return (waiter.key, waiter.value.request)
}
}