13
13
//===----------------------------------------------------------------------===//
14
14
15
15
import Foundation
16
+ import Logging
16
17
import NIO
17
18
import NIOConcurrencyHelpers
18
19
import NIOHTTP1
@@ -64,21 +65,29 @@ final class ConnectionPool {
64
65
/// When the pool is asked for a new connection, it creates a `Key` from the url associated to the `request`. This key
65
66
/// is used to determine if there already exists an associated `HTTP1ConnectionProvider` in `providers`.
66
67
/// If there is, the connection provider then takes care of leasing a new connection. If a connection provider doesn't exist, it is created.
67
- func getConnection( for request: HTTPClient . Request , preference: HTTPClient . EventLoopPreference , on eventLoop: EventLoop , deadline: NIODeadline ? , setupComplete: EventLoopFuture < Void > ) -> EventLoopFuture < Connection > {
68
+ func getConnection( _ request: HTTPClient . Request ,
69
+ preference: HTTPClient . EventLoopPreference ,
70
+ taskEventLoop: EventLoop ,
71
+ deadline: NIODeadline ? ,
72
+ setupComplete: EventLoopFuture < Void > ,
73
+ logger: Logger ) -> EventLoopFuture < Connection > {
68
74
let key = Key ( request)
69
75
70
76
let provider : HTTP1ConnectionProvider = self . lock. withLock {
71
77
if let existing = self . providers [ key] , existing. enqueue ( ) {
72
78
return existing
73
79
} else {
74
80
// Connection provider will be created with `pending = 1`
75
- let provider = HTTP1ConnectionProvider ( key: key, eventLoop: eventLoop, configuration: self . configuration, pool: self )
81
+ let provider = HTTP1ConnectionProvider ( key: key,
82
+ eventLoop: taskEventLoop,
83
+ configuration: self . configuration,
84
+ pool: self )
76
85
self . providers [ key] = provider
77
86
return provider
78
87
}
79
88
}
80
89
81
- return provider. getConnection ( preference: preference, setupComplete: setupComplete)
90
+ return provider. getConnection ( preference: preference, setupComplete: setupComplete, logger : logger )
82
91
}
83
92
84
93
func delete( _ provider: HTTP1ConnectionProvider ) {
@@ -167,21 +176,21 @@ class Connection {
167
176
/// Release this `Connection` to its associated `HTTP1ConnectionProvider`.
168
177
///
169
178
/// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline.
170
- func release( closing: Bool ) {
179
+ func release( closing: Bool , logger : Logger ) {
171
180
assert ( self . channel. eventLoop. inEventLoop)
172
- self . provider. release ( connection: self , closing: closing)
181
+ self . provider. release ( connection: self , closing: closing, logger : logger )
173
182
}
174
183
175
184
/// Called when channel exceeds idle time in pool.
176
- func timeout( ) {
185
+ func timeout( logger : Logger ) {
177
186
assert ( self . channel. eventLoop. inEventLoop)
178
- self . provider. timeout ( connection: self )
187
+ self . provider. timeout ( connection: self , logger : logger )
179
188
}
180
189
181
190
/// Called when channel goes inactive while in the pool.
182
- func remoteClosed( ) {
191
+ func remoteClosed( logger : Logger ) {
183
192
assert ( self . channel. eventLoop. inEventLoop)
184
- self . provider. remoteClosed ( connection: self )
193
+ self . provider. remoteClosed ( connection: self , logger : logger )
185
194
}
186
195
187
196
func cancel( ) -> EventLoopFuture < Void > {
@@ -194,9 +203,10 @@ class Connection {
194
203
}
195
204
196
205
/// Sets idle timeout handler and channel inactivity listener.
197
- func setIdleTimeout( timeout: TimeAmount ? ) {
206
+ func setIdleTimeout( timeout: TimeAmount ? , logger : Logger ) {
198
207
_ = self . channel. pipeline. addHandler ( IdleStateHandler ( writeTimeout: timeout) , position: . first) . flatMap { _ in
199
- self . channel. pipeline. addHandler ( IdlePoolConnectionHandler ( connection: self ) )
208
+ self . channel. pipeline. addHandler ( IdlePoolConnectionHandler ( connection: self ,
209
+ logger: logger) )
200
210
}
201
211
}
202
212
@@ -280,50 +290,76 @@ class HTTP1ConnectionProvider {
280
290
self . state. assertInvariants ( )
281
291
}
282
292
283
- private func execute( _ action: Action ) {
293
+ private func execute( _ action: Action , logger : Logger ) {
284
294
switch action {
285
295
case . lease( let connection, let waiter) :
286
296
// if connection is became inactive, we create a new one.
287
297
connection. cancelIdleTimeout ( ) . whenComplete { _ in
288
298
if connection. isActiveEstimation {
299
+ logger. trace ( " leasing existing connection " ,
300
+ metadata: [ " ahc-connection " : " \( connection) " ] )
289
301
waiter. promise. succeed ( connection)
290
302
} else {
303
+ logger. trace ( " opening fresh connection (found matching but inactive connection) " ,
304
+ metadata: [ " ahc-dead-connection " : " \( connection) " ] )
291
305
self . makeChannel ( preference: waiter. preference) . whenComplete { result in
292
- self . connect ( result, waiter: waiter, replacing: connection)
306
+ self . connect ( result, waiter: waiter, replacing: connection, logger : logger )
293
307
}
294
308
}
295
309
}
296
310
case . create( let waiter) :
311
+ logger. trace ( " opening fresh connection (no connections to reuse available) " )
297
312
self . makeChannel ( preference: waiter. preference) . whenComplete { result in
298
- self . connect ( result, waiter: waiter)
313
+ self . connect ( result, waiter: waiter, logger : logger )
299
314
}
300
315
case . replace( let connection, let waiter) :
301
316
connection. cancelIdleTimeout ( ) . flatMap {
302
317
connection. close ( )
303
318
} . whenComplete { _ in
319
+ logger. trace ( " opening fresh connection (replacing exising connection) " ,
320
+ metadata: [ " ahc-old-connection " : " \( connection) " ,
321
+ " ahc-waiter " : " \( waiter) " ] )
304
322
self . makeChannel ( preference: waiter. preference) . whenComplete { result in
305
- self . connect ( result, waiter: waiter, replacing: connection)
323
+ self . connect ( result, waiter: waiter, replacing: connection, logger : logger )
306
324
}
307
325
}
308
326
case . park( let connection) :
309
- connection. setIdleTimeout ( timeout: self . configuration. maximumAllowedIdleTimeInConnectionPool)
327
+ logger. trace ( " parking connection " ,
328
+ metadata: [ " ahc-connection " : " \( connection) " ] )
329
+ connection. setIdleTimeout ( timeout: self . configuration. maximumAllowedIdleTimeInConnectionPool,
330
+ logger: logger. detachingRequestInformation ( ) )
310
331
case . closeProvider:
332
+ logger. debug ( " closing provider " ,
333
+ metadata: [ " ahc-provider " : " \( self ) " ] )
311
334
self . closeAndDelete ( )
312
335
case . none:
313
336
break
314
337
case . parkAnd( let connection, let action) :
315
- connection. setIdleTimeout ( timeout: self . configuration. maximumAllowedIdleTimeInConnectionPool)
316
- self . execute ( action)
338
+ logger. trace ( " parking connection & doing further action " ,
339
+ metadata: [ " ahc-connection " : " \( connection) " ,
340
+ " ahc-action " : " \( action) " ] )
341
+ connection. setIdleTimeout ( timeout: self . configuration. maximumAllowedIdleTimeInConnectionPool,
342
+ logger: logger. detachingRequestInformation ( ) )
343
+ self . execute ( action, logger: logger)
317
344
case . closeAnd( let connection, let action) :
345
+ logger. trace ( " closing connection & doing further action " ,
346
+ metadata: [ " ahc-connection " : " \( connection) " ,
347
+ " ahc-action " : " \( action) " ] )
318
348
connection. channel. close ( promise: nil )
319
- self . execute ( action)
349
+ self . execute ( action, logger : logger )
320
350
case . cancel( let connection, let close) :
351
+ logger. trace ( " cancelling connection " ,
352
+ metadata: [ " ahc-connection " : " \( connection) " ,
353
+ " ahc-close " : " \( close) " ] )
321
354
connection. cancel ( ) . whenComplete { _ in
322
355
if close {
323
356
self . closeAndDelete ( )
324
357
}
325
358
}
326
359
case . fail( let waiter, let error) :
360
+ logger. debug ( " failing connection for waiter " ,
361
+ metadata: [ " ahc-waiter " : " \( waiter) " ,
362
+ " ahc-error " : " \( error) " ] )
327
363
waiter. promise. fail ( error)
328
364
}
329
365
}
@@ -335,19 +371,24 @@ class HTTP1ConnectionProvider {
335
371
}
336
372
}
337
373
338
- func getConnection( preference: HTTPClient . EventLoopPreference , setupComplete: EventLoopFuture < Void > ) -> EventLoopFuture < Connection > {
374
+ func getConnection( preference: HTTPClient . EventLoopPreference ,
375
+ setupComplete: EventLoopFuture < Void > ,
376
+ logger: Logger ) -> EventLoopFuture < Connection > {
339
377
let waiter = Waiter ( promise: self . eventLoop. makePromise ( ) , setupComplete: setupComplete, preference: preference)
340
378
341
379
let action : Action = self . lock. withLock {
342
380
self . state. acquire ( waiter: waiter)
343
381
}
344
382
345
- self . execute ( action)
383
+ self . execute ( action, logger : logger )
346
384
347
385
return waiter. promise. futureResult
348
386
}
349
387
350
- func connect( _ result: Result < Channel , Error > , waiter: Waiter , replacing closedConnection: Connection ? = nil ) {
388
+ func connect( _ result: Result < Channel , Error > ,
389
+ waiter: Waiter ,
390
+ replacing closedConnection: Connection ? = nil ,
391
+ logger: Logger ) {
351
392
let action : Action
352
393
switch result {
353
394
case . success( let channel) :
@@ -366,11 +407,13 @@ class HTTP1ConnectionProvider {
366
407
waiter. promise. fail ( error)
367
408
}
368
409
waiter. setupComplete. whenComplete { _ in
369
- self . execute ( action)
410
+ self . execute ( action, logger : logger )
370
411
}
371
412
}
372
413
373
- func release( connection: Connection , closing: Bool ) {
414
+ func release( connection: Connection , closing: Bool , logger: Logger ) {
415
+ logger. debug ( " releasing connection, request complete " ,
416
+ metadata: [ " ahc-closing " : " \( closing) " ] )
374
417
let action : Action = self . lock. withLock {
375
418
self . state. release ( connection: connection, closing: closing)
376
419
}
@@ -381,31 +424,31 @@ class HTTP1ConnectionProvider {
381
424
case . park, . closeProvider:
382
425
// Since both `.park` and `.deleteProvider` are terminal in terms of execution,
383
426
// we can execute them immediately
384
- self . execute ( action)
427
+ self . execute ( action, logger : logger )
385
428
case . cancel, . closeAnd, . create, . fail, . lease, . parkAnd, . replace:
386
429
// This is needed to start a new stack, otherwise, since this is called on a previous
387
430
// future completion handler chain, it will be growing indefinitely until the connection is closed.
388
431
// We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved.
389
432
connection. channel. eventLoop. execute {
390
- self . execute ( action)
433
+ self . execute ( action, logger : logger )
391
434
}
392
435
}
393
436
}
394
437
395
- func remoteClosed( connection: Connection ) {
438
+ func remoteClosed( connection: Connection , logger : Logger ) {
396
439
let action : Action = self . lock. withLock {
397
440
self . state. remoteClosed ( connection: connection)
398
441
}
399
442
400
- self . execute ( action)
443
+ self . execute ( action, logger : logger )
401
444
}
402
445
403
- func timeout( connection: Connection ) {
446
+ func timeout( connection: Connection , logger : Logger ) {
404
447
let action : Action = self . lock. withLock {
405
448
self . state. timeout ( connection: connection)
406
449
}
407
450
408
- self . execute ( action)
451
+ self . execute ( action, logger : logger )
409
452
}
410
453
411
454
private func closeAndDelete( ) {
@@ -510,25 +553,27 @@ class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler
510
553
511
554
let connection : Connection
512
555
var eventSent : Bool
556
+ let logger : Logger
513
557
514
- init ( connection: Connection ) {
558
+ init ( connection: Connection , logger : Logger ) {
515
559
self . connection = connection
516
560
self . eventSent = false
561
+ self . logger = logger
517
562
}
518
563
519
564
// this is needed to detect when remote end closes connection while connection is in the pool idling
520
565
func channelInactive( context: ChannelHandlerContext ) {
521
566
if !self . eventSent {
522
567
self . eventSent = true
523
- self . connection. remoteClosed ( )
568
+ self . connection. remoteClosed ( logger : self . logger )
524
569
}
525
570
}
526
571
527
572
func userInboundEventTriggered( context: ChannelHandlerContext , event: Any ) {
528
573
if let idleEvent = event as? IdleStateHandler . IdleStateEvent , idleEvent == . write {
529
574
if !self . eventSent {
530
575
self . eventSent = true
531
- self . connection. timeout ( )
576
+ self . connection. timeout ( logger : self . logger )
532
577
}
533
578
} else {
534
579
context. fireUserInboundEventTriggered ( event)
0 commit comments