@@ -66,7 +66,7 @@ public class HTTPClient {
66
66
public let eventLoopGroup : EventLoopGroup
67
67
let eventLoopGroupProvider : EventLoopGroupProvider
68
68
let configuration : Configuration
69
- let pool : ConnectionPool
69
+ let poolManager : HTTPConnectionPool . Manager
70
70
var state : State
71
71
private let stateLock = Lock ( )
72
72
@@ -108,14 +108,20 @@ public class HTTPClient {
108
108
#endif
109
109
}
110
110
self . configuration = configuration
111
- self . pool = ConnectionPool ( configuration: configuration,
112
- backgroundActivityLogger: backgroundActivityLogger)
111
+ self . poolManager = HTTPConnectionPool . Manager (
112
+ eventLoopGroup: self . eventLoopGroup,
113
+ configuration: self . configuration,
114
+ backgroundActivityLogger: backgroundActivityLogger
115
+ )
113
116
self . state = . upAndRunning
117
+
118
+ self . poolManager. delegate = self
114
119
}
115
120
116
121
deinit {
117
- assert ( self . pool. count == 0 )
118
- assert ( self . state == . shutDown, " Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed. " )
122
+ guard case . shutDown = self . state else {
123
+ preconditionFailure ( " Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed. " )
124
+ }
119
125
}
120
126
121
127
/// Shuts down the client and `EventLoopGroup` if it was created by the client.
@@ -189,36 +195,17 @@ public class HTTPClient {
189
195
private func shutdown( requiresCleanClose: Bool , queue: DispatchQueue , _ callback: @escaping ( Error ? ) -> Void ) {
190
196
do {
191
197
try self . stateLock. withLock {
192
- if self . state != . upAndRunning {
198
+ guard case . upAndRunning = self . state else {
193
199
throw HTTPClientError . alreadyShutdown
194
200
}
195
- self . state = . shuttingDown
201
+ self . state = . shuttingDown( requiresCleanClose : requiresCleanClose , callback : callback )
196
202
}
197
203
} catch {
198
204
callback ( error)
199
205
return
200
206
}
201
207
202
- self . pool. close ( on: self . eventLoopGroup. next ( ) ) . whenComplete { result in
203
- var closeError : Error ?
204
- switch result {
205
- case . failure( let error) :
206
- closeError = error
207
- case . success( let cleanShutdown) :
208
- if !cleanShutdown, requiresCleanClose {
209
- closeError = HTTPClientError . uncleanShutdown
210
- }
211
-
212
- self . shutdownEventLoop ( queue: queue) { eventLoopError in
213
- // we prioritise .uncleanShutdown here
214
- if let error = closeError {
215
- callback ( error)
216
- } else {
217
- callback ( eventLoopError)
218
- }
219
- }
220
- }
221
- }
208
+ self . poolManager. shutdown ( )
222
209
}
223
210
224
211
/// Execute `GET` request using specified URL.
@@ -490,7 +477,7 @@ public class HTTPClient {
490
477
let taskEL : EventLoop
491
478
switch eventLoopPreference. preference {
492
479
case . indifferent:
493
- taskEL = self . pool . associatedEventLoop ( for : ConnectionPool . Key ( request ) ) ?? self . eventLoopGroup. next ( )
480
+ taskEL = self . eventLoopGroup. next ( )
494
481
case . delegate( on: let eventLoop) :
495
482
precondition ( self . eventLoopGroup. makeIterator ( ) . contains { $0 === eventLoop } , " Provided EventLoop must be part of clients EventLoopGroup. " )
496
483
taskEL = eventLoop
@@ -538,77 +525,30 @@ public class HTTPClient {
538
525
}
539
526
540
527
let task = Task < Delegate . Response > ( eventLoop: taskEL, logger: logger)
541
- let setupComplete = taskEL. makePromise ( of: Void . self)
542
- let connection = self . pool. getConnection ( request,
543
- preference: eventLoopPreference,
544
- taskEventLoop: taskEL,
545
- deadline: deadline,
546
- setupComplete: setupComplete. futureResult,
547
- logger: logger)
548
-
549
- let taskHandler = TaskHandler ( task: task,
550
- kind: request. kind,
551
- delegate: delegate,
552
- redirectHandler: redirectHandler,
553
- ignoreUncleanSSLShutdown: self . configuration. ignoreUncleanSSLShutdown,
554
- logger: logger)
555
-
556
- connection. flatMap { connection -> EventLoopFuture < Void > in
557
- logger. debug ( " got connection for request " ,
558
- metadata: [ " ahc-connection " : " \( connection) " ,
559
- " ahc-request " : " \( request. method) \( request. url) " ,
560
- " ahc-channel-el " : " \( connection. channel. eventLoop) " ,
561
- " ahc-task-el " : " \( taskEL) " ] )
562
-
563
- let channel = connection. channel
564
-
565
- func prepareChannelForTask0( ) -> EventLoopFuture < Void > {
566
- do {
567
- let syncPipelineOperations = channel. pipeline. syncOperations
568
-
569
- if let timeout = self . resolve ( timeout: self . configuration. timeout. read, deadline: deadline) {
570
- try syncPipelineOperations. addHandler ( IdleStateHandler ( readTimeout: timeout) )
571
- }
572
-
573
- try syncPipelineOperations. addHandler ( taskHandler)
574
- } catch {
575
- connection. release ( closing: true , logger: logger)
576
- return channel. eventLoop. makeFailedFuture ( error)
577
- }
578
-
579
- task. setConnection ( connection)
580
528
581
- let isCancelled = task. lock. withLock {
582
- task. cancelled
583
- }
584
-
585
- if !isCancelled {
586
- return channel. writeAndFlush ( request) . flatMapError { _ in
587
- // At this point the `TaskHandler` will already be present
588
- // to handle the failure and pass it to the `promise`
589
- channel. eventLoop. makeSucceededVoidFuture ( )
590
- }
591
- } else {
592
- return channel. eventLoop. makeSucceededVoidFuture ( )
593
- }
529
+ let requestBag = RequestBag (
530
+ request: request,
531
+ eventLoopPreference: eventLoopPreference,
532
+ task: task,
533
+ redirectHandler: redirectHandler,
534
+ connectionDeadline: . now( ) + ( self . configuration. timeout. connect ?? . seconds( 10 ) ) ,
535
+ idleReadTimeout: self . configuration. timeout. read,
536
+ delegate: delegate
537
+ )
538
+
539
+ var deadlineSchedule : Scheduled < Void > ?
540
+ if let deadline = deadline {
541
+ deadlineSchedule = taskEL. scheduleTask ( deadline: deadline) {
542
+ requestBag. fail ( HTTPClientError . deadlineExceeded)
594
543
}
595
544
596
- if channel. eventLoop. inEventLoop {
597
- return prepareChannelForTask0 ( )
598
- } else {
599
- return channel. eventLoop. flatSubmit {
600
- return prepareChannelForTask0 ( )
601
- }
602
- }
603
- } . always { _ in
604
- setupComplete. succeed ( ( ) )
605
- } . whenFailure { error in
606
- taskHandler. callOutToDelegateFireAndForget { task in
607
- delegate. didReceiveError ( task: task, error)
545
+ task. promise. futureResult. whenComplete { _ in
546
+ deadlineSchedule? . cancel ( )
608
547
}
609
- task. promise. fail ( error)
610
548
}
611
549
550
+ self . poolManager. execute ( request: requestBag)
551
+
612
552
return task
613
553
}
614
554
@@ -815,7 +755,7 @@ public class HTTPClient {
815
755
816
756
enum State {
817
757
case upAndRunning
818
- case shuttingDown
758
+ case shuttingDown( requiresCleanClose : Bool , callback : ( Error ? ) -> Void )
819
759
case shutDown
820
760
}
821
761
}
@@ -882,6 +822,22 @@ extension HTTPClient.Configuration {
882
822
}
883
823
}
884
824
825
+ extension HTTPClient : HTTPConnectionPoolManagerDelegate {
826
+ func httpConnectionPoolManagerDidShutdown( _: HTTPConnectionPool . Manager , unclean: Bool ) {
827
+ let ( callback, error) = self . stateLock. withLock { ( ) -> ( ( Error ? ) -> Void , Error ? ) in
828
+ guard case . shuttingDown( let requiresClean, callback: let callback) = self . state else {
829
+ preconditionFailure ( " Why did the pool manager shut down, if it was not instructed to " )
830
+ }
831
+
832
+ self . state = . shutDown
833
+ let error : Error ? = ( requiresClean && unclean) ? HTTPClientError . uncleanShutdown : nil
834
+ return ( callback, error)
835
+ }
836
+
837
+ callback ( error)
838
+ }
839
+ }
840
+
885
841
/// Possible client errors.
886
842
public struct HTTPClientError : Error , Equatable , CustomStringConvertible {
887
843
private enum Code : Equatable {
@@ -909,6 +865,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
909
865
case incompatibleHeaders
910
866
case connectTimeout
911
867
case getConnectionFromPoolTimeout
868
+ case deadlineExceeded
912
869
}
913
870
914
871
private var code : Code
@@ -973,4 +930,6 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
973
930
/// - A connection could not be created within the timout period.
974
931
/// - Tasks are not processed fast enough on the existing connections, to process all waiters in time
975
932
public static let getConnectionFromPoolTimeout = HTTPClientError ( code: . getConnectionFromPoolTimeout)
933
+ /// The request deadline was exceeded. The request was cancelled because of this.
934
+ public static let deadlineExceeded = HTTPClientError ( code: . deadlineExceeded)
976
935
}
0 commit comments