Skip to content

[HTTPConnectionPool] Fix request timer scheduling #438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 87 additions & 66 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
Original file line number Diff line number Diff line change
@@ -28,14 +28,13 @@ final class HTTPConnectionPool {
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
/// The connection backoff timeout timers. Protected by the stateLock
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()
/// The request connection timeout timers. Protected by the stateLock
private var _requestTimer = [Request.ID: Scheduled<Void>]()

private static let fallbackConnectTimeout: TimeAmount = .seconds(30)

let key: ConnectionPool.Key

private let timerLock = Lock()
private var _requestTimer = [Request.ID: Scheduled<Void>]()

private var logger: Logger

private let eventLoopGroup: EventLoopGroup
@@ -110,21 +109,64 @@ final class HTTPConnectionPool {
}
}

enum RequestAction {
enum Unlocked {
case executeRequest(Request, Connection)
case executeRequests([Request], Connection)
case failRequest(Request, Error)
case failRequests([Request], Error)
case none
}

enum Locked {
case scheduleRequestTimeout(for: Request, on: EventLoop)
case cancelRequestTimeout(Request.ID)
case cancelRequestTimeouts([Request])
case none
}
}

struct Locked {
var connection: ConnectionAction.Locked
var request: RequestAction.Locked
}

struct Unlocked {
var connection: ConnectionAction.Unlocked
var request: StateMachine.RequestAction
var request: RequestAction.Unlocked
}

var locked: Locked
var unlocked: Unlocked

init(from stateMachineAction: StateMachine.Action) {
self.locked = Locked(connection: .none)
self.unlocked = Unlocked(connection: .none, request: stateMachineAction.request)
self.locked = Locked(connection: .none, request: .none)
self.unlocked = Unlocked(connection: .none, request: .none)

switch stateMachineAction.request {
case .cancelRequestTimeout(let requestID):
self.locked.request = .cancelRequestTimeout(requestID)
case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout):
if cancelTimeout {
self.locked.request = .cancelRequestTimeout(request.id)
}
self.unlocked.request = .executeRequest(request, connection)
case .executeRequestsAndCancelTimeouts(let requests, let connection):
self.locked.request = .cancelRequestTimeouts(requests)
self.unlocked.request = .executeRequests(requests, connection)
case .failRequest(let request, let error, cancelTimeout: let cancelTimeout):
if cancelTimeout {
self.locked.request = .cancelRequestTimeout(request.id)
}
self.unlocked.request = .failRequest(request, error)
case .failRequestsAndCancelTimeouts(let requests, let error):
self.locked.request = .cancelRequestTimeouts(requests)
self.unlocked.request = .failRequests(requests, error)
case .scheduleRequestTimeout(for: let request, on: let eventLoop):
self.locked.request = .scheduleRequestTimeout(for: request, on: eventLoop)
case .none:
break
}

