Skip to content

Commit 7deafde

Browse files
committed
[ConnectionPool] Split up locked and unlocked actions
# Conflicts: # Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
1 parent b8e2efe commit 7deafde

File tree

1 file changed

+132
-73
lines changed

1 file changed

+132
-73
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

+132-73
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,17 @@ final class HTTPConnectionPool {
131131

132132
private let stateLock = Lock()
133133
private var _state: StateMachine
134+
/// The connection idle timeout timers. Protected by the stateLock
135+
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
136+
/// The connection backoff timeout timers. Protected by the stateLock
137+
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()
134138

135139
private static let fallbackConnectTimeout: TimeAmount = .seconds(30)
136140

137141
let key: ConnectionPool.Key
138142

139143
private let timerLock = Lock()
140144
private var _requestTimer = [Request.ID: Scheduled<Void>]()
141-
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
142-
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()
143145

144146
private var logger: Logger
145147

@@ -182,33 +184,90 @@ final class HTTPConnectionPool {
182184
}
183185

184186
func executeRequest(_ request: HTTPSchedulableRequest) {
185-
let action = self.stateLock.withLock { () -> StateMachine.Action in
186-
self._state.executeRequest(.init(request))
187-
}
188-
self.run(action: action)
187+
self.modifyStateAndRunActions { $0.executeRequest(.init(request)) }
189188
}
190189

191190
func shutdown() {
192191
self.logger.debug("Shutting down connection pool")
193-
let action = self.stateLock.withLock { () -> StateMachine.Action in
194-
self._state.shutdown()
192+
self.modifyStateAndRunActions { $0.shutdown() }
193+
}
194+
195+
// MARK: - Private Methods -
196+
197+
// MARK: Actions
198+
199+
///
200+
private struct Actions {
201+
enum ConnectionAction {
202+
enum Unlocked {
203+
case createConnection(Connection.ID, on: EventLoop)
204+
case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown)
205+
case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown)
206+
case none
207+
}
208+
209+
enum Locked {
210+
case scheduleBackoffTimer(Connection.ID, backoff: TimeAmount, on: EventLoop)
211+
case cancelBackoffTimers([Connection.ID])
212+
case scheduleTimeoutTimer(Connection.ID, on: EventLoop)
213+
case cancelTimeoutTimer(Connection.ID)
214+
case none
215+
}
216+
}
217+
218+
struct Locked {
219+
var connection: ConnectionAction.Locked
220+
}
221+
222+
struct Unlocked {
223+
var connection: ConnectionAction.Unlocked
224+
var request: StateMachine.RequestAction
225+
}
226+
227+
var locked: Locked
228+
var unlocked: Unlocked
229+
230+
init(from stateMachineAction: StateMachine.Action) {
231+
self.locked = Locked(connection: .none)
232+
self.unlocked = Unlocked(connection: .none, request: stateMachineAction.request)
233+
234+
switch stateMachineAction.connection {
235+
case .createConnection(let connectionID, on: let eventLoop):
236+
self.unlocked.connection = .createConnection(connectionID, on: eventLoop)
237+
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
238+
self.locked.connection = .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)
239+
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
240+
self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop)
241+
case .cancelTimeoutTimer(let connectionID):
242+
self.locked.connection = .cancelTimeoutTimer(connectionID)
243+
case .closeConnection(let connection, isShutdown: let isShutdown):
244+
self.unlocked.connection = .closeConnection(connection, isShutdown: isShutdown)
245+
case .cleanupConnections(var cleanupContext, isShutdown: let isShutdown):
246+
//
247+
self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff)
248+
cleanupContext.connectBackoff = []
249+
self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown)
250+
case .none:
251+
break
252+
}
195253
}
196-
self.run(action: action)
197254
}
198255

199256
// MARK: Run actions
200257

