Skip to content

Commit 861de16

Browse files
committed
Switch to new HTTPConnectionPool
1 parent bb1f31f commit 861de16

9 files changed

+538
-507
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ protocol HTTPConnectionPoolDelegate {
2929
}
3030

3131
class HTTPConnectionPool {
32-
3332
struct Connection: Equatable {
3433
typealias ID = Int
3534

Diff for: Sources/AsyncHTTPClient/HTTPClient.swift

+53-94
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class HTTPClient {
6666
public let eventLoopGroup: EventLoopGroup
6767
let eventLoopGroupProvider: EventLoopGroupProvider
6868
let configuration: Configuration
69-
let pool: ConnectionPool
69+
let poolManager: HTTPConnectionPool.Manager
7070
var state: State
7171
private let stateLock = Lock()
7272

@@ -108,14 +108,20 @@ public class HTTPClient {
108108
#endif
109109
}
110110
self.configuration = configuration
111-
self.pool = ConnectionPool(configuration: configuration,
112-
backgroundActivityLogger: backgroundActivityLogger)
111+
self.poolManager = HTTPConnectionPool.Manager(
112+
eventLoopGroup: self.eventLoopGroup,
113+
configuration: self.configuration,
114+
backgroundActivityLogger: backgroundActivityLogger
115+
)
113116
self.state = .upAndRunning
117+
118+
self.poolManager.delegate = self
114119
}
115120

116121
deinit {
117-
assert(self.pool.count == 0)
118-
assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
122+
guard case .shutDown = self.state else {
123+
preconditionFailure("Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
124+
}
119125
}
120126

121127
/// Shuts down the client and `EventLoopGroup` if it was created by the client.
@@ -189,36 +195,17 @@ public class HTTPClient {
189195
private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
190196
do {
191197
try self.stateLock.withLock {
192-
if self.state != .upAndRunning {
198+
guard case .upAndRunning = self.state else {
193199
throw HTTPClientError.alreadyShutdown
194200
}
195-
self.state = .shuttingDown
201+
self.state = .shuttingDown(requiresCleanClose: requiresCleanClose, callback: callback)
196202
}
197203
} catch {
198204
callback(error)
199205
return
200206
}
201207

202-
self.pool.close(on: self.eventLoopGroup.next()).whenComplete { result in
203-
var closeError: Error?
204-
switch result {
205-
case .failure(let error):
206-
closeError = error
207-
case .success(let cleanShutdown):
208-
if !cleanShutdown, requiresCleanClose {
209-
closeError = HTTPClientError.uncleanShutdown
210-
}
211-
212-
self.shutdownEventLoop(queue: queue) { eventLoopError in
213-
// we prioritise .uncleanShutdown here
214-
if let error = closeError {
215-
callback(error)
216-
} else {
217-
callback(eventLoopError)
218-
}
219-
}
220-
}
221-
}
208+
self.poolManager.shutdown()
222209
}
223210

224211
/// Execute `GET` request using specified URL.
@@ -490,7 +477,7 @@ public class HTTPClient {
490477
let taskEL: EventLoop
491478
switch eventLoopPreference.preference {
492479
case .indifferent:
493-
taskEL = self.pool.associatedEventLoop(for: ConnectionPool.Key(request)) ?? self.eventLoopGroup.next()
480+
taskEL = self.eventLoopGroup.next()
494481
case .delegate(on: let eventLoop):
495482
precondition(self.eventLoopGroup.makeIterator().contains { $0 === eventLoop }, "Provided EventLoop must be part of clients EventLoopGroup.")
496483
taskEL = eventLoop
@@ -538,77 +525,30 @@ public class HTTPClient {
538525
}
539526

540527
let task = Task<Delegate.Response>(eventLoop: taskEL, logger: logger)
541-
let setupComplete = taskEL.makePromise(of: Void.self)
542-
let connection = self.pool.getConnection(request,
543-
preference: eventLoopPreference,
544-
taskEventLoop: taskEL,
545-
deadline: deadline,
546-
setupComplete: setupComplete.futureResult,
547-
logger: logger)
548-
549-
let taskHandler = TaskHandler(task: task,
550-
kind: request.kind,
551-
delegate: delegate,
552-
redirectHandler: redirectHandler,
553-
ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown,
554-
logger: logger)
555-
556-
connection.flatMap { connection -> EventLoopFuture<Void> in
557-
logger.debug("got connection for request",
558-
metadata: ["ahc-connection": "\(connection)",
559-
"ahc-request": "\(request.method) \(request.url)",
560-
"ahc-channel-el": "\(connection.channel.eventLoop)",
561-
"ahc-task-el": "\(taskEL)"])
562-
563-
let channel = connection.channel
564-
565-
func prepareChannelForTask0() -> EventLoopFuture<Void> {
566-
do {
567-
let syncPipelineOperations = channel.pipeline.syncOperations
568-
569-
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
570-
try syncPipelineOperations.addHandler(IdleStateHandler(readTimeout: timeout))
571-
}
572-
573-
try syncPipelineOperations.addHandler(taskHandler)
574-
} catch {
575-
connection.release(closing: true, logger: logger)
576-
return channel.eventLoop.makeFailedFuture(error)
577-
}
578-
579-
task.setConnection(connection)
580528

581-
let isCancelled = task.lock.withLock {
582-
task.cancelled
583-
}
584-
585-
if !isCancelled {
586-
return channel.writeAndFlush(request).flatMapError { _ in
587-
// At this point the `TaskHandler` will already be present
588-
// to handle the failure and pass it to the `promise`
589-
channel.eventLoop.makeSucceededVoidFuture()
590-
}
591-
} else {
592-
return channel.eventLoop.makeSucceededVoidFuture()
593-
}
529+
let requestBag = RequestBag(
530+
request: request,
531+
eventLoopPreference: eventLoopPreference,
532+
task: task,
533+
redirectHandler: redirectHandler,
534+
connectionDeadline: .now() + (self.configuration.timeout.connect ?? .seconds(10)),
535+
idleReadTimeout: self.configuration.timeout.read,
536+
delegate: delegate
537+
)
538+
539+
var deadlineSchedule: Scheduled<Void>?
540+
if let deadline = deadline {
541+
deadlineSchedule = taskEL.scheduleTask(deadline: deadline) {
542+
requestBag.fail(HTTPClientError.deadlineExceeded)
594543
}
595544

596-
if channel.eventLoop.inEventLoop {
597-
return prepareChannelForTask0()
598-
} else {
599-
return channel.eventLoop.flatSubmit {
600-
return prepareChannelForTask0()
601-
}
602-
}
603-
}.always { _ in
604-
setupComplete.succeed(())
605-
}.whenFailure { error in
606-
taskHandler.callOutToDelegateFireAndForget { task in
607-
delegate.didReceiveError(task: task, error)
545+
task.promise.futureResult.whenComplete { _ in
546+
deadlineSchedule?.cancel()
608547
}
609-
task.promise.fail(error)
610548
}
611549

550+
self.poolManager.execute(request: requestBag)
551+
612552
return task
613553
}
614554

@@ -815,7 +755,7 @@ public class HTTPClient {
815755

816756
enum State {
817757
case upAndRunning
818-
case shuttingDown
758+
case shuttingDown(requiresCleanClose: Bool, callback: (Error?) -> Void)
819759
case shutDown
820760
}
821761
}
@@ -882,6 +822,22 @@ extension HTTPClient.Configuration {
882822
}
883823
}
884824

825+
extension HTTPClient: HTTPConnectionPoolManagerDelegate {
826+
func httpConnectionPoolManagerDidShutdown(_: HTTPConnectionPool.Manager, unclean: Bool) {
827+
let (callback, error) = self.stateLock.withLock { () -> ((Error?) -> Void, Error?) in
828+
guard case .shuttingDown(let requiresClean, callback: let callback) = self.state else {
829+
preconditionFailure("Why did the pool manager shut down, if it was not instructed to")
830+
}
831+
832+
self.state = .shutDown
833+
let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil
834+
return (callback, error)
835+
}
836+
837+
callback(error)
838+
}
839+
}
840+
885841
/// Possible client errors.
886842
public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
887843
private enum Code: Equatable {
@@ -909,6 +865,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
909865
case incompatibleHeaders
910866
case connectTimeout
911867
case getConnectionFromPoolTimeout
868+
case deadlineExceeded
912869
}
913870

914871
private var code: Code
@@ -973,4 +930,6 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
973930
/// - A connection could not be created within the timout period.
974931
/// - Tasks are not processed fast enough on the existing connections, to process all waiters in time
975932
public static let getConnectionFromPoolTimeout = HTTPClientError(code: .getConnectionFromPoolTimeout)
933+
/// The request deadline was exceeded. The request was cancelled because of this.
934+
public static let deadlineExceeded = HTTPClientError(code: .deadlineExceeded)
976935
}

Diff for: Sources/AsyncHTTPClient/HTTPHandler.swift

+32-58
Original file line numberDiff line numberDiff line change
@@ -636,18 +636,33 @@ extension HTTPClient {
636636
public let eventLoop: EventLoop
637637

638638
let promise: EventLoopPromise<Response>
639-
var completion: EventLoopFuture<Void>
640-
var connection: Connection?
641-
var cancelled: Bool
642-
let lock: Lock
639+
640+
var connection: HTTPConnectionPool.Connection? {
641+
self.lock.withLock { self._connection }
642+
}
643+
644+
var isCancelled: Bool {
645+
self.lock.withLock { self._isCancelled }
646+
}
647+
648+
var taskDelegate: HTTPClientTaskDelegate? {
649+
get {
650+
self.lock.withLock { self._taskDelegate }
651+
}
652+
set {
653+
self.lock.withLock { self._taskDelegate = newValue }
654+
}
655+
}
656+
657+
private var _connection: HTTPConnectionPool.Connection?
658+
private var _isCancelled: Bool = false
659+
private var _taskDelegate: HTTPClientTaskDelegate?
660+
private let lock = Lock()
643661
let logger: Logger // We are okay to store the logger here because a Task is for only one request.
644662

645663
init(eventLoop: EventLoop, logger: Logger) {
646664
self.eventLoop = eventLoop
647665
self.promise = eventLoop.makePromise()
648-
self.completion = self.promise.futureResult.map { _ in }
649-
self.cancelled = false
650-
self.lock = Lock()
651666
self.logger = logger
652667
}
653668

@@ -672,69 +687,30 @@ extension HTTPClient {
672687

673688
/// Cancels the request execution.
674689
public func cancel() {
675-
let channel: Channel? = self.lock.withLock {
676-
if !self.cancelled {
677-
self.cancelled = true
678-
return self.connection?.channel
679-
} else {
680-
return nil
681-
}
690+
let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in
691+
self._isCancelled = true
692+
return self._taskDelegate
682693
}
683-
channel?.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
694+
695+
taskDelegate?.cancel()
684696
}
685697

686-
@discardableResult
687-
func setConnection(_ connection: Connection) -> Connection {
698+
func setConnection(_ connection: HTTPConnectionPool.Connection) {
688699
return self.lock.withLock {
689-
self.connection = connection
690-
if self.cancelled {
691-
connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
692-
}
693-
return connection
700+
self._connection = connection
694701
}
695702
}
696703

697704
func succeed<Delegate: HTTPClientResponseDelegate>(promise: EventLoopPromise<Response>?,
698705
with value: Response,
699706
delegateType: Delegate.Type,
700707
closing: Bool) {
701-
self.releaseAssociatedConnection(delegateType: delegateType,
702-
closing: closing).whenSuccess {
703-
promise?.succeed(value)
704-
}
708+
promise?.succeed(value)
705709
}
706710

707711
func fail<Delegate: HTTPClientResponseDelegate>(with error: Error,
708712
delegateType: Delegate.Type) {
709-
if let connection = self.connection {
710-
self.releaseAssociatedConnection(delegateType: delegateType, closing: true)
711-
.whenSuccess {
712-
self.promise.fail(error)
713-
connection.channel.close(promise: nil)
714-
}
715-
} else {
716-
// this is used in tests where we don't want to bootstrap the whole connection pool
717-
self.promise.fail(error)
718-
}
719-
}
720-
721-
func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type,
722-
closing: Bool) -> EventLoopFuture<Void> {
723-
if let connection = self.connection {
724-
// remove read timeout handler
725-
return connection.removeHandler(IdleStateHandler.self).flatMap {
726-
connection.removeHandler(TaskHandler<Delegate>.self)
727-
}.map {
728-
connection.release(closing: closing, logger: self.logger)
729-
}.flatMapError { error in
730-
fatalError("Couldn't remove taskHandler: \(error)")
731-
}
732-
} else {
733-
// TODO: This seems only reached in some internal unit test
734-
// Maybe there could be a better handling in the future to make
735-
// it an error outside of testing contexts
736-
return self.eventLoop.makeSucceededFuture(())
737-
}
713+
self.promise.fail(error)
738714
}
739715
}
740716
}
@@ -1071,9 +1047,7 @@ extension TaskHandler: ChannelDuplexHandler {
10711047
break
10721048
case .redirected(let head, let redirectURL):
10731049
self.state = .endOrError
1074-
self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess {
1075-
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
1076-
}
1050+
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
10771051
default:
10781052
self.state = .bufferedEnd
10791053
self.handleReadForDelegate(response, context: context)

Diff for: Sources/AsyncHTTPClient/RequestBag.swift

+5-5
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,14 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate>: HTTPRequestTask {
8686
self.idleReadTimeout = idleReadTimeout
8787
self.delegate = delegate
8888

89-
// self.task.taskDelegate = self
90-
// self.task.futureResult.whenComplete { _ in
91-
// self.task.taskDelegate = nil
92-
// }
89+
self.task.taskDelegate = self
90+
self.task.futureResult.whenComplete { _ in
91+
self.task.taskDelegate = nil
92+
}
9393
}
9494

9595
func willBeExecutedOnConnection(_ connection: HTTPConnectionPool.Connection) {
96-
// self.task.setConnection(connection)
96+
self.task.setConnection(connection)
9797
}
9898

9999
func requestWasQueued(_ queuer: HTTP1RequestQueuer) {

0 commit comments

Comments
 (0)