switch stateMachineAction.connection {
case .createConnection(let connectionID, on: let eventLoop):
@@ -154,14 +196,15 @@ final class HTTPConnectionPool {
let unlockedActions = self.stateLock.withLock { () -> Actions.Unlocked in
let stateMachineAction = closure(&self._state)
let poolAction = Actions(from: stateMachineAction)
self.runLockedActions(poolAction.locked)
self.runLockedConnectionAction(poolAction.locked.connection)
self.runLockedRequestAction(poolAction.locked.request)
return poolAction.unlocked
}
self.runUnlockedActions(unlockedActions)
}

private func runLockedActions(_ actions: Actions.Locked) {
switch actions.connection {
private func runLockedConnectionAction(_ action: Actions.ConnectionAction.Locked) {
switch action {
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop)

@@ -181,6 +224,22 @@ final class HTTPConnectionPool {
}
}

private func runLockedRequestAction(_ action: Actions.RequestAction.Locked) {
switch action {
case .scheduleRequestTimeout(for: let request, on: let eventLoop):
self.scheduleRequestTimeout(request, on: eventLoop)

case .cancelRequestTimeout(let requestID):
self.cancelRequestTimeout(requestID)

case .cancelRequestTimeouts(let requests):
requests.forEach { self.cancelRequestTimeout($0.id) }

case .none:
break
}
}

private func runUnlockedActions(_ actions: Actions.Unlocked) {
self.runUnlockedConnectionAction(actions.connection)
self.runUnlockedRequestAction(actions.request)
@@ -225,38 +284,20 @@ final class HTTPConnectionPool {
}
}

private func runUnlockedRequestAction(_ action: StateMachine.RequestAction) {
// The order of execution fail/execute request vs cancelling the request timeout timer does
// not matter in the actions here. The actions don't cause any side effects that will be
// reported back to the state machine and are not dependent on each other.

private func runUnlockedRequestAction(_ action: Actions.RequestAction.Unlocked) {
switch action {
case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout):
if cancelTimeout {
self.cancelRequestTimeout(request.id)
}
case .executeRequest(let request, let connection):
connection.executeRequest(request.req)

case .executeRequestsAndCancelTimeouts(let requests, let connection):
self.cancelRequestTimeouts(requests)
case .executeRequests(let requests, let connection):
requests.forEach { connection.executeRequest($0.req) }

case .failRequest(let request, let error, cancelTimeout: let cancelTimeout):
if cancelTimeout {
self.cancelRequestTimeout(request.id)
}
case .failRequest(let request, let error):
request.req.fail(error)

case .failRequestsAndCancelTimeouts(let requests, let error):
self.cancelRequestTimeouts(requests)
case .failRequests(let requests, let error):
requests.forEach { $0.req.fail(error) }

case .scheduleRequestTimeout(let request, on: let eventLoop):
self.scheduleRequestTimeout(request, on: eventLoop)

case .cancelRequestTimeout(let requestID):
self.cancelRequestTimeout(requestID)

case .none:
break
}
@@ -282,49 +323,29 @@ final class HTTPConnectionPool {
private func scheduleRequestTimeout(_ request: Request, on eventLoop: EventLoop) {
let requestID = request.id
let scheduled = eventLoop.scheduleTask(deadline: request.connectionDeadline) {
// The timer has fired. Now we need to do a couple of things:
//
// 1. Remove ourselves from the timer dictionary to not leak any data. If our
// waiter entry still exists, we need to tell the state machine, that we want
// to fail the request.
let timeoutFired = self.timerLock.withLock {
self._requestTimer.removeValue(forKey: requestID) != nil
}

// 2. If the entry did not exists anymore, we can assume that the request was
// scheduled on another connection. The timer still fired anyhow because of a
// race. In such a situation we don't need to do anything.
guard timeoutFired else { return }

// 3. Tell the state machine about the timeout
self.modifyStateAndRunActions {
$0.timeoutRequest(requestID)
// there might be a race between a the timeout timer and the pool scheduling the
// request on another thread.
self.modifyStateAndRunActions { stateMachine in
if self._requestTimer.removeValue(forKey: requestID) != nil {
// The timer still exists. State Machines assumes it is alive. Inform state
// machine.
return stateMachine.timeoutRequest(requestID)
}
return .none
}
}

self.timerLock.withLockVoid {
assert(self._requestTimer[requestID] == nil)
self._requestTimer[requestID] = scheduled
}
assert(self._requestTimer[requestID] == nil)
self._requestTimer[requestID] = scheduled

request.req.requestWasQueued(self)
}

private func cancelRequestTimeout(_ id: Request.ID) {
let scheduled = self.timerLock.withLock {
self._requestTimer.removeValue(forKey: id)
guard let cancelTimer = self._requestTimer.removeValue(forKey: id) else {
preconditionFailure("Expected to have a timer for request \(id) at this point.")
}

scheduled?.cancel()
}

private func cancelRequestTimeouts(_ requests: [Request]) {
let scheduled = self.timerLock.withLock {
requests.compactMap {
self._requestTimer.removeValue(forKey: $0.id)
}
}
scheduled.forEach { $0.cancel() }
cancelTimer.cancel()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do these cancels outside the lock?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do at a later point, tracked here: #439

}

private func scheduleIdleTimerForConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) {