@@ -25,15 +25,17 @@ final class HTTPConnectionPool {
25
25
26
26
private let stateLock = Lock ( )
27
27
private var _state : StateMachine
28
+ /// The connection idle timeout timers. Protected by the stateLock
29
+ private var _idleTimer = [ Connection . ID : Scheduled < Void > ] ( )
30
+ /// The connection backoff timeout timers. Protected by the stateLock
31
+ private var _backoffTimer = [ Connection . ID : Scheduled < Void > ] ( )
28
32
29
33
private static let fallbackConnectTimeout : TimeAmount = . seconds( 30 )
30
34
31
35
let key : ConnectionPool . Key
32
36
33
37
private let timerLock = Lock ( )
34
38
private var _requestTimer = [ Request . ID : Scheduled < Void > ] ( )
35
- private var _idleTimer = [ Connection . ID : Scheduled < Void > ] ( )
36
- private var _backoffTimer = [ Connection . ID : Scheduled < Void > ] ( )
37
39
38
40
private var logger : Logger
39
41
@@ -76,40 +78,117 @@ final class HTTPConnectionPool {
76
78
}
77
79
78
80
func executeRequest( _ request: HTTPSchedulableRequest ) {
79
- let action = self . stateLock. withLock { ( ) -> StateMachine . Action in
80
- self . _state. executeRequest ( . init( request) )
81
- }
82
- self . run ( action: action)
81
+ self . modifyStateAndRunActions { $0. executeRequest ( . init( request) ) }
83
82
}
84
83
85
84
func shutdown( ) {
86
85
self . logger. debug ( " Shutting down connection pool " )
87
- let action = self . stateLock. withLock { ( ) -> StateMachine . Action in
88
- self . _state. shutdown ( )
86
+ self . modifyStateAndRunActions { $0. shutdown ( ) }
87
+ }
88
+
89
+ // MARK: - Private Methods -
90
+
91
+ // MARK: Actions
92
+
93
+ ///
94
+ private struct Actions {
95
+ enum ConnectionAction {
96
+ enum Unlocked {
97
+ case createConnection( Connection . ID , on: EventLoop )
98
+ case closeConnection( Connection , isShutdown: StateMachine . ConnectionAction . IsShutdown )
99
+ case cleanupConnections( CleanupContext , isShutdown: StateMachine . ConnectionAction . IsShutdown )
100
+ case none
101
+ }
102
+
103
+ enum Locked {
104
+ case scheduleBackoffTimer( Connection . ID , backoff: TimeAmount , on: EventLoop )
105
+ case cancelBackoffTimers( [ Connection . ID ] )
106
+ case scheduleTimeoutTimer( Connection . ID , on: EventLoop )
107
+ case cancelTimeoutTimer( Connection . ID )
108
+ case none
109
+ }
110
+ }
111
+
112
+ struct Locked {
113
+ var connection : ConnectionAction . Locked
114
+ }
115
+
116
+ struct Unlocked {
117
+ var connection : ConnectionAction . Unlocked
118
+ var request : StateMachine . RequestAction
119
+ }
120
+
121
+ var locked : Locked
122
+ var unlocked : Unlocked
123
+
124
+ init ( from stateMachineAction: StateMachine . Action ) {
125
+ self . locked = Locked ( connection: . none)
126
+ self . unlocked = Unlocked ( connection: . none, request: stateMachineAction. request)
127
+
128
+ switch stateMachineAction. connection {
129
+ case . createConnection( let connectionID, on: let eventLoop) :
130
+ self . unlocked. connection = . createConnection( connectionID, on: eventLoop)
131
+ case . scheduleBackoffTimer( let connectionID, backoff: let backoff, on: let eventLoop) :
132
+ self . locked. connection = . scheduleBackoffTimer( connectionID, backoff: backoff, on: eventLoop)
133
+ case . scheduleTimeoutTimer( let connectionID, on: let eventLoop) :
134
+ self . locked. connection = . scheduleTimeoutTimer( connectionID, on: eventLoop)
135
+ case . cancelTimeoutTimer( let connectionID) :
136
+ self . locked. connection = . cancelTimeoutTimer( connectionID)
137
+ case . closeConnection( let connection, isShutdown: let isShutdown) :
138
+ self . unlocked. connection = . closeConnection( connection, isShutdown: isShutdown)
139
+ case . cleanupConnections( var cleanupContext, isShutdown: let isShutdown) :
140
+ //
141
+ self . locked. connection = . cancelBackoffTimers( cleanupContext. connectBackoff)
142
+ cleanupContext. connectBackoff = [ ]
143
+ self . unlocked. connection = . cleanupConnections( cleanupContext, isShutdown: isShutdown)
144
+ case . none:
145
+ break
146
+ }
89
147
}
90
- self . run ( action: action)
91
148
}
92
149
93
150
// MARK: Run actions
94
-
95
- private func run( action: StateMachine . Action ) {
96
- self . runConnectionAction ( action. connection)
97
- self . runRequestAction ( action. request)
151
+
152
+ private func modifyStateAndRunActions( _ closure: ( inout StateMachine ) -> StateMachine . Action ) {
153
+ let unlockedActions = self . stateLock. withLock { ( ) -> Actions . Unlocked in
154
+ let stateMachineAction = closure ( & self . _state)
155
+ let poolAction = Actions ( from: stateMachineAction)
156
+ self . runLockedActions ( poolAction. locked)
157
+ return poolAction. unlocked
158
+ }
159
+ self . runUnlockedActions ( unlockedActions)
98
160
}
99
-
100
- private func runConnectionAction( _ action: StateMachine . ConnectionAction ) {
101
- switch action {
102
- case . createConnection( let connectionID, let eventLoop) :
103
- self . createConnection ( connectionID, on: eventLoop)
104
-
105
- case . scheduleBackoffTimer( let connectionID, let backoff, on: let eventLoop) :
161
+
162
+ private func runLockedActions( _ actions: Actions . Locked ) {
163
+ switch actions. connection {
164
+ case . scheduleBackoffTimer( let connectionID, backoff: let backoff, on: let eventLoop) :
106
165
self . scheduleConnectionStartBackoffTimer ( connectionID, backoff, on: eventLoop)
107
-
166
+
108
167
case . scheduleTimeoutTimer( let connectionID, on: let eventLoop) :
109
168
self . scheduleIdleTimerForConnection ( connectionID, on: eventLoop)
110
-
169
+
111
170
case . cancelTimeoutTimer( let connectionID) :
112
171
self . cancelIdleTimerForConnection ( connectionID)
172
+
173
+ case . cancelBackoffTimers( let connectionIDs) :
174
+ for connectionID in connectionIDs {
175
+ self . cancelConnectionStartBackoffTimer ( connectionID)
176
+ }
177
+
178
+ case . none:
179
+ break
180
+ }
181
+ }
182
+
183
+ private func runUnlockedActions( _ actions: Actions . Unlocked ) {
184
+ self . runUnlockedConnectionAction ( actions. connection)
185
+ self . runUnlockedRequestAction ( actions. request)
186
+ }
187
+
188
+ private func runUnlockedConnectionAction( _ action: Actions . ConnectionAction . Unlocked ) {
189
+ switch action {
190
+ case . createConnection( let connectionID, let eventLoop) :
191
+ self . createConnection ( connectionID, on: eventLoop)
113
192
114
193
case . closeConnection( let connection, isShutdown: let isShutdown) :
115
194
self . logger. trace ( " close connection " , metadata: [
@@ -145,7 +224,7 @@ final class HTTPConnectionPool {
145
224
}
146
225
}
147
226
148
- private func runRequestAction ( _ action: StateMachine . RequestAction ) {
227
+ private func runUnlockedRequestAction ( _ action: StateMachine . RequestAction ) {
149
228
// The order of execution fail/execute request vs cancelling the request timeout timer does
150
229
// not matter in the actions here. The actions don't cause any side effects that will be
151
230
// reported back to the state machine and are not dependent on each other.
@@ -217,11 +296,9 @@ final class HTTPConnectionPool {
217
296
guard timeoutFired else { return }
218
297
219
298
// 3. Tell the state machine about the timeout
220
- let action = self . stateLock . withLock {
221
- self . _state . timeoutRequest ( requestID)
299
+ self . modifyStateAndRunActions {
300
+ $0 . timeoutRequest ( requestID)
222
301
}
223
-
224
- self . run ( action: action)
225
302
}
226
303
227
304
self . timerLock. withLockVoid {
@@ -256,34 +333,27 @@ final class HTTPConnectionPool {
256
333
let scheduled = eventLoop. scheduleTask ( in: self . idleConnectionTimeout) {
257
334
// there might be a race between a cancelTimer call and the triggering
258
335
// of this scheduled task. both want to acquire the lock
259
- let timerExisted = self . timerLock. withLock {
260
- self . _idleTimer. removeValue ( forKey: connectionID) != nil
261
- }
262
-
263
- guard timerExisted else { return }
264
-
265
- let action = self . stateLock. withLock {
266
- self . _state. connectionIdleTimeout ( connectionID)
336
+ self . modifyStateAndRunActions { stateMachine in
337
+ if self . _idleTimer. removeValue ( forKey: connectionID) != nil {
338
+ // The timer still exists. State Machines assumes it is alive
339
+ return stateMachine. connectionIdleTimeout ( connectionID)
340
+ }
341
+ return . none
267
342
}
268
- self . run ( action: action)
269
- }
270
-
271
- self . timerLock. withLock {
272
- assert ( self . _idleTimer [ connectionID] == nil )
273
- self . _idleTimer [ connectionID] = scheduled
274
343
}
344
+
345
+ assert ( self . _idleTimer [ connectionID] == nil )
346
+ self . _idleTimer [ connectionID] = scheduled
275
347
}
276
348
277
349
private func cancelIdleTimerForConnection( _ connectionID: Connection . ID ) {
278
350
self . logger. trace ( " Cancel idle connection timeout timer " , metadata: [
279
351
" ahc-connection-id " : " \( connectionID) " ,
280
352
] )
281
-
282
- let cancelTimer = self . timerLock. withLock {
283
- self . _idleTimer. removeValue ( forKey: connectionID)
353
+ guard let cancelTimer = self . _idleTimer. removeValue ( forKey: connectionID) else {
354
+ preconditionFailure ( " Expected to have an idle timer for connection \( connectionID) at this point. " )
284
355
}
285
-
286
- cancelTimer? . cancel ( )
356
+ cancelTimer. cancel ( )
287
357
}
288
358
289
359
private func scheduleConnectionStartBackoffTimer(
@@ -297,30 +367,24 @@ final class HTTPConnectionPool {
297
367
298
368
let scheduled = eventLoop. scheduleTask ( in: timeAmount) {
299
369
// there might be a race between a backoffTimer and the pool shutting down.
300
- let timerExisted = self . timerLock. withLock {
301
- self . _backoffTimer. removeValue ( forKey: connectionID) != nil
370
+ self . modifyStateAndRunActions { stateMachine in
371
+ if self . _backoffTimer. removeValue ( forKey: connectionID) != nil {
372
+ // The timer still exists. State Machines assumes it is alive
373
+ return stateMachine. connectionCreationBackoffDone ( connectionID)
374
+ }
375
+ return . none
302
376
}
303
-
304
- guard timerExisted else { return }
305
-
306
- let action = self . stateLock. withLock {
307
- self . _state. connectionCreationBackoffDone ( connectionID)
308
- }
309
- self . run ( action: action)
310
377
}
311
378
312
- self . timerLock. withLock {
313
- assert ( self . _backoffTimer [ connectionID] == nil )
314
- self . _backoffTimer [ connectionID] = scheduled
315
- }
379
+ assert ( self . _backoffTimer [ connectionID] == nil )
380
+ self . _backoffTimer [ connectionID] = scheduled
316
381
}
317
382
318
383
private func cancelConnectionStartBackoffTimer( _ connectionID: Connection . ID ) {
319
- let backoffTimer = self . timerLock . withLock {
320
- self . _backoffTimer [ connectionID]
384
+ guard let backoffTimer = self . _backoffTimer . removeValue ( forKey : connectionID ) else {
385
+ preconditionFailure ( " Expected to have a backoff timer for connection \( connectionID) at this point. " )
321
386
}
322
-
323
- backoffTimer? . cancel ( )
387
+ backoffTimer. cancel ( )
324
388
}
325
389
}
326
390
@@ -332,10 +396,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
332
396
" ahc-connection-id " : " \( connection. id) " ,
333
397
" ahc-http-version " : " http/1.1 " ,
334
398
] )
335
- let action = self . stateLock . withLock {
336
- self . _state . newHTTP1ConnectionCreated ( . http1_1( connection) )
399
+ self . modifyStateAndRunActions {
400
+ $0 . newHTTP1ConnectionCreated ( . http1_1( connection) )
337
401
}
338
- self . run ( action: action)
339
402
}
340
403
341
404
func http2ConnectionCreated( _ connection: HTTP2Connection , maximumStreams: Int ) {
@@ -358,10 +421,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester {
358
421
" ahc-error " : " \( error) " ,
359
422
" ahc-connection-id " : " \( connectionID) " ,
360
423
] )
361
- let action = self . stateLock . withLock {
362
- self . _state . failedToCreateNewConnection ( error, connectionID: connectionID)
424
+ self . modifyStateAndRunActions {
425
+ $0 . failedToCreateNewConnection ( error, connectionID: connectionID)
363
426
}
364
- self . run ( action: action)
365
427
}
366
428
}
367
429
@@ -371,21 +433,19 @@ extension HTTPConnectionPool: HTTP1ConnectionDelegate {
371
433
" ahc-connection-id " : " \( connection. id) " ,
372
434
" ahc-http-version " : " http/1.1 " ,
373
435
] )
374
- let action = self . stateLock . withLock {
375
- self . _state . connectionClosed ( connection. id)
436
+ self . modifyStateAndRunActions {
437
+ $0 . connectionClosed ( connection. id)
376
438
}
377
- self . run ( action: action)
378
439
}
379
440
380
441
func http1ConnectionReleased( _ connection: HTTP1Connection ) {
381
442
self . logger. trace ( " releasing connection " , metadata: [
382
443
" ahc-connection-id " : " \( connection. id) " ,
383
444
" ahc-http-version " : " http/1.1 " ,
384
445
] )
385
- let action = self . stateLock . withLock {
386
- self . _state . http1ConnectionReleased ( connection. id)
446
+ self . modifyStateAndRunActions {
447
+ $0 . http1ConnectionReleased ( connection. id)
387
448
}
388
- self . run ( action: action)
389
449
}
390
450
}
391
451
@@ -418,10 +478,9 @@ extension HTTPConnectionPool: HTTP2ConnectionDelegate {
418
478
extension HTTPConnectionPool : HTTPRequestScheduler {
419
479
func cancelRequest( _ request: HTTPSchedulableRequest ) {
420
480
let requestID = Request ( request) . id
421
- let action = self . stateLock . withLock {
422
- self . _state . cancelRequest ( requestID)
481
+ self . modifyStateAndRunActions {
482
+ $0 . cancelRequest ( requestID)
423
483
}
424
- self . run ( action: action)
425
484
}
426
485
}
427
486
0 commit comments