diff --git a/Sources/AsyncHTTPClient/Connection.swift b/Sources/AsyncHTTPClient/Connection.swift deleted file mode 100644 index a4a9b7c6a..000000000 --- a/Sources/AsyncHTTPClient/Connection.swift +++ /dev/null @@ -1,178 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2019-2020 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Foundation -import Logging -import NIOConcurrencyHelpers -import NIOCore -import NIOHTTP1 -import NIOHTTPCompression -import NIOTLS -import NIOTransportServices - -/// A `Connection` represents a `Channel` in the context of the connection pool -/// -/// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider` -/// and has a certain "lease state" (see the `inUse` property). -/// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties -/// so that they can be passed around together and correct provider can be identified when connection is released. -class Connection { - /// The provider this `Connection` belongs to. - /// - /// This enables calling methods like `release()` directly on a `Connection` instead of - /// calling `provider.release(connection)`. This gives a more object oriented feel to the API - /// and can avoid having to keep explicit references to the pool at call site. - private let provider: HTTP1ConnectionProvider - - /// The `Channel` of this `Connection` - /// - /// - Warning: Requests that lease connections from the `ConnectionPool` are responsible - /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. - let channel: Channel - - init(channel: Channel, provider: HTTP1ConnectionProvider) { - self.channel = channel - self.provider = provider - } -} - -extension Connection { - /// Release this `Connection` to its associated `HTTP1ConnectionProvider`. - /// - /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline. - func release(closing: Bool, logger: Logger) { - self.channel.eventLoop.assertInEventLoop() - self.provider.release(connection: self, closing: closing, logger: logger) - } - - /// Called when channel exceeds idle time in pool. - func timeout(logger: Logger) { - self.channel.eventLoop.assertInEventLoop() - self.provider.timeout(connection: self, logger: logger) - } - - /// Called when channel goes inactive while in the pool. - func remoteClosed(logger: Logger) { - self.channel.eventLoop.assertInEventLoop() - self.provider.remoteClosed(connection: self, logger: logger) - } - - /// Called from `HTTP1ConnectionProvider.close` when client is shutting down. - func close() -> EventLoopFuture { - return self.channel.close() - } - - func close(promise: EventLoopPromise?) { - return self.channel.close(promise: promise) - } -} - -/// Methods of Connection which are used in ConnectionsState extracted as protocol -/// to facilitate test of ConnectionsState. -protocol PoolManageableConnection: AnyObject { - func cancel() -> EventLoopFuture - var eventLoop: EventLoop { get } - var isActiveEstimation: Bool { get } -} - -/// Implementation of methods used by ConnectionsState and its tests to manage Connection -extension Connection: PoolManageableConnection { - /// Convenience property indicating whether the underlying `Channel` is active or not. - var isActiveEstimation: Bool { - return self.channel.isActive - } - - var eventLoop: EventLoop { - return self.channel.eventLoop - } - - func cancel() -> EventLoopFuture { - return self.channel.triggerUserOutboundEvent(TaskCancelEvent()) - } -} - -extension Connection { - /// Sets idle timeout handler and channel inactivity listener. - func setIdleTimeout(timeout: TimeAmount?, logger: Logger) { - _ = self.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: timeout), position: .first).flatMap { _ in - self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self, logger: logger)) - } - } - - /// Removes idle timeout handler and channel inactivity listener - func cancelIdleTimeout() -> EventLoopFuture { - return self.removeHandler(IdleStateHandler.self).flatMap { _ in - self.removeHandler(IdlePoolConnectionHandler.self) - } - } -} - -class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { - typealias InboundIn = NIOAny - - let connection: Connection - var eventSent: Bool - let logger: Logger - - init(connection: Connection, logger: Logger) { - self.connection = connection - self.eventSent = false - self.logger = logger - } - - // this is needed to detect when remote end closes connection while connection is in the pool idling - func channelInactive(context: ChannelHandlerContext) { - if !self.eventSent { - self.eventSent = true - self.connection.remoteClosed(logger: self.logger) - } - } - - func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { - if !self.eventSent { - self.eventSent = true - self.connection.timeout(logger: self.logger) - } - } else { - context.fireUserInboundEventTriggered(event) - } - } -} - -extension Connection: CustomStringConvertible { - var description: String { - return "\(self.channel)" - } -} - -struct ConnectionKey: Hashable where ConnectionType: PoolManageableConnection { - let connection: ConnectionType - - init(_ connection: ConnectionType) { - self.connection = connection - } - - static func == (lhs: ConnectionKey, rhs: ConnectionKey) -> Bool { - return ObjectIdentifier(lhs.connection) == ObjectIdentifier(rhs.connection) - } - - func hash(into hasher: inout Hasher) { - hasher.combine(ObjectIdentifier(self.connection)) - } - - func cancel() -> EventLoopFuture { - return self.connection.cancel() - } -} diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index db1b22e8b..859f5d151 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -2,7 +2,7 @@ // // This source file is part of the AsyncHTTPClient open source project // -// Copyright (c) 2019-2020 Apple Inc. and the AsyncHTTPClient project authors +// Copyright (c) 2019-2021 Apple Inc. and the AsyncHTTPClient project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,113 +12,7 @@ // //===----------------------------------------------------------------------===// -import Foundation -import Logging -import NIOConcurrencyHelpers -import NIOCore -import NIOHTTP1 -import NIOSSL -import NIOTLS -import NIOTransportServices - -/// A connection pool that manages and creates new connections to hosts respecting the specified preferences -/// -/// - Note: All `internal` methods of this class are thread safe -final class ConnectionPool { - /// The configuration used to bootstrap new HTTP connections - private let configuration: HTTPClient.Configuration - - /// The main data structure used by the `ConnectionPool` to retreive and create connections associated - /// to a given `Key` . - /// - /// - Warning: This property should be accessed with proper synchronization, see `lock` - private var providers: [Key: HTTP1ConnectionProvider] = [:] - - /// The lock used by the connection pool used to ensure correct synchronization of accesses to `providers` - /// - /// - Warning: This lock should always be acquired *before* `HTTP1ConnectionProvider`s `lock` if used in combination with it. - private let lock = Lock() - - private let backgroundActivityLogger: Logger - - let sslContextCache = SSLContextCache() - - init(configuration: HTTPClient.Configuration, backgroundActivityLogger: Logger) { - self.configuration = configuration - self.backgroundActivityLogger = backgroundActivityLogger - } - - /// Gets the `EventLoop` associated with the given `Key` if it exists - /// - /// This is part of an optimization used by the `.execute(...)` method when - /// a request has its `EventLoopPreference` property set to `.indifferent`. - /// Having a default `EventLoop` shared by the *channel* and the *delegate* avoids - /// loss of performance due to `EventLoop` hopping - func associatedEventLoop(for key: Key) -> EventLoop? { - return self.lock.withLock { - self.providers[key]?.eventLoop - } - } - - /// This method asks the pool for a connection usable by the specified `request`, respecting the specified options. - /// - /// - parameter request: The request that needs a `Connection` - /// - parameter preference: The `EventLoopPreference` the connection pool will respect to lease a new connection - /// - parameter deadline: The connection timeout - /// - Returns: A connection corresponding to the specified parameters - /// - /// When the pool is asked for a new connection, it creates a `Key` from the url associated to the `request`. This key - /// is used to determine if there already exists an associated `HTTP1ConnectionProvider` in `providers`. - /// If there is, the connection provider then takes care of leasing a new connection. If a connection provider doesn't exist, it is created. - func getConnection(_ request: HTTPClient.Request, - preference: HTTPClient.EventLoopPreference, - taskEventLoop: EventLoop, - deadline: NIODeadline?, - setupComplete: EventLoopFuture, - logger: Logger) -> EventLoopFuture { - let key = Key(request) - - let provider: HTTP1ConnectionProvider = self.lock.withLock { - if let existing = self.providers[key], existing.enqueue() { - return existing - } else { - let provider = HTTP1ConnectionProvider(key: key, - eventLoop: taskEventLoop, - configuration: key.config(overriding: self.configuration), - tlsConfiguration: request.tlsConfiguration, - pool: self, - sslContextCache: self.sslContextCache, - backgroundActivityLogger: self.backgroundActivityLogger) - let enqueued = provider.enqueue() - assert(enqueued) - self.providers[key] = provider - return provider - } - } - - return provider.getConnection(preference: preference, setupComplete: setupComplete, logger: logger) - } - - func delete(_ provider: HTTP1ConnectionProvider) { - self.lock.withLockVoid { - self.providers[provider.key] = nil - } - } - - func close(on eventLoop: EventLoop) -> EventLoopFuture { - let providers = self.lock.withLock { - self.providers.values - } - - return EventLoopFuture.reduce(true, providers.map { $0.close() }, on: eventLoop) { $0 && $1 } - } - - var count: Int { - return self.lock.withLock { - return self.providers.count - } - } - +enum ConnectionPool { /// Used by the `ConnectionPool` to index its `HTTP1ConnectionProvider`s /// /// A key is initialized from a `URL`, it uses the components to derive a hashed value @@ -194,310 +88,3 @@ final class ConnectionPool { } } } - -/// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port) -/// -/// On top of enabling connection reuse this provider it also facilitates the creation -/// of concurrent requests as it has built-in politeness regarding the maximum number -/// of concurrent requests to the server. -class HTTP1ConnectionProvider { - /// The client configuration used to bootstrap new requests - private let configuration: HTTPClient.Configuration - - /// The pool this provider belongs to - private let pool: ConnectionPool - - /// The key associated with this provider - let key: ConnectionPool.Key - - /// The default `EventLoop` for this provider - /// - /// The default event loop is used to create futures and is used when creating `Channel`s for requests - /// for which the `EventLoopPreference` is set to `.indifferent` - let eventLoop: EventLoop - - /// The lock used to access and modify the provider state - `availableConnections`, `waiters` and `openedConnectionsCount`. - /// - /// - Warning: This lock should always be acquired *after* `ConnectionPool`s `lock` if used in combination with it. - private let lock = Lock() - - var closePromise: EventLoopPromise - - var state: ConnectionsState - - private let backgroundActivityLogger: Logger - - private let factory: HTTPConnectionPool.ConnectionFactory - - /// Creates a new `HTTP1ConnectionProvider` - /// - /// - parameters: - /// - key: The `Key` (host, scheme, port) this provider is associated to - /// - configuration: The client configuration used globally by all requests - /// - initialConnection: The initial connection the pool initializes this provider with - /// - pool: The pool this provider belongs to - /// - backgroundActivityLogger: The logger used to log activity in the background, ie. not associated with a - /// request. - init(key: ConnectionPool.Key, - eventLoop: EventLoop, - configuration: HTTPClient.Configuration, - tlsConfiguration: TLSConfiguration?, - pool: ConnectionPool, - sslContextCache: SSLContextCache, - backgroundActivityLogger: Logger) { - self.eventLoop = eventLoop - self.configuration = configuration - self.key = key - self.pool = pool - self.closePromise = eventLoop.makePromise() - self.state = .init(eventLoop: eventLoop) - self.backgroundActivityLogger = backgroundActivityLogger - - self.factory = HTTPConnectionPool.ConnectionFactory( - key: self.key, - tlsConfiguration: tlsConfiguration, - clientConfiguration: self.configuration, - sslContextCache: sslContextCache - ) - } - - deinit { - self.state.assertInvariants() - } - - func execute(_ action: Action, logger: Logger) { - switch action { - case .lease(let connection, let waiter): - // if connection is became inactive, we create a new one. - connection.cancelIdleTimeout().whenComplete { _ in - if connection.isActiveEstimation { - logger.trace("leasing existing connection", - metadata: ["ahc-connection": "\(connection)"]) - waiter.promise.succeed(connection) - } else { - logger.trace("opening fresh connection (found matching but inactive connection)", - metadata: ["ahc-dead-connection": "\(connection)"]) - self.makeChannel(preference: waiter.preference, - logger: logger).whenComplete { result in - self.connect(result, waiter: waiter, logger: logger) - } - } - } - case .create(let waiter): - logger.trace("opening fresh connection (no connections to reuse available)") - self.makeChannel(preference: waiter.preference, logger: logger).whenComplete { result in - self.connect(result, waiter: waiter, logger: logger) - } - case .replace(let connection, let waiter): - connection.cancelIdleTimeout().flatMap { - connection.close() - }.whenComplete { _ in - logger.trace("opening fresh connection (replacing exising connection)", - metadata: ["ahc-old-connection": "\(connection)", - "ahc-waiter": "\(waiter)"]) - self.makeChannel(preference: waiter.preference, logger: logger).whenComplete { result in - self.connect(result, waiter: waiter, logger: logger) - } - } - case .park(let connection): - logger.trace("parking connection", - metadata: ["ahc-connection": "\(connection)"]) - connection.setIdleTimeout(timeout: self.configuration.connectionPool.idleTimeout, - logger: self.backgroundActivityLogger) - case .closeProvider: - logger.debug("closing provider", - metadata: ["ahc-provider": "\(self)"]) - self.closeAndDelete() - case .none: - break - case .parkAnd(let connection, let action): - logger.trace("parking connection & doing further action", - metadata: ["ahc-connection": "\(connection)", - "ahc-action": "\(action)"]) - connection.setIdleTimeout(timeout: self.configuration.connectionPool.idleTimeout, - logger: self.backgroundActivityLogger) - self.execute(action, logger: logger) - case .closeAnd(let connection, let action): - logger.trace("closing connection & doing further action", - metadata: ["ahc-connection": "\(connection)", - "ahc-action": "\(action)"]) - connection.channel.close(promise: nil) - self.execute(action, logger: logger) - case .fail(let waiter, let error): - logger.debug("failing connection for waiter", - metadata: ["ahc-waiter": "\(waiter)", - "ahc-error": "\(error)"]) - waiter.promise.fail(error) - } - } - - /// This function is needed to ensure that there is no race between getting a provider from map, and shutting it down when there are no requests processed by it. - func enqueue() -> Bool { - return self.lock.withLock { - self.state.enqueue() - } - } - - func getConnection(preference: HTTPClient.EventLoopPreference, - setupComplete: EventLoopFuture, - logger: Logger) -> EventLoopFuture { - let waiter = Waiter(promise: self.eventLoop.makePromise(), setupComplete: setupComplete, preference: preference) - - let action: Action = self.lock.withLock { - self.state.acquire(waiter: waiter) - } - - self.execute(action, logger: logger) - - return waiter.promise.futureResult - } - - func connect(_ result: Result, waiter: Waiter, logger: Logger) { - let action: Action - switch result { - case .success(let channel): - logger.trace("successfully created connection", - metadata: ["ahc-connection": "\(channel)"]) - let connection = Connection(channel: channel, provider: self) - action = self.lock.withLock { - self.state.offer(connection: connection) - } - - switch action { - case .closeAnd: - // This happens when client was shut down during connect - logger.trace("connection cancelled due to client shutdown", - metadata: ["ahc-connection": "\(channel)"]) - connection.channel.close(promise: nil) - waiter.promise.fail(HTTPClientError.cancelled) - default: - waiter.promise.succeed(connection) - } - case .failure(let error): - logger.debug("connection attempt failed", - metadata: ["ahc-error": "\(error)"]) - action = self.lock.withLock { - self.state.connectFailed() - } - waiter.promise.fail(error) - } - - waiter.setupComplete.whenComplete { _ in - self.execute(action, logger: logger) - } - } - - func release(connection: Connection, closing: Bool, logger: Logger) { - logger.debug("releasing connection, request complete", - metadata: ["ahc-closing": "\(closing)"]) - let action: Action = self.lock.withLock { - self.state.release(connection: connection, closing: closing) - } - - // We close defensively here: we may have failed to actually close on other codepaths, - // or we may be expecting the server to close. In either case, we want our FD back, so - // we close now to cover our backs. We don't care about the result: if the channel is - // _already_ closed, that's fine by us. - if closing { - connection.close(promise: nil) - } - - switch action { - case .none: - break - case .park, .closeProvider: - // Since both `.park` and `.deleteProvider` are terminal in terms of execution, - // we can execute them immediately - self.execute(action, logger: logger) - case .closeAnd, .create, .fail, .lease, .parkAnd, .replace: - // This is needed to start a new stack, otherwise, since this is called on a previous - // future completion handler chain, it will be growing indefinitely until the connection is closed. - // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. - connection.eventLoop.execute { - self.execute(action, logger: logger) - } - } - } - - func remoteClosed(connection: Connection, logger: Logger) { - let action: Action = self.lock.withLock { - self.state.remoteClosed(connection: connection) - } - - self.execute(action, logger: logger) - } - - func timeout(connection: Connection, logger: Logger) { - let action: Action = self.lock.withLock { - self.state.timeout(connection: connection) - } - - self.execute(action, logger: logger) - } - - private func closeAndDelete() { - self.pool.delete(self) - self.closePromise.succeed(()) - } - - func close() -> EventLoopFuture { - if let (waiters, available, leased, clean) = self.lock.withLock({ self.state.close() }) { - waiters.forEach { - $0.promise.fail(HTTPClientError.cancelled) - } - - if available.isEmpty, leased.isEmpty, clean { - self.closePromise.succeed(()) - return self.closePromise.futureResult.map { clean } - } - - EventLoopFuture.andAllComplete(leased.map { $0.cancel() }, on: self.eventLoop).flatMap { _ in - EventLoopFuture.andAllComplete(available.map { $0.close() }, on: self.eventLoop) - }.whenFailure { error in - self.closePromise.fail(error) - } - - return self.closePromise.futureResult.map { clean } - } - - return self.closePromise.futureResult.map { true } - } - - private func makeChannel(preference: HTTPClient.EventLoopPreference, - logger: Logger) -> EventLoopFuture { - let connectionID = HTTPConnectionPool.Connection.ID.globalGenerator.next() - let eventLoop = preference.bestEventLoop ?? self.eventLoop - let deadline = .now() + self.configuration.timeout.connectionCreationTimeout - return self.factory.makeHTTP1Channel( - connectionID: connectionID, - deadline: deadline, - eventLoop: eventLoop, - logger: logger - ) - } - - /// A `Waiter` represents a request that waits for a connection when none is - /// currently available - /// - /// `Waiter`s are created when `maximumConcurrentConnections` is reached - /// and we cannot create new connections anymore. - struct Waiter { - /// The promise to complete once a connection is available - let promise: EventLoopPromise - - /// Future that will be succeeded when request timeout handler and `TaskHandler` are added to the pipeline. - let setupComplete: EventLoopFuture - - /// The event loop preference associated to this particular request - /// that the provider should respect - let preference: HTTPClient.EventLoopPreference - } -} - -extension CircularBuffer { - mutating func swap(at index: Index, with value: Element) -> Element { - let tmp = self[index] - self[index] = value - return tmp - } -} diff --git a/Sources/AsyncHTTPClient/ConnectionsState.swift b/Sources/AsyncHTTPClient/ConnectionsState.swift deleted file mode 100644 index 91b7cba28..000000000 --- a/Sources/AsyncHTTPClient/ConnectionsState.swift +++ /dev/null @@ -1,325 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2019-2020 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIOCore - -extension HTTP1ConnectionProvider { - enum Action { - case lease(ConnectionType, Waiter) - case create(Waiter) - case replace(ConnectionType, Waiter) - case closeProvider - case park(ConnectionType) - case none - case fail(Waiter, Error) - indirect case closeAnd(ConnectionType, Action) - indirect case parkAnd(ConnectionType, Action) - } - - struct ConnectionsState { - enum State { - case active - case closed - } - - struct Snapshot { - var state: State - var availableConnections: CircularBuffer - var leasedConnections: Set> - var waiters: CircularBuffer> - var openedConnectionsCount: Int - var pending: Int - } - - let maximumConcurrentConnections: Int - let eventLoop: EventLoop - - private var state: State = .active - - /// Opened connections that are available. - private var availableConnections: CircularBuffer = .init(initialCapacity: 8) - - /// Opened connections that are leased to the user. - private var leasedConnections: Set> = .init() - - /// Consumers that weren't able to get a new connection without exceeding - /// `maximumConcurrentConnections` get a `Future` - /// whose associated promise is stored in `Waiter`. The promise is completed - /// as soon as possible by the provider, in FIFO order. - private var waiters: CircularBuffer> = .init(initialCapacity: 8) - - /// Number of opened or opening connections, used to keep track of all connections and enforcing `maximumConcurrentConnections` limit. - private var openedConnectionsCount: Int = 0 - - /// Number of enqueued requests, used to track if it is safe to delete the provider. - private var pending: Int = 0 - - init(maximumConcurrentConnections: Int = 8, eventLoop: EventLoop) { - self.maximumConcurrentConnections = maximumConcurrentConnections - self.eventLoop = eventLoop - } - - func testsOnly_getInternalState() -> Snapshot { - return Snapshot(state: self.state, availableConnections: self.availableConnections, leasedConnections: self.leasedConnections, waiters: self.waiters, openedConnectionsCount: self.openedConnectionsCount, pending: self.pending) - } - - func assertInvariants() { - assert(self.waiters.isEmpty) - assert(self.availableConnections.isEmpty) - assert(self.leasedConnections.isEmpty) - assert(self.openedConnectionsCount == 0) - assert(self.pending == 0) - } - - mutating func enqueue() -> Bool { - switch self.state { - case .active: - self.pending += 1 - return true - case .closed: - return false - } - } - - private var hasCapacity: Bool { - return self.openedConnectionsCount < self.maximumConcurrentConnections - } - - private var isEmpty: Bool { - return self.openedConnectionsCount == 0 && self.pending == 0 - } - - mutating func acquire(waiter: Waiter) -> Action { - switch self.state { - case .active: - self.pending -= 1 - - let (eventLoop, required) = self.resolvePreference(waiter.preference) - if required { - // If there is an opened connection on the same EL - use it - if let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) { - let connection = self.availableConnections.remove(at: found) - self.leasedConnections.insert(ConnectionKey(connection)) - return .lease(connection, waiter) - } - - // If we can create additional connection, create - if self.hasCapacity { - self.openedConnectionsCount += 1 - return .create(waiter) - } - - // If we cannot create additional connection, but there is one in the pool, replace it - if let connection = self.availableConnections.popFirst() { - return .replace(connection, waiter) - } - - self.waiters.append(waiter) - return .none - } else if let connection = self.availableConnections.popFirst() { - self.leasedConnections.insert(ConnectionKey(connection)) - return .lease(connection, waiter) - } else if self.hasCapacity { - self.openedConnectionsCount += 1 - return .create(waiter) - } else { - self.waiters.append(waiter) - return .none - } - case .closed: - return .fail(waiter, HTTPClientError.alreadyShutdown) - } - } - - mutating func release(connection: ConnectionType, closing: Bool) -> Action { - switch self.state { - case .active: - assert(self.leasedConnections.contains(ConnectionKey(connection))) - - if connection.isActiveEstimation, !closing { // If connection is alive, we can offer it to a next waiter - if let waiter = self.waiters.popFirst() { - // There should be no case where we have both capacity and a waiter here. - // Waiter can only exists if there was no capacity at aquire. If some connection - // is released when we have waiter it can only indicate that we should lease (if EL are the same), - // or replace (if they are different). But we cannot increase connection count here. - assert(!self.hasCapacity) - - let (eventLoop, required) = self.resolvePreference(waiter.preference) - - // If returned connection is on same EL or we do not require special EL - lease it - if connection.eventLoop === eventLoop || !required { - return .lease(connection, waiter) - } - - // If we cannot create new connections, we will have to replace returned connection with a new one on the required loop - // We will keep the `openedConnectionCount`, since .replace === .create, so we decrease and increase the `openedConnectionCount` - self.leasedConnections.remove(ConnectionKey(connection)) - return .replace(connection, waiter) - } else { // or park, if there are no waiters - self.leasedConnections.remove(ConnectionKey(connection)) - self.availableConnections.append(connection) - return .park(connection) - } - } else { // if connection is not alive, we delete it and process the next waiter - // this connections is now gone, we will either create new connection or do nothing - self.openedConnectionsCount -= 1 - self.leasedConnections.remove(ConnectionKey(connection)) - - return self.processNextWaiter() - } - case .closed: - self.openedConnectionsCount -= 1 - self.leasedConnections.remove(ConnectionKey(connection)) - - return self.processNextWaiter() - } - } - - mutating func offer(connection: ConnectionType) -> Action { - switch self.state { - case .active: - self.leasedConnections.insert(ConnectionKey(connection)) - return .none - case .closed: // This can happen when we close the client while connections was being established - self.openedConnectionsCount -= 1 - return .closeAnd(connection, self.processNextWaiter()) - } - } - - mutating func connectFailed() -> Action { - switch self.state { - case .active: - self.openedConnectionsCount -= 1 - return self.processNextWaiter() - case .closed: - // This can happen in the following scenario: user initiates a connection that will fail to connect, - // user calls `syncShutdown` before we received an error from the bootstrap. In this scenario, - // pool will be `.closed` but connection will be still in the process of being established/failed, - // so then this process finishes, it will get to this point. - // We need to call `processNextWaiter` to finish deleting provider from the pool. - self.openedConnectionsCount -= 1 - return self.processNextWaiter() - } - } - - mutating func remoteClosed(connection: ConnectionType) -> Action { - switch self.state { - case .active: - // Connection can be closed remotely while we wait for `.lease` action to complete. - // If this happens when connections is leased, we do not remove it from leased connections, - // it will be done when a new replacement will be ready for it. - if self.leasedConnections.contains(ConnectionKey(connection)) { - return .none - } - - // If this connection is not in use, the have to release it as well - self.openedConnectionsCount -= 1 - self.availableConnections.removeAll { $0 === connection } - - return self.processNextWaiter() - case .closed: - self.openedConnectionsCount -= 1 - return self.processNextWaiter() - } - } - - mutating func timeout(connection: ConnectionType) -> Action { - switch self.state { - case .active: - // We can get timeout and inUse = true when we decided to lease the connection, but this action is not executed yet. - // In this case we can ignore timeout notification. - if self.leasedConnections.contains(ConnectionKey(connection)) { - return .none - } - - // If connection was not in use, we release it from the pool, increasing available capacity - self.openedConnectionsCount -= 1 - self.availableConnections.removeAll { $0 === connection } - - return .closeAnd(connection, self.processNextWaiter()) - case .closed: - // This situation can happen when we call close, state changes, but before we call `close` on all - // available connections, in this case we should close this connection and, potentially, - // delete the provider - self.openedConnectionsCount -= 1 - self.availableConnections.removeAll { $0 === connection } - - return .closeAnd(connection, self.processNextWaiter()) - } - } - - mutating func processNextWaiter() -> Action { - if let waiter = self.waiters.popFirst() { - // There should be no case where we have waiters and available connections at the same time. - // - // This method is called in following cases: - // - // 1. from `release` when connection is inactive and cannot be re-used - // 2. from `connectFailed` when we failed to establish a new connection - // 3. from `remoteClose` when connection was closed by the remote side and cannot be re-used - // 4. from `timeout` when connection was closed due to idle timeout and cannot be re-used. - // - // In all cases connection, which triggered the transition, will not be in `available` state. - // - // Given that the waiter can only be present in the pool if there were no available connections - // (otherwise it had been leased a connection immediately on getting the connection), we do not - // see a situation when we can lease another available connection, therefore the only course - // of action is to create a new connection for the waiter. - assert(self.availableConnections.isEmpty) - - self.openedConnectionsCount += 1 - return .create(waiter) - } - - // if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider - if self.isEmpty { - // deactivate and remove - self.state = .closed - return .closeProvider - } - - return .none - } - - mutating func close() -> (CircularBuffer>, CircularBuffer, Set>, Bool)? { - switch self.state { - case .active: - let waiters = self.waiters - self.waiters.removeAll() - - let available = self.availableConnections - self.availableConnections.removeAll() - - let leased = self.leasedConnections - - self.state = .closed - - return (waiters, available, leased, self.openedConnectionsCount - available.count == 0) - case .closed: - return nil - } - } - - private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { - switch preference.preference { - case .indifferent: - return (self.eventLoop, false) - case .delegate(let el): - return (el, false) - case .delegateAndChannel(let el), .testOnly_exact(let el, _): - return (el, true) - } - } - } -} diff --git a/Sources/AsyncHTTPClient/HTTPClient+Proxy.swift b/Sources/AsyncHTTPClient/HTTPClient+Proxy.swift index 90a3bf86a..4d2b9388f 100644 --- a/Sources/AsyncHTTPClient/HTTPClient+Proxy.swift +++ b/Sources/AsyncHTTPClient/HTTPClient+Proxy.swift @@ -12,9 +12,6 @@ // //===----------------------------------------------------------------------===// -import NIOCore -import NIOHTTP1 - extension HTTPClient.Configuration { /// Proxy server configuration /// Specifies the remote address of an HTTP proxy. diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 2c31392ac..3e45d4dfb 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -19,7 +19,6 @@ import NIOCore import NIOHTTP1 import NIOHTTPCompression import NIOPosix -import NIOSOCKS import NIOSSL import NIOTLS import NIOTransportServices @@ -578,28 +577,6 @@ public class HTTPClient { return task } - private func resolve(timeout: TimeAmount?, deadline: NIODeadline?) -> TimeAmount? { - switch (timeout, deadline) { - case (.some(let timeout), .some(let deadline)): - return min(timeout, deadline - .now()) - case (.some(let timeout), .none): - return timeout - case (.none, .some(let deadline)): - return deadline - .now() - case (.none, .none): - return nil - } - } - - static func resolveAddress(host: String, port: Int, proxy: Configuration.Proxy?) -> (host: String, port: Int) { - switch proxy { - case .none: - return (host, port) - case .some(let proxy): - return (proxy.host, proxy.port) - } - } - /// `HTTPClient` configuration. public struct Configuration { /// TLS configuration, defaults to `TLSConfiguration.makeClientConfiguration()`. @@ -760,19 +737,6 @@ public class HTTPClient { public static func delegateAndChannel(on eventLoop: EventLoop) -> EventLoopPreference { return EventLoopPreference(.delegateAndChannel(on: eventLoop)) } - - var bestEventLoop: EventLoop? { - switch self.preference { - case .delegate(on: let el): - return el - case .delegateAndChannel(on: let el): - return el - case .testOnly_exact(channelOn: let el, delegateOn: _): - return el - case .indifferent: - return nil - } - } } /// Specifies decompression settings. diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 4877d10e9..31f5e9337 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -16,9 +16,7 @@ import Foundation import Logging import NIOConcurrencyHelpers import NIOCore -import NIOFoundationCompat import NIOHTTP1 -import NIOHTTPCompression import NIOSSL extension HTTPClient { @@ -728,452 +726,6 @@ extension HTTPClient { internal struct TaskCancelEvent {} -// MARK: - TaskHandler - -internal class TaskHandler: RemovableChannelHandler { - enum State { - case idle - case sendingBodyWaitingResponseHead - case sendingBodyResponseHeadReceived - case bodySentWaitingResponseHead - case bodySentResponseHeadReceived - case redirected(HTTPResponseHead, URL) - case bufferedEnd - case endOrError - } - - let task: HTTPClient.Task - let delegate: Delegate - let redirectHandler: RedirectHandler? - let ignoreUncleanSSLShutdown: Bool - let logger: Logger // We are okay to store the logger here because a TaskHandler is just for one request. - - var state: State = .idle - var responseReadBuffer: ResponseReadBuffer = ResponseReadBuffer() - var expectedBodyLength: Int? - var actualBodyLength: Int = 0 - var pendingRead = false - var outstandingDelegateRead = false - var closing = false { - didSet { - assert(self.closing || !oldValue, - "BUG in AsyncHTTPClient: TaskHandler.closing went from true (no conn reuse) to true (do reuse).") - } - } - - let kind: HTTPClient.Request.Kind - - init(task: HTTPClient.Task, - kind: HTTPClient.Request.Kind, - delegate: Delegate, - redirectHandler: RedirectHandler?, - ignoreUncleanSSLShutdown: Bool, - logger: Logger) { - self.task = task - self.delegate = delegate - self.redirectHandler = redirectHandler - self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown - self.kind = kind - self.logger = logger - } -} - -// MARK: Delegate Callouts - -extension TaskHandler { - func failTaskAndNotifyDelegate(error: Err, - _ body: @escaping (HTTPClient.Task, Err) -> Void) { - func doIt() { - body(self.task, error) - self.task.fail(with: error, delegateType: Delegate.self) - } - - if self.task.eventLoop.inEventLoop { - doIt() - } else { - self.task.eventLoop.execute { - doIt() - } - } - } - - func callOutToDelegateFireAndForget(_ body: @escaping (HTTPClient.Task) -> Void) { - self.callOutToDelegateFireAndForget(value: ()) { (task, _: ()) in body(task) } - } - - func callOutToDelegateFireAndForget(value: Value, - _ body: @escaping (HTTPClient.Task, Value) -> Void) { - if self.task.eventLoop.inEventLoop { - body(self.task, value) - } else { - self.task.eventLoop.execute { - body(self.task, value) - } - } - } - - func callOutToDelegate(value: Value, - channelEventLoop: EventLoop, - _ body: @escaping (HTTPClient.Task, Value) -> EventLoopFuture) -> EventLoopFuture { - if self.task.eventLoop.inEventLoop { - return body(self.task, value).hop(to: channelEventLoop) - } else { - return self.task.eventLoop.submit { - body(self.task, value) - }.flatMap { $0 }.hop(to: channelEventLoop) - } - } - - func callOutToDelegate(promise: EventLoopPromise? = nil, - _ body: @escaping (HTTPClient.Task) throws -> Response) where Response == Delegate.Response { - func doIt() { - do { - let result = try body(self.task) - - self.task.succeed(promise: promise, - with: result, - delegateType: Delegate.self, - closing: self.closing) - } catch { - self.task.fail(with: error, delegateType: Delegate.self) - } - } - - if self.task.eventLoop.inEventLoop { - doIt() - } else { - self.task.eventLoop.submit { - doIt() - }.cascadeFailure(to: promise) - } - } - - func callOutToDelegate(channelEventLoop: EventLoop, - _ body: @escaping (HTTPClient.Task) throws -> Response) -> EventLoopFuture where Response == Delegate.Response { - let promise = channelEventLoop.makePromise(of: Response.self) - self.callOutToDelegate(promise: promise, body) - return promise.futureResult - } -} - -// MARK: ChannelHandler implementation - -extension TaskHandler: ChannelDuplexHandler { - typealias OutboundIn = HTTPClient.Request - typealias InboundIn = HTTPClientResponsePart - typealias OutboundOut = HTTPClientRequestPart - - func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - let request = self.unwrapOutboundIn(data) - self.state = .sendingBodyWaitingResponseHead - - let head: HTTPRequestHead - let metadata: RequestFramingMetadata - - do { - (head, metadata) = try request.createRequestHead() - } catch { - self.errorCaught(context: context, error: error) - promise?.fail(error) - return - } - - // This assert can go away when (if ever!) the above `if` correctly handles other HTTP versions. For example - // in HTTP/1.0, we need to treat the absence of a 'connection: keep-alive' as a close too. - assert(head.version == .http1_1, - "Sending a request in HTTP version \(head.version) which is unsupported by the above `if`") - - if case .fixedSize(let length) = metadata.body { - self.expectedBodyLength = length - } - self.closing = metadata.connectionClose - - context.write(wrapOutboundOut(.head(head))).map { - self.callOutToDelegateFireAndForget(value: head, self.delegate.didSendRequestHead) - }.flatMap { - self.writeBody(request: request, context: context) - }.flatMap { - context.eventLoop.assertInEventLoop() - switch self.state { - case .idle: - // since this code path is called from `write` and write sets state to sendingBody - preconditionFailure("should not happen") - case .sendingBodyWaitingResponseHead: - self.state = .bodySentWaitingResponseHead - case .sendingBodyResponseHeadReceived: - self.state = .bodySentResponseHeadReceived - case .bodySentWaitingResponseHead, .bodySentResponseHeadReceived: - preconditionFailure("should not happen, state is \(self.state)") - case .redirected: - break - case .bufferedEnd, .endOrError: - // If the state is .endOrError, it means that request was failed and there is nothing to do here: - // we cannot write .end since channel is most likely closed, and we should not fail the future, - // since the task would already be failed, no need to fail the writer too. - // If the state is .bufferedEnd the issue is the same, we just haven't fully delivered the response to - // the user yet. - return context.eventLoop.makeSucceededFuture(()) - } - - if let expectedBodyLength = self.expectedBodyLength, expectedBodyLength != self.actualBodyLength { - let error = HTTPClientError.bodyLengthMismatch - return context.eventLoop.makeFailedFuture(error) - } - return context.writeAndFlush(self.wrapOutboundOut(.end(nil))) - }.map { - context.eventLoop.assertInEventLoop() - - if case .endOrError = self.state { - return - } - - self.callOutToDelegateFireAndForget(self.delegate.didSendRequest) - }.flatMapErrorThrowing { error in - context.eventLoop.assertInEventLoop() - self.errorCaught(context: context, error: error) - throw error - }.cascade(to: promise) - } - - private func writeBody(request: HTTPClient.Request, context: ChannelHandlerContext) -> EventLoopFuture { - guard let body = request.body else { - return context.eventLoop.makeSucceededFuture(()) - } - - let channel = context.channel - - func doIt() -> EventLoopFuture { - return body.stream(HTTPClient.Body.StreamWriter { part in - let promise = self.task.eventLoop.makePromise(of: Void.self) - // All writes have to be switched to the channel EL if channel and task ELs differ - if channel.eventLoop.inEventLoop { - self.writeBodyPart(context: context, part: part, promise: promise) - } else { - channel.eventLoop.execute { - self.writeBodyPart(context: context, part: part, promise: promise) - } - } - - return promise.futureResult.map { - self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart) - } - }) - } - - // Callout to the user to start body streaming should be on task EL - if self.task.eventLoop.inEventLoop { - return doIt() - } else { - return self.task.eventLoop.flatSubmit { - doIt() - } - } - } - - private func writeBodyPart(context: ChannelHandlerContext, part: IOData, promise: EventLoopPromise) { - switch self.state { - case .idle: - // this function is called on the codepath starting with write, so it cannot be in state .idle - preconditionFailure("should not happen") - case .sendingBodyWaitingResponseHead, .sendingBodyResponseHeadReceived, .redirected: - if let limit = self.expectedBodyLength, self.actualBodyLength + part.readableBytes > limit { - let error = HTTPClientError.bodyLengthMismatch - self.errorCaught(context: context, error: error) - promise.fail(error) - return - } - self.actualBodyLength += part.readableBytes - context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise) - case .bodySentWaitingResponseHead, .bodySentResponseHeadReceived, .bufferedEnd, .endOrError: - let error = HTTPClientError.writeAfterRequestSent - self.errorCaught(context: context, error: error) - promise.fail(error) - } - } - - public func read(context: ChannelHandlerContext) { - if self.outstandingDelegateRead { - self.pendingRead = true - } else { - self.pendingRead = false - context.read() - } - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - let response = self.unwrapInboundIn(data) - switch response { - case .head(let head): - switch self.state { - case .idle: - // should be prevented by NIO HTTP1 pipeline, see testHTTPResponseHeadBeforeRequestHead - preconditionFailure("should not happen") - case .sendingBodyWaitingResponseHead: - self.state = .sendingBodyResponseHeadReceived - case .bodySentWaitingResponseHead: - self.state = .bodySentResponseHeadReceived - case .sendingBodyResponseHeadReceived, .bodySentResponseHeadReceived, .redirected: - // should be prevented by NIO HTTP1 pipeline, aee testHTTPResponseDoubleHead - preconditionFailure("should not happen") - case .bufferedEnd, .endOrError: - return - } - - if !head.isKeepAlive { - self.closing = true - } - - if let redirectURL = self.redirectHandler?.redirectTarget(status: head.status, headers: head.headers) { - self.state = .redirected(head, redirectURL) - } else { - self.handleReadForDelegate(response, context: context) - } - case .body: - switch self.state { - case .redirected, .bufferedEnd, .endOrError: - break - default: - self.handleReadForDelegate(response, context: context) - } - case .end: - switch self.state { - case .bufferedEnd, .endOrError: - break - case .redirected(let head, let redirectURL): - self.state = .endOrError - self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise) - default: - self.state = .bufferedEnd - self.handleReadForDelegate(response, context: context) - } - } - } - - private func handleReadForDelegate(_ read: HTTPClientResponsePart, context: ChannelHandlerContext) { - if self.outstandingDelegateRead { - self.responseReadBuffer.appendPart(read) - return - } - - // No outstanding delegate read, so we can deliver this directly. - self.deliverReadToDelegate(read, context: context) - } - - private func deliverReadToDelegate(_ read: HTTPClientResponsePart, context: ChannelHandlerContext) { - precondition(!self.outstandingDelegateRead) - self.outstandingDelegateRead = true - - if case .endOrError = self.state { - // No further read delivery should occur, we already delivered an error. - return - } - - switch read { - case .head(let head): - self.callOutToDelegate(value: head, channelEventLoop: context.eventLoop, self.delegate.didReceiveHead) - .whenComplete { result in - self.handleBackpressureResult(context: context, result: result) - } - case .body(let body): - self.callOutToDelegate(value: body, channelEventLoop: context.eventLoop, self.delegate.didReceiveBodyPart) - .whenComplete { result in - self.handleBackpressureResult(context: context, result: result) - } - case .end: - self.state = .endOrError - self.outstandingDelegateRead = false - - if self.pendingRead { - // We must read here, as we will be removed from the channel shortly. - self.pendingRead = false - context.read() - } - - self.callOutToDelegate(promise: self.task.promise, self.delegate.didFinishRequest) - } - } - - private func handleBackpressureResult(context: ChannelHandlerContext, result: Result) { - context.eventLoop.assertInEventLoop() - self.outstandingDelegateRead = false - - switch result { - case .success: - if let nextRead = self.responseReadBuffer.nextRead() { - // We can deliver this directly. - self.deliverReadToDelegate(nextRead, context: context) - } else if self.pendingRead { - self.pendingRead = false - context.read() - } - case .failure(let error): - self.errorCaught(context: context, error: error) - } - } - - func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - if (event as? IdleStateHandler.IdleStateEvent) == .read { - self.errorCaught(context: context, error: HTTPClientError.readTimeout) - } else { - context.fireUserInboundEventTriggered(event) - } - } - - func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise?) { - if (event as? TaskCancelEvent) != nil { - self.errorCaught(context: context, error: HTTPClientError.cancelled) - promise?.succeed(()) - } else { - context.triggerUserOutboundEvent(event, promise: promise) - } - } - - func channelInactive(context: ChannelHandlerContext) { - switch self.state { - case .idle, .sendingBodyWaitingResponseHead, .sendingBodyResponseHeadReceived, .bodySentWaitingResponseHead, .bodySentResponseHeadReceived, .redirected: - self.errorCaught(context: context, error: HTTPClientError.remoteConnectionClosed) - case .bufferedEnd, .endOrError: - break - } - context.fireChannelInactive() - } - - func errorCaught(context: ChannelHandlerContext, error: Error) { - switch error { - case NIOSSLError.uncleanShutdown: - switch self.state { - case .bufferedEnd, .endOrError: - /// Some HTTP Servers can 'forget' to respond with CloseNotify when client is closing connection, - /// this could lead to incomplete SSL shutdown. But since request is already processed, we can ignore this error. - break - case .sendingBodyResponseHeadReceived where self.ignoreUncleanSSLShutdown, - .bodySentResponseHeadReceived where self.ignoreUncleanSSLShutdown: - /// We can also ignore this error like `.end`. - break - default: - self.state = .endOrError - self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError) - } - default: - switch self.state { - case .idle, .sendingBodyWaitingResponseHead, .sendingBodyResponseHeadReceived, .bodySentWaitingResponseHead, .bodySentResponseHeadReceived, .redirected, .bufferedEnd: - self.state = .endOrError - self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError) - case .endOrError: - // error was already handled - break - } - } - } - - func handlerAdded(context: ChannelHandlerContext) { - guard context.channel.isActive else { - self.errorCaught(context: context, error: HTTPClientError.remoteConnectionClosed) - return - } - } -} - // MARK: - RedirectHandler internal struct RedirectHandler { diff --git a/Sources/AsyncHTTPClient/ResponseReadBuffer.swift b/Sources/AsyncHTTPClient/ResponseReadBuffer.swift deleted file mode 100644 index 62b8fc126..000000000 --- a/Sources/AsyncHTTPClient/ResponseReadBuffer.swift +++ /dev/null @@ -1,32 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIOCore -import NIOHTTP1 - -struct ResponseReadBuffer { - private var responseParts: CircularBuffer - - init() { - self.responseParts = CircularBuffer(initialCapacity: 16) - } - - mutating func appendPart(_ part: HTTPClientResponsePart) { - self.responseParts.append(part) - } - - mutating func nextRead() -> HTTPClientResponsePart? { - return self.responseParts.popFirst() - } -} diff --git a/Sources/AsyncHTTPClient/StringConvertibleInstances.swift b/Sources/AsyncHTTPClient/StringConvertibleInstances.swift index 9cbb0afad..f75fb0d87 100644 --- a/Sources/AsyncHTTPClient/StringConvertibleInstances.swift +++ b/Sources/AsyncHTTPClient/StringConvertibleInstances.swift @@ -2,7 +2,7 @@ // // This source file is part of the AsyncHTTPClient open source project // -// Copyright (c) 2020 Apple Inc. and the AsyncHTTPClient project authors +// Copyright (c) 2020-2021 Apple Inc. and the AsyncHTTPClient project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,12 +12,6 @@ // //===----------------------------------------------------------------------===// -extension HTTP1ConnectionProvider.Waiter: CustomStringConvertible { - var description: String { - return "HTTP1ConnectionProvider.Waiter(\(self.preference))" - } -} - extension HTTPClient.EventLoopPreference: CustomStringConvertible { public var description: String { return "\(self.preference)" diff --git a/Sources/AsyncHTTPClient/Utils.swift b/Sources/AsyncHTTPClient/Utils.swift index d682ffaf7..240d39a01 100644 --- a/Sources/AsyncHTTPClient/Utils.swift +++ b/Sources/AsyncHTTPClient/Utils.swift @@ -2,7 +2,7 @@ // // This source file is part of the AsyncHTTPClient open source project // -// Copyright (c) 2018-2020 Apple Inc. and the AsyncHTTPClient project authors +// Copyright (c) 2018-2021 Apple Inc. and the AsyncHTTPClient project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -13,15 +13,7 @@ //===----------------------------------------------------------------------===// import Foundation -#if canImport(Network) - import Network -#endif -import Logging import NIOCore -import NIOHTTP1 -import NIOHTTPCompression -import NIOSSL -import NIOTransportServices public final class HTTPClientCopyingDelegate: HTTPClientResponseDelegate { public typealias Response = Void @@ -40,11 +32,3 @@ public final class HTTPClientCopyingDelegate: HTTPClientResponseDelegate { return () } } - -extension Connection { - func removeHandler(_ type: Handler.Type) -> EventLoopFuture { - return self.channel.pipeline.handler(type: type).flatMap { handler in - self.channel.pipeline.removeHandler(handler) - }.recover { _ in } - } -} diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift deleted file mode 100644 index 25ef44ff7..000000000 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift +++ /dev/null @@ -1,58 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// -// -// ConnectionPoolTests+XCTest.swift -// -import XCTest - -/// -/// NOTE: This file was generated by generate_linux_tests.rb -/// -/// Do NOT edit this file directly as it will be regenerated automatically when needed. -/// - -extension ConnectionPoolTests { - static var allTests: [(String, (ConnectionPoolTests) -> () throws -> Void)] { - return [ - ("testPending", testPending), - ("testAcquireWhenEmpty", testAcquireWhenEmpty), - ("testAcquireWhenAvailable", testAcquireWhenAvailable), - ("testAcquireWhenUnavailable", testAcquireWhenUnavailable), - ("testAcquireWhenEmptySpecificEL", testAcquireWhenEmptySpecificEL), - ("testAcquireWhenAvailableSpecificEL", testAcquireWhenAvailableSpecificEL), - ("testAcquireReplace", testAcquireReplace), - ("testAcquireWhenUnavailableSpecificEL", testAcquireWhenUnavailableSpecificEL), - ("testAcquireWhenClosed", testAcquireWhenClosed), - ("testConnectFailedWhenClosed", testConnectFailedWhenClosed), - ("testReleaseAliveConnectionEmptyQueue", testReleaseAliveConnectionEmptyQueue), - ("testReleaseAliveButClosingConnectionEmptyQueue", testReleaseAliveButClosingConnectionEmptyQueue), - ("testReleaseInactiveConnectionEmptyQueue", testReleaseInactiveConnectionEmptyQueue), - ("testReleaseInactiveConnectionEmptyQueueHasConnections", testReleaseInactiveConnectionEmptyQueueHasConnections), - ("testReleaseAliveConnectionHasWaiter", testReleaseAliveConnectionHasWaiter), - ("testReleaseInactiveConnectionHasWaitersNoConnections", testReleaseInactiveConnectionHasWaitersNoConnections), - ("testReleaseAliveConnectionSameELHasWaiterSpecificEL", testReleaseAliveConnectionSameELHasWaiterSpecificEL), - ("testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL), - ("testNextWaiterEmptyQueue", testNextWaiterEmptyQueue), - ("testNextWaiterEmptyQueueHasConnections", testNextWaiterEmptyQueueHasConnections), - ("testTimeoutLeasedConnection", testTimeoutLeasedConnection), - ("testTimeoutAvailableConnection", testTimeoutAvailableConnection), - ("testRemoteClosedLeasedConnection", testRemoteClosedLeasedConnection), - ("testRemoteClosedAvailableConnection", testRemoteClosedAvailableConnection), - ("testShutdownOnPendingAndSuccess", testShutdownOnPendingAndSuccess), - ("testShutdownOnPendingAndError", testShutdownOnPendingAndError), - ("testShutdownTimeout", testShutdownTimeout), - ("testShutdownRemoteClosed", testShutdownRemoteClosed), - ] - } -} diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift deleted file mode 100644 index bd96941d4..000000000 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ /dev/null @@ -1,620 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -@testable import AsyncHTTPClient -import NIOConcurrencyHelpers -import NIOCore -import NIOEmbedded -import NIOFoundationCompat -import NIOHTTP1 -import NIOHTTPCompression -import NIOSSL -import NIOTestUtils -import NIOTransportServices -import XCTest - -class ConnectionPoolTests: XCTestCase { - var eventLoop: EmbeddedEventLoop! - - func testPending() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - XCTAssertTrue(state.enqueue()) - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 1, opened: 0) - } - - // MARK: - Acquire Tests - - func testAcquireWhenEmpty() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - - XCTAssertTrue(state.enqueue()) - let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - switch action { - case .create(let waiter): - waiter.promise.fail(TempError()) - default: - XCTFail("Unexpected action: \(action)") - } - - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 1) - } - - func testAcquireWhenAvailable() throws { - var (state, _) = self.buildState(count: 1) - - // Validate that the pool has one available connection and it's internal state is correct - XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - XCTAssertTrue(state.enqueue()) - - let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - switch action { - case .lease(let connection, let waiter): - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - - // cleanup - waiter.promise.succeed(connection) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 0, leased: 1, waiters: 0, clean: false) - } - - func testAcquireWhenUnavailable() throws { - var (state, _) = self.buildState(count: 8, release: false) - XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - - XCTAssertTrue(state.enqueue()) - let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - switch action { - case .none: - XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 1, clean: false) - } - - // MARK: - Acquire on Specific EL Tests - - func testAcquireWhenEmptySpecificEL() { - let el: EventLoop = self.eventLoop - let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: el) - - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: el) - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - - XCTAssertTrue(state.enqueue()) - let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) - switch action { - case .create(let waiter): - waiter.promise.fail(TempError()) - default: - XCTFail("Unexpected action: \(action)") - } - - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 1) - } - - func testAcquireWhenAvailableSpecificEL() throws { - let el: EventLoop = self.eventLoop - let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: el) - var (state, _) = self.buildState(count: 1, eventLoop: el) - XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - XCTAssertTrue(state.enqueue()) - let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) - switch action { - case .lease(let connection, let waiter): - waiter.promise.succeed(connection) - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 0, leased: 1, waiters: 0, clean: false) - } - - func testAcquireReplace() throws { - let el: EventLoop = self.eventLoop - var (state, connections) = self.buildState(count: 8, release: false, eventLoop: el) - - // release a connection - _ = state.release(connection: connections.first!, closing: false) - XCTAssertState(state, available: 1, leased: 7, waiters: 0, pending: 0, opened: 8) - - // other eventLoop - let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: EmbeddedEventLoop()) - - XCTAssertTrue(state.enqueue()) - let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) - switch action { - case .replace(let connection, let waiter): - waiter.promise.fail(TempError()) - XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8, isNotLeased: connection) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) - } - - func testAcquireWhenUnavailableSpecificEL() throws { - var (state, _) = self.buildState(count: 8, release: false, eventLoop: self.eventLoop) - XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - - XCTAssertTrue(state.enqueue()) - let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) - switch action { - case .none: - XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 1, clean: false) - } - - // MARK: - Acquire Errors Tests - - func testAcquireWhenClosed() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - _ = state.close() - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - - XCTAssertFalse(state.enqueue()) - - let promise = self.eventLoop.makePromise(of: Connection.self) - let action = state.acquire(waiter: .init(promise: promise, setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - switch action { - case .fail(let waiter, let error): - waiter.promise.fail(error) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testConnectFailedWhenClosed() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - _ = state.close() - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - - let action = state.connectFailed() - switch action { - case .none: - break - default: - XCTFail("Unexpected action: \(action)") - } - } - - // MARK: - Release Tests - - func testReleaseAliveConnectionEmptyQueue() throws { - var (state, connections) = self.buildState(count: 1, release: false) - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - - let connection = try XCTUnwrap(connections.first) - let action = state.release(connection: connection, closing: false) - switch action { - case .park: - XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 1, leased: 0, waiters: 0, clean: true) - } - - func testReleaseAliveButClosingConnectionEmptyQueue() throws { - var (state, connections) = self.buildState(count: 1, release: false) - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - - let connection = try XCTUnwrap(connections.first) - // closing should be true to test that we can discard connection that is still active, but caller indicated that it will be closed soon - let action = state.release(connection: connection, closing: true) - switch action { - case .closeProvider: - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - XCTAssertNil(state.close()) - } - - func testReleaseInactiveConnectionEmptyQueue() throws { - var (state, connections) = self.buildState(count: 1, release: false) - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - - let connection = try XCTUnwrap(connections.first) - connection.isActiveEstimation = false - // closing should be false to test that we check connection state in order to decided if we need to discard the connection - let action = state.release(connection: connection, closing: false) - switch action { - case .closeProvider: - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - XCTAssertNil(state.close()) - } - - func testReleaseInactiveConnectionEmptyQueueHasConnections() throws { - var (state, connections) = self.buildState(count: 2, release: false) - XCTAssertState(state, available: 0, leased: 2, waiters: 0, pending: 0, opened: 2) - - let connection = try XCTUnwrap(connections.first) - - // Return a connection to the pool - _ = state.release(connection: try XCTUnwrap(connections.dropFirst().first), closing: false) - XCTAssertState(state, available: 1, leased: 1, waiters: 0, pending: 0, opened: 2) - - let action = state.release(connection: connection, closing: true) - switch action { - case .none: - XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 1, leased: 0, waiters: 0, clean: true) - } - - func testReleaseAliveConnectionHasWaiter() throws { - var (state, connections) = self.buildState(count: 8, release: false) - - // Add one waiter to the pool - XCTAssertTrue(state.enqueue()) - _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - - let connection = try XCTUnwrap(connections.first) - let action = state.release(connection: connection, closing: false) - switch action { - case .lease(let connection, let waiter): - XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8, isLeased: connection) - // cleanup - waiter.promise.succeed(connection) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 0, clean: false) - } - - func testReleaseInactiveConnectionHasWaitersNoConnections() throws { - var (state, connections) = self.buildState(count: 8, release: false) - XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - - // Add one waiter to the pool - XCTAssertTrue(state.enqueue()) - _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - - let connection = try XCTUnwrap(connections.first) - let action = state.release(connection: connection, closing: true) - switch action { - case .create(let waiter): - XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8) - // cleanup - waiter.promise.fail(TempError()) - default: - XCTFail("Unexpected action: \(action)") - } - - // cleanup - try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) - } - - // MARK: - Release on Specific EL Tests - - func testReleaseAliveConnectionSameELHasWaiterSpecificEL() throws { - var (state, connections) = self.buildState(count: 8, release: false) - XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - - // Add one waiter to the pool - XCTAssertTrue(state.enqueue()) - _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) - XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - - let connection = try XCTUnwrap(connections.first) - let action = state.release(connection: connection, closing: false) - switch action { - case .lease(let connection, let waiter): - XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8, isLeased: connection) - // cleanup - waiter.promise.succeed(connection) - default: - XCTFail("Unexpected action: \(action)") - } - - try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 0, clean: false) - } - - func testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL() throws { - var (state, connections) = self.buildState(count: 8, release: false) - XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - - let differentEL = EmbeddedEventLoop() - // Add one waiter to the pool - XCTAssertTrue(state.enqueue()) - _ = state.acquire(waiter: .init(promise: differentEL.makePromise(), setupComplete: differentEL.makeSucceededFuture(()), preference: .delegateAndChannel(on: differentEL))) - XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - - let connection = try XCTUnwrap(connections.first) - let action = state.release(connection: connection, closing: false) - switch action { - case .replace(let connection, let waiter): - XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8, isNotLeased: connection) - // cleanup - waiter.promise.fail(TempError()) - default: - XCTFail("Unexpected action: \(action)") - } - - try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) - } - - // MARK: - Next Waiter Tests - - func testNextWaiterEmptyQueue() throws { - var (state, _) = self.buildState(count: 0) - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - - let action = state.processNextWaiter() - switch action { - case .closeProvider: - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testNextWaiterEmptyQueueHasConnections() throws { - var (state, _) = self.buildState(count: 1, release: true) - XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - let action = state.processNextWaiter() - switch action { - case .none: - XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - default: - XCTFail("Unexpected action: \(action)") - } - } - - // MARK: - Timeout and Remote Close Tests - - func testTimeoutLeasedConnection() throws { - var (state, connections) = self.buildState(count: 1, release: false) - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - - let connection = try XCTUnwrap(connections.first) - let action = state.timeout(connection: connection) - switch action { - case .none: - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testTimeoutAvailableConnection() throws { - var (state, connections) = self.buildState(count: 1) - XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - let connection = try XCTUnwrap(connections.first) - let action = state.timeout(connection: connection) - switch action { - case .closeAnd(_, let after): - switch after { - case .closeProvider: - break - default: - XCTFail("Unexpected action: \(action)") - } - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testRemoteClosedLeasedConnection() throws { - var (state, connections) = self.buildState(count: 1, release: false) - - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - - // This can happen when just leased connection is closed before TaskHandler is added to pipeline - let connection = try XCTUnwrap(connections.first) - let action = state.remoteClosed(connection: connection) - switch action { - case .none: - XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testRemoteClosedAvailableConnection() throws { - var (state, connections) = self.buildState(count: 1) - - XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - let connection = try XCTUnwrap(connections.first) - let action = state.remoteClosed(connection: connection) - switch action { - case .closeProvider: - XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - default: - XCTFail("Unexpected action: \(action)") - } - } - - // MARK: - Shutdown tests - - func testShutdownOnPendingAndSuccess() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - XCTAssertTrue(state.enqueue()) - - let connectionPromise = self.eventLoop.makePromise(of: ConnectionForTests.self) - let setupPromise = self.eventLoop.makePromise(of: Void.self) - let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) - var action = state.acquire(waiter: waiter) - - guard case .create = action else { - XCTFail("unexpected action \(action)") - return - } - - let snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(snapshot.openedConnectionsCount, 1) - - if let (waiters, available, leased, clean) = state.close() { - XCTAssertTrue(waiters.isEmpty) - XCTAssertTrue(available.isEmpty) - XCTAssertTrue(leased.isEmpty) - XCTAssertFalse(clean) - } else { - XCTFail("Expecting snapshot") - } - - let connection = ConnectionForTests(eventLoop: self.eventLoop) - - action = state.offer(connection: connection) - guard case .closeAnd(_, .closeProvider) = action else { - XCTFail("unexpected action \(action)") - return - } - - connectionPromise.fail(TempError()) - setupPromise.succeed(()) - } - - func testShutdownOnPendingAndError() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - XCTAssertTrue(state.enqueue()) - - let connectionPromise = self.eventLoop.makePromise(of: ConnectionForTests.self) - let setupPromise = self.eventLoop.makePromise(of: Void.self) - let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) - var action = state.acquire(waiter: waiter) - - guard case .create = action else { - XCTFail("unexpected action \(action)") - return - } - - let snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(snapshot.openedConnectionsCount, 1) - - if let (waiters, available, leased, clean) = state.close() { - XCTAssertTrue(waiters.isEmpty) - XCTAssertTrue(available.isEmpty) - XCTAssertTrue(leased.isEmpty) - XCTAssertFalse(clean) - } else { - XCTFail("Expecting snapshot") - } - - action = state.connectFailed() - guard case .closeProvider = action else { - XCTFail("unexpected action \(action)") - return - } - - connectionPromise.fail(TempError()) - setupPromise.succeed(()) - } - - func testShutdownTimeout() throws { - var (state, connections) = self.buildState(count: 1) - - if let (waiters, available, leased, clean) = state.close() { - XCTAssertTrue(waiters.isEmpty) - XCTAssertFalse(available.isEmpty) - XCTAssertTrue(leased.isEmpty) - XCTAssertTrue(clean) - } else { - XCTFail("Expecting snapshot") - } - - let connection = try XCTUnwrap(connections.first) - let action = state.timeout(connection: connection) - switch action { - case .closeAnd(_, let next): - switch next { - case .closeProvider: - // expected - break - default: - XCTFail("Unexpected action: \(action)") - } - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testShutdownRemoteClosed() throws { - var (state, connections) = self.buildState(count: 1) - - if let (waiters, available, leased, clean) = state.close() { - XCTAssertTrue(waiters.isEmpty) - XCTAssertFalse(available.isEmpty) - XCTAssertTrue(leased.isEmpty) - XCTAssertTrue(clean) - } else { - XCTFail("Expecting snapshot") - } - - let connection = try XCTUnwrap(connections.first) - let action = state.remoteClosed(connection: connection) - switch action { - case .closeProvider: - // expected - break - default: - XCTFail("Unexpected action: \(action)") - } - } - - override func setUp() { - XCTAssertNil(self.eventLoop) - self.eventLoop = EmbeddedEventLoop() - } - - override func tearDown() { - XCTAssertNotNil(self.eventLoop) - XCTAssertNoThrow(try self.eventLoop.syncShutdownGracefully()) - self.eventLoop = nil - } -} diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift deleted file mode 100644 index 8340cf224..000000000 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift +++ /dev/null @@ -1,131 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -@testable import AsyncHTTPClient -import NIOCore -import XCTest - -class ConnectionForTests: PoolManageableConnection { - var eventLoop: EventLoop - var isActiveEstimation: Bool - - init(eventLoop: EventLoop) { - self.eventLoop = eventLoop - self.isActiveEstimation = true - } - - func cancel() -> EventLoopFuture { - return self.eventLoop.makeSucceededFuture(()) - } -} - -extension ConnectionPoolTests { - func buildState(count: Int, release: Bool = true, eventLoop: EventLoop? = nil) -> (HTTP1ConnectionProvider.ConnectionsState, [ConnectionForTests]) { - let eventLoop = eventLoop ?? self.eventLoop! - - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) - var items: [ConnectionForTests] = [] - - if count == 0 { - return (state, items) - } - - for _ in 1...count { - // Set up connection pool to have one available connection - do { - let connection = ConnectionForTests(eventLoop: eventLoop) - items.append(connection) - // First, we ask the empty pool for a connection, triggering connection creation - XCTAssertTrue(state.enqueue()) - let action = state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - switch action { - case .create(let waiter): - waiter.promise.succeed(connection) - default: - XCTFail("Unexpected action: \(action)") - } - - // We offer the connection to the pool so that it can be tracked - _ = state.offer(connection: connection) - } - } - - if release { - for item in items { - // No we release the connection, making it available for the next caller - _ = state.release(connection: item, closing: false) - } - } - return (state, items) - } -} - -func XCTAssertState(_ state: HTTP1ConnectionProvider.ConnectionsState, available: Int, leased: Int, waiters: Int, pending: Int, opened: Int) { - let snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(available, snapshot.availableConnections.count) - XCTAssertEqual(leased, snapshot.leasedConnections.count) - XCTAssertEqual(waiters, snapshot.waiters.count) - XCTAssertEqual(pending, snapshot.pending) - XCTAssertEqual(opened, snapshot.openedConnectionsCount) -} - -func XCTAssertState(_ state: HTTP1ConnectionProvider.ConnectionsState, available: Int, leased: Int, waiters: Int, pending: Int, opened: Int, isLeased connection: ConnectionType) { - let snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(available, snapshot.availableConnections.count) - XCTAssertEqual(leased, snapshot.leasedConnections.count) - XCTAssertEqual(waiters, snapshot.waiters.count) - XCTAssertEqual(pending, snapshot.pending) - XCTAssertEqual(opened, snapshot.openedConnectionsCount) - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) -} - -func XCTAssertState(_ state: HTTP1ConnectionProvider.ConnectionsState, available: Int, leased: Int, waiters: Int, pending: Int, opened: Int, isNotLeased connection: ConnectionType) { - let snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(available, snapshot.availableConnections.count) - XCTAssertEqual(leased, snapshot.leasedConnections.count) - XCTAssertEqual(waiters, snapshot.waiters.count) - XCTAssertEqual(pending, snapshot.pending) - XCTAssertEqual(opened, snapshot.openedConnectionsCount) - XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) -} - -struct XCTEmptyError: Error {} - -func XCTUnwrap(_ value: T?) throws -> T { - if let unwrapped = value { - return unwrapped - } - throw XCTEmptyError() -} - -struct TempError: Error {} - -func XCTAssertStateClose(_ state: HTTP1ConnectionProvider.ConnectionsState, available: Int, leased: Int, waiters: Int, clean: Bool) throws { - var state = state - - let (foundWaiters, foundAvailable, foundLeased, foundClean) = try XCTUnwrap(state.close()) - XCTAssertEqual(waiters, foundWaiters.count) - XCTAssertEqual(available, foundAvailable.count) - XCTAssertEqual(leased, foundLeased.count) - XCTAssertEqual(clean, foundClean) - - for waiter in foundWaiters { - waiter.promise.fail(TempError()) - } - - for lease in foundLeased { - try lease.cancel().wait() - } -} diff --git a/Tests/AsyncHTTPClientTests/ConnectionTests+XCTest.swift b/Tests/AsyncHTTPClientTests/ConnectionTests+XCTest.swift deleted file mode 100644 index 87837a2ca..000000000 --- a/Tests/AsyncHTTPClientTests/ConnectionTests+XCTest.swift +++ /dev/null @@ -1,35 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// -// -// ConnectionTests+XCTest.swift -// -import XCTest - -/// -/// NOTE: This file was generated by generate_linux_tests.rb -/// -/// Do NOT edit this file directly as it will be regenerated automatically when needed. -/// - -extension ConnectionTests { - static var allTests: [(String, (ConnectionTests) -> () throws -> Void)] { - return [ - ("testConnectionReleaseActive", testConnectionReleaseActive), - ("testConnectionReleaseInactive", testConnectionReleaseInactive), - ("testConnectionRemoteCloseRelease", testConnectionRemoteCloseRelease), - ("testConnectionTimeoutRelease", testConnectionTimeoutRelease), - ("testAcquireAvailableBecomesUnavailable", testAcquireAvailableBecomesUnavailable), - ] - } -} diff --git a/Tests/AsyncHTTPClientTests/ConnectionTests.swift b/Tests/AsyncHTTPClientTests/ConnectionTests.swift deleted file mode 100644 index e46d328fd..000000000 --- a/Tests/AsyncHTTPClientTests/ConnectionTests.swift +++ /dev/null @@ -1,239 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -@testable import AsyncHTTPClient -import NIOCore -import NIOEmbedded -import XCTest - -class ConnectionTests: XCTestCase { - var eventLoop: EmbeddedEventLoop! - var http1ConnectionProvider: HTTP1ConnectionProvider! - var pool: ConnectionPool! - - func buildState(connection: Connection, release: Bool) { - XCTAssertTrue(self.http1ConnectionProvider.state.enqueue()) - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - switch action { - case .create(let waiter): - waiter.promise.succeed(connection) - default: - XCTFail("Unexpected action: \(action)") - } - - // We offer the connection to the pool so that it can be tracked - _ = self.http1ConnectionProvider.state.offer(connection: connection) - - if release { - _ = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - } - } - - // MARK: - Connection Tests - - func testConnectionReleaseActive() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - - self.buildState(connection: connection, release: false) - - XCTAssertState(self.http1ConnectionProvider.state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - - connection.release(closing: false, logger: HTTPClient.loggingDisabled) - - // XCTAssertFalse(connection.isInUse) - XCTAssertState(self.http1ConnectionProvider.state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - } - - func testConnectionReleaseInactive() throws { - let channel = EmbeddedChannel() - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - - self.buildState(connection: connection, release: false) - - XCTAssertState(self.http1ConnectionProvider.state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - - connection.release(closing: true, logger: HTTPClient.loggingDisabled) - XCTAssertState(self.http1ConnectionProvider.state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - } - - func testConnectionRemoteCloseRelease() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - - self.buildState(connection: connection, release: true) - - XCTAssertState(self.http1ConnectionProvider.state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - - XCTAssertState(self.http1ConnectionProvider.state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - } - - func testConnectionTimeoutRelease() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - - self.buildState(connection: connection, release: true) - - XCTAssertState(self.http1ConnectionProvider.state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - connection.timeout(logger: HTTPClient.loggingDisabled) - - XCTAssertState(self.http1ConnectionProvider.state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - } - - func testAcquireAvailableBecomesUnavailable() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - - self.buildState(connection: connection, release: true) - - XCTAssertState(self.http1ConnectionProvider.state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - switch action { - case .lease(let connection, let waiter): - // Since this connection is already in use, this should be a no-op and state should not have changed from normal lease - connection.timeout(logger: HTTPClient.loggingDisabled) - - XCTAssertTrue(connection.isActiveEstimation) - XCTAssertState(self.http1ConnectionProvider.state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1, isLeased: connection) - - // This is unrecoverable, but in this case we create a new connection, so state again should not change, even though release will be called - // This is important to preventself.http1ConnectionProvider deletion since connection is released and there could be 0 waiters - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - - XCTAssertState(self.http1ConnectionProvider.state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1, isLeased: connection) - - // cleanup - waiter.promise.fail(TempError()) - connection.release(closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - override func setUp() { - XCTAssertNil(self.pool) - XCTAssertNil(self.eventLoop) - XCTAssertNil(self.http1ConnectionProvider) - self.eventLoop = EmbeddedEventLoop() - self.pool = ConnectionPool(configuration: .init(), - backgroundActivityLogger: HTTPClient.loggingDisabled) - XCTAssertNoThrow(self.http1ConnectionProvider = - try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), - eventLoop: self.eventLoop, - configuration: .init(), - tlsConfiguration: nil, - pool: self.pool, - sslContextCache: .init(), - backgroundActivityLogger: HTTPClient.loggingDisabled)) - } - - override func tearDown() { - XCTAssertNotNil(self.pool) - XCTAssertNotNil(self.eventLoop) - XCTAssertNotNil(self.http1ConnectionProvider) - XCTAssertNoThrow(try self.http1ConnectionProvider.close().wait()) - XCTAssertNoThrow(try self.eventLoop.syncShutdownGracefully()) - self.http1ConnectionProvider = nil - XCTAssertTrue(try self.pool.close(on: self.eventLoop).wait()) - self.eventLoop = nil - self.pool = nil - } -} - -class ActiveChannel: Channel, ChannelCore { - struct NotImplementedError: Error {} - - func localAddress0() throws -> SocketAddress { - throw NotImplementedError() - } - - func remoteAddress0() throws -> SocketAddress { - throw NotImplementedError() - } - - func register0(promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func bind0(to: SocketAddress, promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func connect0(to: SocketAddress, promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func write0(_ data: NIOAny, promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func flush0() {} - - func read0() {} - - func close0(error: Error, mode: CloseMode, promise: EventLoopPromise?) { - promise?.succeed(()) - } - - func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func channelRead0(_: NIOAny) {} - - func errorCaught0(error: Error) {} - - var allocator: ByteBufferAllocator - var closeFuture: EventLoopFuture - var eventLoop: EventLoop - - var localAddress: SocketAddress? - var remoteAddress: SocketAddress? - var parent: Channel? - var isWritable: Bool = true - var isActive: Bool = true - - init(eventLoop: EmbeddedEventLoop) { - self.allocator = ByteBufferAllocator() - self.eventLoop = eventLoop - self.closeFuture = self.eventLoop.makeSucceededFuture(()) - } - - var _channelCore: ChannelCore { - return self - } - - var pipeline: ChannelPipeline { - return ChannelPipeline(channel: self) - } - - func setOption