Skip to content

Commit 1fe5485

Browse files
committed
Final implementation switch
1 parent 8bd8605 commit 1fe5485

File tree

6 files changed

+88
-639
lines changed

6 files changed

+88
-639
lines changed

Diff for: Sources/AsyncHTTPClient/HTTPClient.swift

+56-88
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class HTTPClient {
6868
public let eventLoopGroup: EventLoopGroup
6969
let eventLoopGroupProvider: EventLoopGroupProvider
7070
let configuration: Configuration
71-
let pool: ConnectionPool
71+
let poolManager: HTTPConnectionPool.Manager
7272
var state: State
7373
private let stateLock = Lock()
7474

@@ -110,14 +110,18 @@ public class HTTPClient {
110110
#endif
111111
}
112112
self.configuration = configuration
113-
self.pool = ConnectionPool(configuration: configuration,
114-
backgroundActivityLogger: backgroundActivityLogger)
113+
self.poolManager = HTTPConnectionPool.Manager(
114+
eventLoopGroup: self.eventLoopGroup,
115+
configuration: self.configuration,
116+
backgroundActivityLogger: backgroundActivityLogger
117+
)
115118
self.state = .upAndRunning
116119
}
117120

118121
deinit {
119-
assert(self.pool.count == 0)
120-
assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
122+
guard case .shutDown = self.state else {
123+
preconditionFailure("Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
124+
}
121125
}
122126

123127
/// Shuts down the client and `EventLoopGroup` if it was created by the client.
@@ -175,14 +179,16 @@ public class HTTPClient {
175179
switch self.eventLoopGroupProvider {
176180
case .shared:
177181
self.state = .shutDown
178-
callback(nil)
182+
queue.async {
183+
callback(nil)
184+
}
179185
case .createNew:
180186
switch self.state {
181187
case .shuttingDown:
182188
self.state = .shutDown
183189
self.eventLoopGroup.shutdownGracefully(queue: queue, callback)
184190
case .shutDown, .upAndRunning:
185-
assertionFailure("The only valid state at this point is \(State.shutDown)")
191+
assertionFailure("The only valid state at this point is \(String(describing: State.shuttingDown))")
186192
}
187193
}
188194
}
@@ -191,33 +197,35 @@ public class HTTPClient {
191197
private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
192198
do {
193199
try self.stateLock.withLock {
194-
if self.state != .upAndRunning {
200+
guard case .upAndRunning = self.state else {
195201
throw HTTPClientError.alreadyShutdown
196202
}
197-
self.state = .shuttingDown
203+
self.state = .shuttingDown(requiresCleanClose: requiresCleanClose, callback: callback)
198204
}
199205
} catch {
200206
callback(error)
201207
return
202208
}
203209

204-
self.pool.close(on: self.eventLoopGroup.next()).whenComplete { result in
205-
var closeError: Error?
210+
let promise = self.eventLoopGroup.next().makePromise(of: Bool.self)
211+
self.poolManager.shutdown(promise: promise)
212+
promise.futureResult.whenComplete { result in
206213
switch result {
207-
case .failure(let error):
208-
closeError = error
209-
case .success(let cleanShutdown):
210-
if !cleanShutdown, requiresCleanClose {
211-
closeError = HTTPClientError.uncleanShutdown
214+
case .failure:
215+
preconditionFailure("Shutting down the connection pool must not fail, ever.")
216+
case .success(let unclean):
217+
let (callback, uncleanError) = self.stateLock.withLock { () -> ((Error?) -> Void, Error?) in
218+
guard case .shuttingDown(let requiresClean, callback: let callback) = self.state else {
219+
preconditionFailure("Why did the pool manager shut down, if it was not instructed to")
220+
}
221+
222+
let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil
223+
return (callback, error)
212224
}
213225

214-
self.shutdownEventLoop(queue: queue) { eventLoopError in
215-
// we prioritise .uncleanShutdown here
216-
if let error = closeError {
217-
callback(error)
218-
} else {
219-
callback(eventLoopError)
220-
}
226+
self.shutdownEventLoop(queue: queue) { error in
227+
let reportedError = error ?? uncleanError
228+
callback(reportedError)
221229
}
222230
}
223231
}
@@ -492,7 +500,7 @@ public class HTTPClient {
492500
let taskEL: EventLoop
493501
switch eventLoopPreference.preference {
494502
case .indifferent:
495-
taskEL = self.pool.associatedEventLoop(for: ConnectionPool.Key(request)) ?? self.eventLoopGroup.next()
503+
taskEL = self.eventLoopGroup.next()
496504
case .delegate(on: let eventLoop):
497505
precondition(self.eventLoopGroup.makeIterator().contains { $0 === eventLoop }, "Provided EventLoop must be part of clients EventLoopGroup.")
498506
taskEL = eventLoop
@@ -540,75 +548,31 @@ public class HTTPClient {
540548
}
541549

542550
let task = Task<Delegate.Response>(eventLoop: taskEL, logger: logger)
543-
let setupComplete = taskEL.makePromise(of: Void.self)
544-
let connection = self.pool.getConnection(request,
545-
preference: eventLoopPreference,
546-
taskEventLoop: taskEL,
547-
deadline: deadline,
548-
setupComplete: setupComplete.futureResult,
549-
logger: logger)
550-
551-
let taskHandler = TaskHandler(task: task,
552-
kind: request.kind,
553-
delegate: delegate,
554-
redirectHandler: redirectHandler,
555-
ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown,
556-
logger: logger)
557-
558-
connection.flatMap { connection -> EventLoopFuture<Void> in
559-
logger.debug("got connection for request",
560-
metadata: ["ahc-connection": "\(connection)",
561-
"ahc-request": "\(request.method) \(request.url)",
562-
"ahc-channel-el": "\(connection.channel.eventLoop)",
563-
"ahc-task-el": "\(taskEL)"])
564-
565-
let channel = connection.channel
566-
567-
func prepareChannelForTask0() -> EventLoopFuture<Void> {
568-
do {
569-
let syncPipelineOperations = channel.pipeline.syncOperations
570-
571-
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
572-
try syncPipelineOperations.addHandler(IdleStateHandler(readTimeout: timeout))
573-
}
574-
575-
try syncPipelineOperations.addHandler(taskHandler)
576-
} catch {
577-
connection.release(closing: true, logger: logger)
578-
return channel.eventLoop.makeFailedFuture(error)
579-
}
580-
581-
task.setConnection(connection)
551+
do {
552+
let requestBag = try RequestBag(
553+
request: request,
554+
eventLoopPreference: eventLoopPreference,
555+
task: task,
556+
redirectHandler: redirectHandler,
557+
connectionDeadline: .now() + (self.configuration.timeout.connect ?? .seconds(10)),
558+
idleReadTimeout: self.configuration.timeout.read,
559+
delegate: delegate
560+
)
582561

583-
let isCancelled = task.lock.withLock {
584-
task.cancelled
562+
var deadlineSchedule: Scheduled<Void>?
563+
if let deadline = deadline {
564+
deadlineSchedule = taskEL.scheduleTask(deadline: deadline) {
565+
requestBag.fail(HTTPClientError.deadlineExceeded)
585566
}
586567

587-
if !isCancelled {
588-
return channel.writeAndFlush(request).flatMapError { _ in
589-
// At this point the `TaskHandler` will already be present
590-
// to handle the failure and pass it to the `promise`
591-
channel.eventLoop.makeSucceededVoidFuture()
592-
}
593-
} else {
594-
return channel.eventLoop.makeSucceededVoidFuture()
568+
task.promise.futureResult.whenComplete { _ in
569+
deadlineSchedule?.cancel()
595570
}
596571
}
597572

598-
if channel.eventLoop.inEventLoop {
599-
return prepareChannelForTask0()
600-
} else {
601-
return channel.eventLoop.flatSubmit {
602-
return prepareChannelForTask0()
603-
}
604-
}
605-
}.always { _ in
606-
setupComplete.succeed(())
607-
}.whenFailure { error in
608-
taskHandler.callOutToDelegateFireAndForget { task in
609-
delegate.didReceiveError(task: task, error)
610-
}
611-
task.promise.fail(error)
573+
self.poolManager.executeRequest(requestBag)
574+
} catch {
575+
task.fail(with: error, delegateType: Delegate.self)
612576
}
613577

614578
return task
@@ -821,7 +785,7 @@ public class HTTPClient {
821785

822786
enum State {
823787
case upAndRunning
824-
case shuttingDown
788+
case shuttingDown(requiresCleanClose: Bool, callback: (Error?) -> Void)
825789
case shutDown
826790
}
827791
}
@@ -926,6 +890,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
926890
case serverOfferedUnsupportedApplicationProtocol(String)
927891
case requestStreamCancelled
928892
case getConnectionFromPoolTimeout
893+
case deadlineExceeded
929894
}
930895

931896
private var code: Code
@@ -995,6 +960,9 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
995960
return HTTPClientError(code: .serverOfferedUnsupportedApplicationProtocol(proto))
996961
}
997962

963+
/// The request deadline was exceeded. The request was cancelled because of this.
964+
public static let deadlineExceeded = HTTPClientError(code: .deadlineExceeded)
965+
998966
/// The remote server responded with a status code >= 300, before the full request was sent. The request stream
999967
/// was therefore cancelled
1000968
public static let requestStreamCancelled = HTTPClientError(code: .requestStreamCancelled)

Diff for: Sources/AsyncHTTPClient/HTTPHandler.swift

+24-61
Original file line numberDiff line numberDiff line change
@@ -658,18 +658,28 @@ extension HTTPClient {
658658
public let eventLoop: EventLoop
659659

660660
let promise: EventLoopPromise<Response>
661-
var completion: EventLoopFuture<Void>
662-
var connection: Connection?
663-
var cancelled: Bool
664-
let lock: Lock
665661
let logger: Logger // We are okay to store the logger here because a Task is for only one request.
666662

663+
var isCancelled: Bool {
664+
self.lock.withLock { self._isCancelled }
665+
}
666+
667+
var taskDelegate: HTTPClientTaskDelegate? {
668+
get {
669+
self.lock.withLock { self._taskDelegate }
670+
}
671+
set {
672+
self.lock.withLock { self._taskDelegate = newValue }
673+
}
674+
}
675+
676+
private var _isCancelled: Bool = false
677+
private var _taskDelegate: HTTPClientTaskDelegate?
678+
private let lock = Lock()
679+
667680
init(eventLoop: EventLoop, logger: Logger) {
668681
self.eventLoop = eventLoop
669682
self.promise = eventLoop.makePromise()
670-
self.completion = self.promise.futureResult.map { _ in }
671-
self.cancelled = false
672-
self.lock = Lock()
673683
self.logger = logger
674684
}
675685

@@ -694,69 +704,24 @@ extension HTTPClient {
694704

695705
/// Cancels the request execution.
696706
public func cancel() {
697-
let channel: Channel? = self.lock.withLock {
698-
if !self.cancelled {
699-
self.cancelled = true
700-
return self.connection?.channel
701-
} else {
702-
return nil
703-
}
707+
let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in
708+
self._isCancelled = true
709+
return self._taskDelegate
704710
}
705-
channel?.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
706-
}
707711

708-
@discardableResult
709-
func setConnection(_ connection: Connection) -> Connection {
710-
return self.lock.withLock {
711-
self.connection = connection
712-
if self.cancelled {
713-
connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
714-
}
715-
return connection
716-
}
712+
taskDelegate?.cancel()
717713
}
718714

719715
func succeed<Delegate: HTTPClientResponseDelegate>(promise: EventLoopPromise<Response>?,
720716
with value: Response,
721717
delegateType: Delegate.Type,
722718
closing: Bool) {
723-
self.releaseAssociatedConnection(delegateType: delegateType,
724-
closing: closing).whenSuccess {
725-
promise?.succeed(value)
726-
}
719+
promise?.succeed(value)
727720
}
728721

729722
func fail<Delegate: HTTPClientResponseDelegate>(with error: Error,
730723
delegateType: Delegate.Type) {
731-
if let connection = self.connection {
732-
self.releaseAssociatedConnection(delegateType: delegateType, closing: true)
733-
.whenSuccess {
734-
self.promise.fail(error)
735-
connection.channel.close(promise: nil)
736-
}
737-
} else {
738-
// this is used in tests where we don't want to bootstrap the whole connection pool
739-
self.promise.fail(error)
740-
}
741-
}
742-
743-
func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type,
744-
closing: Bool) -> EventLoopFuture<Void> {
745-
if let connection = self.connection {
746-
// remove read timeout handler
747-
return connection.removeHandler(IdleStateHandler.self).flatMap {
748-
connection.removeHandler(TaskHandler<Delegate>.self)
749-
}.map {
750-
connection.release(closing: closing, logger: self.logger)
751-
}.flatMapError { error in
752-
fatalError("Couldn't remove taskHandler: \(error)")
753-
}
754-
} else {
755-
// TODO: This seems only reached in some internal unit test
756-
// Maybe there could be a better handling in the future to make
757-
// it an error outside of testing contexts
758-
return self.eventLoop.makeSucceededFuture(())
759-
}
724+
self.promise.fail(error)
760725
}
761726
}
762727
}
@@ -1076,9 +1041,7 @@ extension TaskHandler: ChannelDuplexHandler {
10761041
break
10771042
case .redirected(let head, let redirectURL):
10781043
self.state = .endOrError
1079-
self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess {
1080-
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
1081-
}
1044+
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
10821045
default:
10831046
self.state = .bufferedEnd
10841047
self.handleReadForDelegate(response, context: context)

Diff for: Sources/AsyncHTTPClient/RequestBag.swift

+6-6
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
6666
self.requestFramingMetadata = metadata
6767

6868
// TODO: comment in once we switch to using the Request bag in AHC
69-
// self.task.taskDelegate = self
70-
// self.task.futureResult.whenComplete { _ in
71-
// self.task.taskDelegate = nil
72-
// }
69+
self.task.taskDelegate = self
70+
self.task.futureResult.whenComplete { _ in
71+
self.task.taskDelegate = nil
72+
}
7373
}
7474

7575
private func requestWasQueued0(_ scheduler: HTTPRequestScheduler) {
@@ -111,7 +111,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
111111
self.writeNextRequestPart($0)
112112
}
113113

114-
body.stream(writer).whenComplete {
114+
body.stream(writer).hop(to: self.eventLoop).whenComplete {
115115
self.finishRequestBodyStream($0)
116116
}
117117

@@ -140,7 +140,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
140140
}
141141

142142
private func writeNextRequestPart0(_ part: IOData) -> EventLoopFuture<Void> {
143-
self.task.eventLoop.assertInEventLoop()
143+
self.eventLoop.assertInEventLoop()
144144

145145
let action = self.state.writeNextRequestPart(part, taskEventLoop: self.task.eventLoop)
146146

0 commit comments

Comments
 (0)