Skip to content

Report last connection error if request deadline is exceeded #601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ final class HTTPConnectionPool {
self.unlocked = Unlocked(connection: .none, request: .none)

switch stateMachineAction.request {
case .cancelRequestTimeout(let requestID):
self.locked.request = .cancelRequestTimeout(requestID)
case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout):
if cancelTimeout {
self.locked.request = .cancelRequestTimeout(request.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,11 @@ extension HTTPConnectionPool {

mutating func cancelRequest(_ requestID: Request.ID) -> Action {
// 1. check requests in queue
if self.requests.remove(requestID) != nil {
if let request = self.requests.remove(requestID) {
// Use the last connection error to let the user know why the request was never scheduled
let error = self.lastConnectFailure ?? HTTPClientError.cancelled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here, why we look into the lastConnectFailure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory we could check here if we are at max connections... In those cases we could issue a more explicit: deadlineExceededBecauseToQueueDepthToLong error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how useful this kind of error message would be because deadlineExceededBecauseToQueueDepthToLong may only be a symptom as other requests in front could not be scheduled because there was a connection failure.

return .init(
request: .cancelRequestTimeout(requestID),
request: .failRequest(request, error, cancelTimeout: true),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since, we remove .cancelRequestTimeout here and in the http2 state machine, I think we can remove this action completely.

connection: .none
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,11 @@ extension HTTPConnectionPool {

mutating func cancelRequest(_ requestID: Request.ID) -> Action {
// 1. check requests in queue
if self.requests.remove(requestID) != nil {
if let request = self.requests.remove(requestID) {
// Use the last connection error to let the user know why the request was never scheduled
let error = self.lastConnectFailure ?? HTTPClientError.cancelled
return .init(
request: .cancelRequestTimeout(requestID),
request: .failRequest(request, error, cancelTimeout: true),
connection: .none
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ extension HTTPConnectionPool {
case failRequestsAndCancelTimeouts([Request], Error)

case scheduleRequestTimeout(for: Request, on: EventLoop)
case cancelRequestTimeout(Request.ID)

case none
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public class HTTPClient {
var deadlineSchedule: Scheduled<Void>?
if let deadline = deadline {
deadlineSchedule = taskEL.scheduleTask(deadline: deadline) {
requestBag.fail(HTTPClientError.deadlineExceeded)
requestBag.deadlineExceeded()
}

task.promise.futureResult.whenComplete { _ in
Expand Down
85 changes: 66 additions & 19 deletions Sources/AsyncHTTPClient/RequestBag+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ extension RequestBag {
fileprivate enum State {
case initialized
case queued(HTTPRequestScheduler)
/// if the deadline was exceeded while in the `.queued(_:)` state,
/// we wait until the request pool fails the request with a potential more descriptive error message,
/// if a connection failure has occured while the request was queued.
case deadlineExceededWhileQueued
case executing(HTTPRequestExecutor, RequestStreamState, ResponseStreamState)
case finished(error: Error?)
case redirected(HTTPRequestExecutor, Int, HTTPResponseHead, URL)
Expand Down Expand Up @@ -90,13 +94,23 @@ extension RequestBag.StateMachine {
self.state = .queued(scheduler)
}

mutating func willExecuteRequest(_ executor: HTTPRequestExecutor) -> Bool {
enum WillExecuteRequestAction {
case cancelExecuter(HTTPRequestExecutor)
case failTaskAndCancelExecutor(Error, HTTPRequestExecutor)
case none
}

mutating func willExecuteRequest(_ executor: HTTPRequestExecutor) -> WillExecuteRequestAction {
switch self.state {
case .initialized, .queued:
self.state = .executing(executor, .initialized, .initialized)
return true
return .none
case .deadlineExceededWhileQueued:
let error: Error = HTTPClientError.deadlineExceeded
self.state = .finished(error: error)
return .failTaskAndCancelExecutor(error, executor)
case .finished(error: .some):
return false
return .cancelExecuter(executor)
case .executing, .redirected, .finished(error: .none), .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
Expand All @@ -110,7 +124,7 @@ extension RequestBag.StateMachine {

mutating func resumeRequestBodyStream() -> ResumeProducingAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("A request stream can only be resumed, if the request was started")

case .executing(let executor, .initialized, .initialized):
Expand Down Expand Up @@ -150,7 +164,7 @@ extension RequestBag.StateMachine {

mutating func pauseRequestBodyStream() {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("A request stream can only be paused, if the request was started")
case .executing(let executor, let requestState, let responseState):
switch requestState {
Expand Down Expand Up @@ -185,7 +199,7 @@ extension RequestBag.StateMachine {

mutating func writeNextRequestPart(_ part: IOData, taskEventLoop: EventLoop) -> WriteAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Invalid state: \(self.state)")
case .executing(let executor, let requestState, let responseState):
switch requestState {
Expand Down Expand Up @@ -231,7 +245,7 @@ extension RequestBag.StateMachine {

mutating func finishRequestBodyStream(_ result: Result<Void, Error>) -> FinishAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Invalid state: \(self.state)")
case .executing(let executor, let requestState, let responseState):
switch requestState {
Expand Down Expand Up @@ -282,7 +296,7 @@ extension RequestBag.StateMachine {
/// - Returns: Whether the response should be forwarded to the delegate. Will be `false` if the request follows a redirect.
mutating func receiveResponseHead(_ head: HTTPResponseHead) -> ReceiveResponseHeadAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("How can we receive a response, if the request hasn't started yet.")
case .executing(let executor, let requestState, let responseState):
guard case .initialized = responseState else {
Expand Down Expand Up @@ -328,7 +342,7 @@ extension RequestBag.StateMachine {

mutating func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) -> ReceiveResponseBodyAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("How can we receive a response body part, if the request hasn't started yet.")
case .executing(_, _, .initialized):
preconditionFailure("If we receive a response body, we must have received a head before")
Expand Down Expand Up @@ -385,7 +399,7 @@ extension RequestBag.StateMachine {

mutating func succeedRequest(_ newChunks: CircularBuffer<ByteBuffer>?) -> ReceiveResponseEndAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("How can we receive a response body part, if the request hasn't started yet.")
case .executing(_, _, .initialized):
preconditionFailure("If we receive a response body, we must have received a head before")
Expand Down Expand Up @@ -447,7 +461,7 @@ extension RequestBag.StateMachine {

private mutating func failWithConsumptionError(_ error: Error) -> ConsumeAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Invalid state: \(self.state)")
case .executing(_, _, .initialized):
preconditionFailure("Invalid state: Must have received response head, before this method is called for the first time")
Expand Down Expand Up @@ -482,7 +496,7 @@ extension RequestBag.StateMachine {

private mutating func consumeMoreBodyData() -> ConsumeAction {
switch self.state {
case .initialized, .queued:
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("Invalid state: \(self.state)")

case .executing(_, _, .initialized):
Expand Down Expand Up @@ -532,8 +546,33 @@ extension RequestBag.StateMachine {
}
}

enum DeadlineExceededAction {
case cancelScheduler(HTTPRequestScheduler?)
case fail(FailAction)
}

mutating func deadlineExceeded() -> DeadlineExceededAction {
switch self.state {
case .queued(let queuer):
/// We do not fail the request immediately because we want to give the scheduler a chance of throwing a better error message
/// We therefore depend on the scheduler failing the request after we cancel the request.
self.state = .deadlineExceededWhileQueued
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love a comment here, why we do this dance. Please also make explicit that we depend on the scheduler to fail this request. What happens, however if we think we are still queued, but we aren't actually? There might be a race here:

  1. request hit's request's deadline (thread a)
  2. connection becomes available (thread b)
  3. request is moved from pool queue to connection (thread b)
  4. request depends on pool to be cancelled (thread a), but pool ignores request since it is on the connection already

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this we def. need a test case as well.

return .cancelScheduler(queuer)

case .initialized,
.deadlineExceededWhileQueued,
.executing,
.finished,
.redirected,
.modifying:
/// if we are not in the queued state, we can fail early by just calling down to `self.fail(_:)`
/// which does the appropriate state transition for us.
return .fail(self.fail(HTTPClientError.deadlineExceeded))
}
}

enum FailAction {
case failTask(HTTPRequestScheduler?, HTTPRequestExecutor?)
case failTask(Error, HTTPRequestScheduler?, HTTPRequestExecutor?)
case cancelExecutor(HTTPRequestExecutor)
case none
}
Expand All @@ -542,31 +581,39 @@ extension RequestBag.StateMachine {
switch self.state {
case .initialized:
self.state = .finished(error: error)
return .failTask(nil, nil)
return .failTask(error, nil, nil)
case .queued(let queuer):
self.state = .finished(error: error)
return .failTask(queuer, nil)
return .failTask(error, queuer, nil)
case .executing(let executor, let requestState, .buffering(_, next: .eof)):
self.state = .executing(executor, requestState, .buffering(.init(), next: .error(error)))
return .cancelExecutor(executor)
case .executing(let executor, _, .buffering(_, next: .askExecutorForMore)):
self.state = .finished(error: error)
return .failTask(nil, executor)
return .failTask(error, nil, executor)
case .executing(let executor, _, .buffering(_, next: .error(_))):
// this would override another error, let's keep the first one
return .cancelExecutor(executor)
case .executing(let executor, _, .initialized):
self.state = .finished(error: error)
return .failTask(nil, executor)
return .failTask(error, nil, executor)
case .executing(let executor, _, .waitingForRemote):
self.state = .finished(error: error)
return .failTask(nil, executor)
return .failTask(error, nil, executor)
case .redirected:
self.state = .finished(error: error)
return .failTask(nil, nil)
return .failTask(error, nil, nil)
case .finished(.none):
// An error occurred after the request has finished. Ignore...
return .none
case .deadlineExceededWhileQueued:
// if we just get a `HTTPClientError.cancelled` we can use the original cancellation reason
// to give a more descriptive error to the user.
if (error as? HTTPClientError) == .cancelled {
return .failTask(HTTPClientError.deadlineExceeded, nil, nil)
}
// otherwise we already had an intermediate connection error which we should present to the user instead
return .failTask(error, nil, nil)
case .finished(.some(_)):
// this might happen, if the stream consumer has failed... let's just drop the data
return .none
Expand Down
48 changes: 38 additions & 10 deletions Sources/AsyncHTTPClient/RequestBag.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,16 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {

private func willExecuteRequest0(_ executor: HTTPRequestExecutor) {
self.task.eventLoop.assertInEventLoop()
if !self.state.willExecuteRequest(executor) {
return executor.cancelRequest(self)
let action = self.state.willExecuteRequest(executor)
switch action {
case .cancelExecuter(let executor):
executor.cancelRequest(self)
case .failTaskAndCancelExecutor(let error, let executor):
self.delegate.didReceiveError(task: self.task, error)
self.task.fail(with: error, delegateType: Delegate.self)
executor.cancelRequest(self)
case .none:
break
}
}

Expand Down Expand Up @@ -320,8 +328,12 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {

let action = self.state.fail(error)

self.executeFailAction0(action)
}

private func executeFailAction0(_ action: RequestBag<Delegate>.StateMachine.FailAction) {
switch action {
case .failTask(let scheduler, let executor):
case .failTask(let error, let scheduler, let executor):
scheduler?.cancelRequest(self)
executor?.cancelRequest(self)
self.failTask0(error)
Expand All @@ -331,6 +343,28 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
break
}
}

func deadlineExceeded0() {
self.task.eventLoop.assertInEventLoop()
let action = self.state.deadlineExceeded()

switch action {
case .cancelScheduler(let scheduler):
scheduler?.cancelRequest(self)
case .fail(let failAction):
self.executeFailAction0(failAction)
}
}

func deadlineExceeded() {
if self.task.eventLoop.inEventLoop {
self.deadlineExceeded0()
} else {
self.task.eventLoop.execute {
self.deadlineExceeded0()
}
}
}
}

extension RequestBag: HTTPSchedulableRequest {
Expand Down Expand Up @@ -457,12 +491,6 @@ extension RequestBag: HTTPExecutableRequest {

extension RequestBag: HTTPClientTaskDelegate {
func cancel() {
if self.task.eventLoop.inEventLoop {
self.fail0(HTTPClientError.cancelled)
} else {
self.task.eventLoop.execute {
self.fail0(HTTPClientError.cancelled)
}
}
self.fail(HTTPClientError.cancelled)
}
}
1 change: 1 addition & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ extension HTTPClientTests {
("testStressGetHttps", testStressGetHttps),
("testStressGetHttpsSSLError", testStressGetHttpsSSLError),
("testSelfSignedCertificateIsRejectedWithCorrectError", testSelfSignedCertificateIsRejectedWithCorrectError),
("testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded", testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded),
("testFailingConnectionIsReleased", testFailingConnectionIsReleased),
("testResponseDelayGet", testResponseDelayGet),
("testIdleTimeoutNoReuse", testIdleTimeoutNoReuse),
Expand Down
41 changes: 41 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,47 @@ class HTTPClientTests: XCTestCase {
}
}

func testSelfSignedCertificateIsRejectedWithCorrectErrorIfRequestDeadlineIsExceeded() throws {
/// key + cert was created with the follwing command:
/// openssl req -x509 -newkey rsa:4096 -keyout self_signed_key.pem -out self_signed_cert.pem -sha256 -days 99999 -nodes -subj '/CN=localhost'
let certPath = Bundle.module.path(forResource: "self_signed_cert", ofType: "pem")!
let keyPath = Bundle.module.path(forResource: "self_signed_key", ofType: "pem")!
let configuration = TLSConfiguration.makeServerConfiguration(
certificateChain: try NIOSSLCertificate.fromPEMFile(certPath).map { .certificate($0) },
privateKey: .file(keyPath)
)
let sslContext = try NIOSSLContext(configuration: configuration)

let server = ServerBootstrap(group: serverGroup)
.childChannelInitializer { channel in
channel.pipeline.addHandler(NIOSSLServerHandler(context: sslContext))
}
let serverChannel = try server.bind(host: "localhost", port: 0).wait()
defer { XCTAssertNoThrow(try serverChannel.close().wait()) }
let port = serverChannel.localAddress!.port!

var config = HTTPClient.Configuration()
config.timeout.connect = .seconds(3)
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup), configuration: config)
defer { XCTAssertNoThrow(try localClient.syncShutdown()) }

XCTAssertThrowsError(try localClient.get(url: "https://localhost:\(port)", deadline: .now() + .seconds(2)).wait()) { error in
#if canImport(Network)
guard let nwTLSError = error as? HTTPClient.NWTLSError else {
XCTFail("could not cast \(error) of type \(type(of: error)) to \(HTTPClient.NWTLSError.self)")
return
}
XCTAssertEqual(nwTLSError.status, errSSLBadCert, "unexpected tls error: \(nwTLSError)")
#else
guard let sslError = error as? NIOSSLError,
case .handshakeFailed(.sslError) = sslError else {
XCTFail("unexpected error \(error)")
return
}
#endif
}
}

func testFailingConnectionIsReleased() {
let localHTTPBin = HTTPBin(.refuse)
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
Expand Down
Loading