201-
private func run(action: StateMachine.Action) {
202-
self.runConnectionAction(action.connection)
203-
self.runRequestAction(action.request)
258+
private func modifyStateAndRunActions(_ closure: (inout StateMachine) -> StateMachine.Action) {
259+
let unlockedActions = self.stateLock.withLock { () -> Actions.Unlocked in
260+
let stateMachineAction = closure(&self._state)
261+
let poolAction = Actions(from: stateMachineAction)
262+
self.runLockedActions(poolAction.locked)
263+
return poolAction.unlocked
264+
}
265+
self.runUnlockedActions(unlockedActions)
204266
}
205267

206-
private func runConnectionAction(_ action: StateMachine.ConnectionAction) {
207-
switch action {
208-
case .createConnection(let connectionID, let eventLoop):
209-
self.createConnection(connectionID, on: eventLoop)
210-
211-
case .scheduleBackoffTimer(let connectionID, let backoff, on: let eventLoop):
268+
private func runLockedActions(_ actions: Actions.Locked) {
269+
switch actions.connection {
270+
case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop):
212271
self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop)
213272

214273
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
@@ -217,6 +276,26 @@ final class HTTPConnectionPool {
217276
case .cancelTimeoutTimer(let connectionID):
218277
self.cancelIdleTimerForConnection(connectionID)
219278

279+
case .cancelBackoffTimers(let connectionIDs):
280+
for connectionID in connectionIDs {
281+
self.cancelConnectionStartBackoffTimer(connectionID)
282+
}
283+
284+
case .none:
285+
break
286+
}
287+
}
288+
289+
private func runUnlockedActions(_ actions: Actions.Unlocked) {
290+
self.runUnlockedConnectionAction(actions.connection)
291+
self.runUnlockedRequestAction(actions.request)
292+
}
293+
294+
private func runUnlockedConnectionAction(_ action: Actions.ConnectionAction.Unlocked) {
295+
switch action {
296+
case .createConnection(let connectionID, let eventLoop):
297+
self.createConnection(connectionID, on: eventLoop)
298+
220299
case .closeConnection(let connection, isShutdown: let isShutdown):
221300
self.logger.trace("close connection", metadata: [
222301
"ahc-connection-id": "\(connection.id)",
@@ -251,7 +330,7 @@ final class HTTPConnectionPool {
251330
}
252331
}
253332

254-
private func runRequestAction(_ action: StateMachine.RequestAction) {
333+
private func runUnlockedRequestAction(_ action: StateMachine.RequestAction) {
255334
// The order of execution fail/execute request vs cancelling the request timeout timer does
256335
// not matter in the actions here. The actions don't cause any side effects that will be
257336
// reported back to the state machine and are not dependent on each other.
@@ -323,11 +402,9 @@ final class HTTPConnectionPool {
323402
guard timeoutFired else { return }
324403

325404
// 3. Tell the state machine about the timeout
326-
let action = self.stateLock.withLock {
327-
self._state.timeoutRequest(requestID)
405+
self.modifyStateAndRunActions {
406+
$0.timeoutRequest(requestID)
328407
}
329-
330-
self.run(action: action)
331408
}
332409

333410
self.timerLock.withLockVoid {
@@ -362,34 +439,27 @@ final class HTTPConnectionPool {
362439
let scheduled = eventLoop.scheduleTask(in: self.idleConnectionTimeout) {
363440
// there might be a race between a cancelTimer call and the triggering
364441
// of this scheduled task. both want to acquire the lock
365-
let timerExisted = self.timerLock.withLock {
366-
self._idleTimer.removeValue(forKey: connectionID) != nil
442+
self.modifyStateAndRunActions { stateMachine in
443+
if self._idleTimer.removeValue(forKey: connectionID) != nil {
444+
// The timer still exists. State Machines assumes it is alive
445+
return stateMachine.connectionIdleTimeout(connectionID)
446+
}
447+
return .none
367448
}
368-
369-
guard timerExisted else { return }
370-
371-
let action = self.stateLock.withLock {
372-
self._state.connectionIdleTimeout(connectionID)
373-
}
374-
self.run(action: action)
375449
}
376450

377-
self.timerLock.withLock {
378-
assert(self._idleTimer[connectionID] == nil)
379-
self._idleTimer[connectionID] = scheduled
380-
}
451+
assert(self._idleTimer[connectionID] == nil)
452+
self._idleTimer[connectionID] = scheduled
381453
}
382454

383455
private func cancelIdleTimerForConnection(_ connectionID: Connection.ID) {
384456
self.logger.trace("Cancel idle connection timeout timer", metadata: [
385457
"ahc-connection-id": "\(connectionID)",
386458
])
387-
388-
let cancelTimer = self.timerLock.withLock {
389-
self._idleTimer.removeValue(forKey: connectionID)
459+
guard let cancelTimer = self._idleTimer.removeValue(forKey: connectionID) else {
460+
preconditionFailure("Expected to have an idle timer for connection \(connectionID) at this point.")
390461
}
391-
392-
cancelTimer?.cancel()
462+
cancelTimer.cancel()
393463
}
394464

395465
private func scheduleConnectionStartBackoffTimer(
@@ -403,30 +473,24 @@ final class HTTPConnectionPool {
403473

404474
let scheduled = eventLoop.scheduleTask(in: timeAmount) {
405475
// there might be a race between a backoffTimer and the pool shutting down.
406-
let timerExisted = self.timerLock.withLock {
407-
self._backoffTimer.removeValue(forKey: connectionID) != nil
408-
}
409-
410-
guard timerExisted else { return }
411-
412-
let action = self.stateLock.withLock {
413-
self._state.connectionCreationBackoffDone(connectionID)
476+
self.modifyStateAndRunActions { stateMachine in
477+
if self._backoffTimer.removeValue(forKey: connectionID) != nil {
478+
// The timer still exists. State Machines assumes it is alive
479+
return stateMachine.connectionCreationBackoffDone(connectionID)
480+
}
481+
return .none
414482
}
415-
self.run(action: action)
416483
}
417484

418-
self.timerLock.withLock {
419-
assert(self._backoffTimer[connectionID] == nil)
420-
self._backoffTimer[connectionID] = scheduled
421-
}
485+
assert(self._backoffTimer[connectionID] == nil)
486+
self._backoffTimer[connectionID] = scheduled
422487
}
423488

424489
private func cancelConnectionStartBackoffTimer(_ connectionID: Connection.ID) {
425-
let backoffTimer = self.timerLock.withLock {
426-
self._backoffTimer[connectionID]
490+
guard let backoffTimer = self._backoffTimer.removeValue(forKey: connectionID) else {
491+
preconditionFailure("Expected to have a backoff timer for connection \(connectionID) at this point.")
427492
}
428-
429-
backoffTimer?.cancel()
493+
backoffTimer.cancel()
430494
}
431495
}
432496

@@ -438,10 +502,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
438502
"ahc-connection-id": "\(connection.id)",
439503
"ahc-http-version": "http/1.1",
440504
])
441-
let action = self.stateLock.withLock {
442-
self._state.newHTTP1ConnectionCreated(.http1_1(connection))
505+
self.modifyStateAndRunActions {
506+
$0.newHTTP1ConnectionCreated(.http1_1(connection))
443507
}
444-
self.run(action: action)
445508
}
446509

447510
func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) {
@@ -464,10 +527,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
464527
"ahc-error": "\(error)",
465528
"ahc-connection-id": "\(connectionID)",
466529
])
467-
let action = self.stateLock.withLock {
468-
self._state.failedToCreateNewConnection(error, connectionID: connectionID)
530+
self.modifyStateAndRunActions {
531+
$0.failedToCreateNewConnection(error, connectionID: connectionID)
469532
}
470-
self.run(action: action)
471533
}
472534
}
473535

@@ -477,21 +539,19 @@ extension HTTPConnectionPool: HTTP1ConnectionDelegate {
477539
"ahc-connection-id": "\(connection.id)",
478540
"ahc-http-version": "http/1.1",
479541
])
480-
let action = self.stateLock.withLock {
481-
self._state.connectionClosed(connection.id)
542+
self.modifyStateAndRunActions {
543+
$0.connectionClosed(connection.id)
482544
}
483-
self.run(action: action)
484545
}
485546

486547
func http1ConnectionReleased(_ connection: HTTP1Connection) {
487548
self.logger.trace("releasing connection", metadata: [
488549
"ahc-connection-id": "\(connection.id)",
489550
"ahc-http-version": "http/1.1",
490551
])
491-
let action = self.stateLock.withLock {
492-
self._state.http1ConnectionReleased(connection.id)
552+
self.modifyStateAndRunActions {
553+
$0.http1ConnectionReleased(connection.id)
493554
}
494-
self.run(action: action)
495555
}
496556
}
497557

@@ -524,10 +584,9 @@ extension HTTPConnectionPool: HTTP2ConnectionDelegate {
524584
extension HTTPConnectionPool: HTTPRequestScheduler {
525585
func cancelRequest(_ request: HTTPSchedulableRequest) {
526586
let requestID = Request(request).id
527-
let action = self.stateLock.withLock {
528-
self._state.cancelRequest(requestID)
587+
self.modifyStateAndRunActions {
588+
$0.cancelRequest(requestID)
529589
}
530-
self.run(action: action)
531590
}
532591
}
533592

0 commit comments

Comments
 (0)