Skip to content

Add new HTTPConnectionPool Manager #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,14 @@ final class HTTP1Connection {
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
}

func close(promise: EventLoopPromise<Void>?) {
return self.channel.close(mode: .all, promise: promise)
}

func close() -> EventLoopFuture<Void> {
return self.channel.close()
let promise = self.channel.eventLoop.makePromise(of: Void.self)
self.close(promise: promise)
return promise.futureResult
}

func taskCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,14 @@ final class HTTP2Connection {
}
}

func close(promise: EventLoopPromise<Void>?) {
return self.channel.close(mode: .all, promise: promise)
}

func close() -> EventLoopFuture<Void> {
self.channel.close()
let promise = self.channel.eventLoop.makePromise(of: Void.self)
self.close(promise: promise)
return promise.futureResult
}

private func start() -> EventLoopFuture<Void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,154 @@
//
//===----------------------------------------------------------------------===//

import Logging
import NIO
import NIOConcurrencyHelpers
import NIOHTTP1

extension HTTPConnectionPool {
final class Manager {
private typealias Key = ConnectionPool.Key

private enum State {
case active
case shuttingDown(promise: EventLoopPromise<Bool>?, unclean: Bool)
case shutDown
}

private let eventLoopGroup: EventLoopGroup
private let configuration: HTTPClient.Configuration
private let connectionIDGenerator = Connection.ID.globalGenerator
private let logger: Logger

private var state: State = .active
private var _pools: [Key: HTTPConnectionPool] = [:]
private let lock = Lock()

private let sslContextCache = SSLContextCache()

init(eventLoopGroup: EventLoopGroup,
configuration: HTTPClient.Configuration,
backgroundActivityLogger logger: Logger) {
self.eventLoopGroup = eventLoopGroup
self.configuration = configuration
self.logger = logger
}

func executeRequest(_ request: HTTPSchedulableRequest) {
let poolKey = request.poolKey
let poolResult = self.lock.withLock { () -> Result<HTTPConnectionPool, HTTPClientError> in
switch self.state {
case .active:
if let pool = self._pools[poolKey] {
return .success(pool)
}

let pool = HTTPConnectionPool(
eventLoopGroup: self.eventLoopGroup,
sslContextCache: self.sslContextCache,
tlsConfiguration: request.tlsConfiguration,
clientConfiguration: self.configuration,
key: poolKey,
delegate: self,
idGenerator: self.connectionIDGenerator,
backgroundActivityLogger: self.logger
)
self._pools[poolKey] = pool
return .success(pool)

case .shuttingDown, .shutDown:
return .failure(HTTPClientError.alreadyShutdown)
}
}

switch poolResult {
case .success(let pool):
pool.executeRequest(request)
case .failure(let error):
request.fail(error)
}
}

/// Shutdown the connection pool manager. You **must** shutdown the pool manager, since it leak otherwise.
///
/// - Parameter promise: An `EventLoopPromise` that is succeeded once all connections pools are shutdown.
/// - Returns: An EventLoopFuture that is succeeded once the pool is shutdown. The bool indicates if the
/// shutdown was unclean.
func shutdown(promise: EventLoopPromise<Bool>?) {
enum ShutdownAction {
case done(EventLoopPromise<Bool>?)
case shutdown([Key: HTTPConnectionPool])
}

let action = self.lock.withLock { () -> ShutdownAction in
switch self.state {
case .active:
// If there aren't any pools, we can mark the pool as shut down right away.
if self._pools.isEmpty {
self.state = .shutDown
return .done(promise)
} else {
// this promise will be succeeded once all connection pools are shutdown
self.state = .shuttingDown(promise: promise, unclean: false)
return .shutdown(self._pools)
}

case .shuttingDown, .shutDown:
preconditionFailure("PoolManager already shutdown")
}
}

// if no pools are returned, the manager is already shutdown completely. Inform the
// delegate. This is a very clean shutdown...
switch action {
case .done(let promise):
promise?.succeed(false)

case .shutdown(let pools):
pools.values.forEach { pool in
pool.shutdown()
}
}
}
}
}

extension HTTPConnectionPool.Manager: HTTPConnectionPoolDelegate {
func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) {
enum CloseAction {
case close(EventLoopPromise<Bool>?, unclean: Bool)
case wait
}

let closeAction = self.lock.withLock { () -> CloseAction in
switch self.state {
case .active, .shutDown:
preconditionFailure("Why are pools shutting down, if the manager did not give a signal")

case .shuttingDown(let promise, let soFarUnclean):
guard self._pools.removeValue(forKey: pool.key) === pool else {
preconditionFailure("Expected that the pool was created by this manager and is known for this reason.")
}

if self._pools.isEmpty {
self.state = .shutDown
return .close(promise, unclean: soFarUnclean || unclean)
} else {
self.state = .shuttingDown(promise: promise, unclean: soFarUnclean || unclean)
return .wait
}
}
}

switch closeAction {
case .close(let promise, unclean: let unclean):
promise?.succeed(unclean)
case .wait:
break
}
}
}

extension HTTPConnectionPool.Connection.ID {
static var globalGenerator = Generator()
Expand Down
Loading