Skip to content

[HTTPConnectionPool] Implementation switch #427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ final class HTTPConnectionPool {
}

func shutdown() {
self.logger.debug("Shutting down connection pool")
self.modifyStateAndRunActions { $0.shutdown() }
}

Expand Down
144 changes: 56 additions & 88 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class HTTPClient {
public let eventLoopGroup: EventLoopGroup
let eventLoopGroupProvider: EventLoopGroupProvider
let configuration: Configuration
let pool: ConnectionPool
let poolManager: HTTPConnectionPool.Manager
var state: State
private let stateLock = Lock()

Expand Down Expand Up @@ -110,14 +110,18 @@ public class HTTPClient {
#endif
}
self.configuration = configuration
self.pool = ConnectionPool(configuration: configuration,
backgroundActivityLogger: backgroundActivityLogger)
self.poolManager = HTTPConnectionPool.Manager(
eventLoopGroup: self.eventLoopGroup,
configuration: self.configuration,
backgroundActivityLogger: backgroundActivityLogger
)
self.state = .upAndRunning
}

deinit {
assert(self.pool.count == 0)
assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
guard case .shutDown = self.state else {
preconditionFailure("Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
}
}

/// Shuts down the client and `EventLoopGroup` if it was created by the client.
Expand Down Expand Up @@ -175,14 +179,16 @@ public class HTTPClient {
switch self.eventLoopGroupProvider {
case .shared:
self.state = .shutDown
callback(nil)
queue.async {
callback(nil)
}
case .createNew:
switch self.state {
case .shuttingDown:
self.state = .shutDown
self.eventLoopGroup.shutdownGracefully(queue: queue, callback)
case .shutDown, .upAndRunning:
assertionFailure("The only valid state at this point is \(State.shutDown)")
assertionFailure("The only valid state at this point is \(String(describing: State.shuttingDown))")
}
}
}
Expand All @@ -191,33 +197,35 @@ public class HTTPClient {
private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
do {
try self.stateLock.withLock {
if self.state != .upAndRunning {
guard case .upAndRunning = self.state else {
throw HTTPClientError.alreadyShutdown
}
self.state = .shuttingDown
self.state = .shuttingDown(requiresCleanClose: requiresCleanClose, callback: callback)
}
} catch {
callback(error)
return
}

self.pool.close(on: self.eventLoopGroup.next()).whenComplete { result in
var closeError: Error?
let promise = self.eventLoopGroup.next().makePromise(of: Bool.self)
self.poolManager.shutdown(promise: promise)
promise.futureResult.whenComplete { result in
switch result {
case .failure(let error):
closeError = error
case .success(let cleanShutdown):
if !cleanShutdown, requiresCleanClose {
closeError = HTTPClientError.uncleanShutdown
case .failure:
preconditionFailure("Shutting down the connection pool must not fail, ever.")
case .success(let unclean):
let (callback, uncleanError) = self.stateLock.withLock { () -> ((Error?) -> Void, Error?) in
guard case .shuttingDown(let requiresClean, callback: let callback) = self.state else {
preconditionFailure("Why did the pool manager shut down, if it was not instructed to")
}

let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil
return (callback, error)
}

self.shutdownEventLoop(queue: queue) { eventLoopError in
// we prioritise .uncleanShutdown here
if let error = closeError {
callback(error)
} else {
callback(eventLoopError)
}
self.shutdownEventLoop(queue: queue) { error in
let reportedError = error ?? uncleanError
callback(reportedError)
}
}
}
Expand Down Expand Up @@ -492,7 +500,7 @@ public class HTTPClient {
let taskEL: EventLoop
switch eventLoopPreference.preference {
case .indifferent:
taskEL = self.pool.associatedEventLoop(for: ConnectionPool.Key(request)) ?? self.eventLoopGroup.next()
taskEL = self.eventLoopGroup.next()
case .delegate(on: let eventLoop):
precondition(self.eventLoopGroup.makeIterator().contains { $0 === eventLoop }, "Provided EventLoop must be part of clients EventLoopGroup.")
taskEL = eventLoop
Expand Down Expand Up @@ -540,75 +548,31 @@ public class HTTPClient {
}

let task = Task<Delegate.Response>(eventLoop: taskEL, logger: logger)
let setupComplete = taskEL.makePromise(of: Void.self)
let connection = self.pool.getConnection(request,
preference: eventLoopPreference,
taskEventLoop: taskEL,
deadline: deadline,
setupComplete: setupComplete.futureResult,
logger: logger)

let taskHandler = TaskHandler(task: task,
kind: request.kind,
delegate: delegate,
redirectHandler: redirectHandler,
ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown,
logger: logger)

connection.flatMap { connection -> EventLoopFuture<Void> in
logger.debug("got connection for request",
metadata: ["ahc-connection": "\(connection)",
"ahc-request": "\(request.method) \(request.url)",
"ahc-channel-el": "\(connection.channel.eventLoop)",
"ahc-task-el": "\(taskEL)"])

let channel = connection.channel

func prepareChannelForTask0() -> EventLoopFuture<Void> {
do {
let syncPipelineOperations = channel.pipeline.syncOperations

if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
try syncPipelineOperations.addHandler(IdleStateHandler(readTimeout: timeout))
}

try syncPipelineOperations.addHandler(taskHandler)
} catch {
connection.release(closing: true, logger: logger)
return channel.eventLoop.makeFailedFuture(error)
}

task.setConnection(connection)
do {
let requestBag = try RequestBag(
request: request,
eventLoopPreference: eventLoopPreference,
task: task,
redirectHandler: redirectHandler,
connectionDeadline: .now() + (self.configuration.timeout.connect ?? .seconds(10)),
requestOptions: .fromClientConfiguration(self.configuration),
delegate: delegate
)

let isCancelled = task.lock.withLock {
task.cancelled
var deadlineSchedule: Scheduled<Void>?
if let deadline = deadline {
deadlineSchedule = taskEL.scheduleTask(deadline: deadline) {
requestBag.fail(HTTPClientError.deadlineExceeded)
}

if !isCancelled {
return channel.writeAndFlush(request).flatMapError { _ in
// At this point the `TaskHandler` will already be present
// to handle the failure and pass it to the `promise`
channel.eventLoop.makeSucceededVoidFuture()
}
} else {
return channel.eventLoop.makeSucceededVoidFuture()
task.promise.futureResult.whenComplete { _ in
deadlineSchedule?.cancel()
}
}

if channel.eventLoop.inEventLoop {
return prepareChannelForTask0()
} else {
return channel.eventLoop.flatSubmit {
return prepareChannelForTask0()
}
}
}.always { _ in
setupComplete.succeed(())
}.whenFailure { error in
taskHandler.callOutToDelegateFireAndForget { task in
delegate.didReceiveError(task: task, error)
}
task.promise.fail(error)
self.poolManager.executeRequest(requestBag)
} catch {
task.fail(with: error, delegateType: Delegate.self)
}

return task
Expand Down Expand Up @@ -821,7 +785,7 @@ public class HTTPClient {

enum State {
case upAndRunning
case shuttingDown
case shuttingDown(requiresCleanClose: Bool, callback: (Error?) -> Void)
case shutDown
}
}
Expand Down Expand Up @@ -926,6 +890,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
case serverOfferedUnsupportedApplicationProtocol(String)
case requestStreamCancelled
case getConnectionFromPoolTimeout
case deadlineExceeded
}

private var code: Code
Expand Down Expand Up @@ -995,6 +960,9 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
return HTTPClientError(code: .serverOfferedUnsupportedApplicationProtocol(proto))
}

/// The request deadline was exceeded. The request was cancelled because of this.
public static let deadlineExceeded = HTTPClientError(code: .deadlineExceeded)

/// The remote server responded with a status code >= 300, before the full request was sent. The request stream
/// was therefore cancelled
public static let requestStreamCancelled = HTTPClientError(code: .requestStreamCancelled)
Expand Down
85 changes: 24 additions & 61 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -658,18 +658,28 @@ extension HTTPClient {
public let eventLoop: EventLoop

let promise: EventLoopPromise<Response>
var completion: EventLoopFuture<Void>
var connection: Connection?
var cancelled: Bool
let lock: Lock
let logger: Logger // We are okay to store the logger here because a Task is for only one request.

var isCancelled: Bool {
self.lock.withLock { self._isCancelled }
}

var taskDelegate: HTTPClientTaskDelegate? {
get {
self.lock.withLock { self._taskDelegate }
}
set {
self.lock.withLock { self._taskDelegate = newValue }
}
}

private var _isCancelled: Bool = false
private var _taskDelegate: HTTPClientTaskDelegate?
private let lock = Lock()

init(eventLoop: EventLoop, logger: Logger) {
self.eventLoop = eventLoop
self.promise = eventLoop.makePromise()
self.completion = self.promise.futureResult.map { _ in }
self.cancelled = false
self.lock = Lock()
self.logger = logger
}

Expand All @@ -694,69 +704,24 @@ extension HTTPClient {

/// Cancels the request execution.
public func cancel() {
let channel: Channel? = self.lock.withLock {
if !self.cancelled {
self.cancelled = true
return self.connection?.channel
} else {
return nil
}
let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in
self._isCancelled = true
return self._taskDelegate
}
channel?.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
}

@discardableResult
func setConnection(_ connection: Connection) -> Connection {
return self.lock.withLock {
self.connection = connection
if self.cancelled {
connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
}
return connection
}
taskDelegate?.cancel()
}

func succeed<Delegate: HTTPClientResponseDelegate>(promise: EventLoopPromise<Response>?,
with value: Response,
delegateType: Delegate.Type,
closing: Bool) {
self.releaseAssociatedConnection(delegateType: delegateType,
closing: closing).whenSuccess {
promise?.succeed(value)
}
promise?.succeed(value)
}

func fail<Delegate: HTTPClientResponseDelegate>(with error: Error,
delegateType: Delegate.Type) {
if let connection = self.connection {
self.releaseAssociatedConnection(delegateType: delegateType, closing: true)
.whenSuccess {
self.promise.fail(error)
connection.channel.close(promise: nil)
}
} else {
// this is used in tests where we don't want to bootstrap the whole connection pool
self.promise.fail(error)
}
}

func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type,
closing: Bool) -> EventLoopFuture<Void> {
if let connection = self.connection {
// remove read timeout handler
return connection.removeHandler(IdleStateHandler.self).flatMap {
connection.removeHandler(TaskHandler<Delegate>.self)
}.map {
connection.release(closing: closing, logger: self.logger)
}.flatMapError { error in
fatalError("Couldn't remove taskHandler: \(error)")
}
} else {
// TODO: This seems only reached in some internal unit test
// Maybe there could be a better handling in the future to make
// it an error outside of testing contexts
return self.eventLoop.makeSucceededFuture(())
}
self.promise.fail(error)
Comment on lines 715 to +724
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those functions are not hit from the new implementation. However they are hit from the old implementation. Since we don't want to remove the old implementation in this pr, they should stay for now. We will be able to remove them in a follow up pr.

}
}
}
Expand Down Expand Up @@ -1076,9 +1041,7 @@ extension TaskHandler: ChannelDuplexHandler {
break
case .redirected(let head, let redirectURL):
self.state = .endOrError
self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess {
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
}
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
default:
self.state = .bufferedEnd
self.handleReadForDelegate(response, context: context)
Expand Down
13 changes: 6 additions & 7 deletions Sources/AsyncHTTPClient/RequestBag.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,10 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
self.requestHead = head
self.requestFramingMetadata = metadata

// TODO: comment in once we switch to using the Request bag in AHC
// self.task.taskDelegate = self
// self.task.futureResult.whenComplete { _ in
// self.task.taskDelegate = nil
// }
self.task.taskDelegate = self
self.task.futureResult.whenComplete { _ in
self.task.taskDelegate = nil
}
}

private func requestWasQueued0(_ scheduler: HTTPRequestScheduler) {
Expand Down Expand Up @@ -113,7 +112,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
self.writeNextRequestPart($0)
}

body.stream(writer).whenComplete {
body.stream(writer).hop(to: self.eventLoop).whenComplete {
self.finishRequestBodyStream($0)
}

Expand Down Expand Up @@ -142,7 +141,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
}

private func writeNextRequestPart0(_ part: IOData) -> EventLoopFuture<Void> {
self.task.eventLoop.assertInEventLoop()
self.eventLoop.assertInEventLoop()

let action = self.state.writeNextRequestPart(part, taskEventLoop: self.task.eventLoop)

Expand Down
Loading