diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift index 8268a598b..4c47eb2cc 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift @@ -79,8 +79,14 @@ final class HTTP1Connection { self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil) } + func close(promise: EventLoopPromise?) { + return self.channel.close(mode: .all, promise: promise) + } + func close() -> EventLoopFuture { - return self.channel.close() + let promise = self.channel.eventLoop.makePromise(of: Void.self) + self.close(promise: promise) + return promise.futureResult } func taskCompleted() { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift index 70c64105a..3761c646e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift @@ -144,8 +144,14 @@ final class HTTP2Connection { } } + func close(promise: EventLoopPromise?) { + return self.channel.close(mode: .all, promise: promise) + } + func close() -> EventLoopFuture { - self.channel.close() + let promise = self.channel.eventLoop.makePromise(of: Void.self) + self.close(promise: promise) + return promise.futureResult } private func start() -> EventLoopFuture { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Manager.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Manager.swift index 582f97527..0cbfbb3c0 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Manager.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Manager.swift @@ -12,7 +12,154 @@ // //===----------------------------------------------------------------------===// +import Logging +import NIO import NIOConcurrencyHelpers +import NIOHTTP1 + +extension HTTPConnectionPool { + final class Manager { + private typealias Key = ConnectionPool.Key + + private enum State { + case active + case shuttingDown(promise: EventLoopPromise?, unclean: Bool) + case shutDown + } + + private let eventLoopGroup: EventLoopGroup + private let configuration: HTTPClient.Configuration + private let connectionIDGenerator = Connection.ID.globalGenerator + private let logger: Logger + + private var state: State = .active + private var _pools: [Key: HTTPConnectionPool] = [:] + private let lock = Lock() + + private let sslContextCache = SSLContextCache() + + init(eventLoopGroup: EventLoopGroup, + configuration: HTTPClient.Configuration, + backgroundActivityLogger logger: Logger) { + self.eventLoopGroup = eventLoopGroup + self.configuration = configuration + self.logger = logger + } + + func executeRequest(_ request: HTTPSchedulableRequest) { + let poolKey = request.poolKey + let poolResult = self.lock.withLock { () -> Result in + switch self.state { + case .active: + if let pool = self._pools[poolKey] { + return .success(pool) + } + + let pool = HTTPConnectionPool( + eventLoopGroup: self.eventLoopGroup, + sslContextCache: self.sslContextCache, + tlsConfiguration: request.tlsConfiguration, + clientConfiguration: self.configuration, + key: poolKey, + delegate: self, + idGenerator: self.connectionIDGenerator, + backgroundActivityLogger: self.logger + ) + self._pools[poolKey] = pool + return .success(pool) + + case .shuttingDown, .shutDown: + return .failure(HTTPClientError.alreadyShutdown) + } + } + + switch poolResult { + case .success(let pool): + pool.executeRequest(request) + case .failure(let error): + request.fail(error) + } + } + + /// Shutdown the connection pool manager. You **must** shutdown the pool manager, since it leak otherwise. + /// + /// - Parameter promise: An `EventLoopPromise` that is succeeded once all connections pools are shutdown. + /// - Returns: An EventLoopFuture that is succeeded once the pool is shutdown. The bool indicates if the + /// shutdown was unclean. + func shutdown(promise: EventLoopPromise?) { + enum ShutdownAction { + case done(EventLoopPromise?) + case shutdown([Key: HTTPConnectionPool]) + } + + let action = self.lock.withLock { () -> ShutdownAction in + switch self.state { + case .active: + // If there aren't any pools, we can mark the pool as shut down right away. + if self._pools.isEmpty { + self.state = .shutDown + return .done(promise) + } else { + // this promise will be succeeded once all connection pools are shutdown + self.state = .shuttingDown(promise: promise, unclean: false) + return .shutdown(self._pools) + } + + case .shuttingDown, .shutDown: + preconditionFailure("PoolManager already shutdown") + } + } + + // if no pools are returned, the manager is already shutdown completely. Inform the + // delegate. This is a very clean shutdown... + switch action { + case .done(let promise): + promise?.succeed(false) + + case .shutdown(let pools): + pools.values.forEach { pool in + pool.shutdown() + } + } + } + } +} + +extension HTTPConnectionPool.Manager: HTTPConnectionPoolDelegate { + func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) { + enum CloseAction { + case close(EventLoopPromise?, unclean: Bool) + case wait + } + + let closeAction = self.lock.withLock { () -> CloseAction in + switch self.state { + case .active, .shutDown: + preconditionFailure("Why are pools shutting down, if the manager did not give a signal") + + case .shuttingDown(let promise, let soFarUnclean): + guard self._pools.removeValue(forKey: pool.key) === pool else { + preconditionFailure("Expected that the pool was created by this manager and is known for this reason.") + } + + if self._pools.isEmpty { + self.state = .shutDown + return .close(promise, unclean: soFarUnclean || unclean) + } else { + self.state = .shuttingDown(promise: promise, unclean: soFarUnclean || unclean) + return .wait + } + } + } + + switch closeAction { + case .close(let promise, unclean: let unclean): + promise?.succeed(unclean) + case .wait: + break + } + } +} extension HTTPConnectionPool.Connection.ID { static var globalGenerator = Generator() diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index ba8c7ae62..475c3b8f3 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -12,9 +12,16 @@ // //===----------------------------------------------------------------------===// +import Logging +import NIOConcurrencyHelpers import NIOCore +import NIOSSL -enum HTTPConnectionPool { +protocol HTTPConnectionPoolDelegate { + func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) +} + +final class HTTPConnectionPool { struct Connection: Hashable { typealias ID = Int @@ -85,14 +92,14 @@ enum HTTPConnectionPool { /// Closes the connection without cancelling running requests. Use this when you are sure, that the /// connection is currently idle. - fileprivate func close() -> EventLoopFuture { + fileprivate func close(promise: EventLoopPromise?) { switch self._ref { case .http1_1(let connection): - return connection.close() + return connection.close(promise: promise) case .http2(let connection): - return connection.close() - case .__testOnly_connection(_, let eventLoop): - return eventLoop.makeSucceededFuture(()) + return connection.close(promise: promise) + case .__testOnly_connection: + promise?.succeed(()) } } @@ -121,6 +128,377 @@ enum HTTPConnectionPool { } } } + + private let stateLock = Lock() + private var _state: StateMachine { + didSet { + self.logger.trace("Connection Pool State changed", metadata: [ + "key": "\(self.key)", + "state": "\(self._state)", + ]) + } + } + + private static let fallbackConnectTimeout: TimeAmount = .seconds(30) + + let key: ConnectionPool.Key + + private let timerLock = Lock() + private var _requestTimer = [Request.ID: Scheduled]() + private var _idleTimer = [Connection.ID: Scheduled]() + private var _backoffTimer = [Connection.ID: Scheduled]() + + private var logger: Logger + + private let eventLoopGroup: EventLoopGroup + private let connectionFactory: ConnectionFactory + private let clientConfiguration: HTTPClient.Configuration + private let idleConnectionTimeout: TimeAmount + + let delegate: HTTPConnectionPoolDelegate + + init(eventLoopGroup: EventLoopGroup, + sslContextCache: SSLContextCache, + tlsConfiguration: TLSConfiguration?, + clientConfiguration: HTTPClient.Configuration, + key: ConnectionPool.Key, + delegate: HTTPConnectionPoolDelegate, + idGenerator: Connection.ID.Generator, + backgroundActivityLogger logger: Logger) { + self.eventLoopGroup = eventLoopGroup + self.connectionFactory = ConnectionFactory( + key: key, + tlsConfiguration: tlsConfiguration, + clientConfiguration: clientConfiguration, + sslContextCache: sslContextCache + ) + self.clientConfiguration = clientConfiguration + self.key = key + self.delegate = delegate + self.logger = logger + + self.idleConnectionTimeout = clientConfiguration.connectionPool.idleTimeout + + self._state = StateMachine( + eventLoopGroup: eventLoopGroup, + idGenerator: idGenerator, + maximumConcurrentHTTP1Connections: 8 + ) + } + + func executeRequest(_ request: HTTPSchedulableRequest) { + let action = self.stateLock.withLock { () -> StateMachine.Action in + self._state.executeRequest(.init(request)) + } + self.run(action: action) + } + + func shutdown() { + let action = self.stateLock.withLock { () -> StateMachine.Action in + self._state.shutdown() + } + self.run(action: action) + } + + // MARK: Run actions + + private func run(action: StateMachine.Action) { + self.runConnectionAction(action.connection) + self.runRequestAction(action.request) + } + + private func runConnectionAction(_ action: StateMachine.ConnectionAction) { + switch action { + case .createConnection(let connectionID, let eventLoop): + self.createConnection(connectionID, on: eventLoop) + + case .scheduleBackoffTimer(let connectionID, let backoff, on: let eventLoop): + self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop) + + case .scheduleTimeoutTimer(let connectionID, on: let eventLoop): + self.scheduleIdleTimerForConnection(connectionID, on: eventLoop) + + case .cancelTimeoutTimer(let connectionID): + self.cancelIdleTimerForConnection(connectionID) + + case .closeConnection(let connection, isShutdown: let isShutdown): + // we are not interested in the close future... + connection.close(promise: nil) + + if case .yes(let unclean) = isShutdown { + self.delegate.connectionPoolDidShutdown(self, unclean: unclean) + } + + case .cleanupConnections(let cleanupContext, isShutdown: let isShutdown): + for connection in cleanupContext.close { + connection.close(promise: nil) + } + + for connection in cleanupContext.cancel { + connection.close(promise: nil) + } + + for connectionID in cleanupContext.connectBackoff { + self.cancelConnectionStartBackoffTimer(connectionID) + } + + if case .yes(let unclean) = isShutdown { + self.delegate.connectionPoolDidShutdown(self, unclean: unclean) + } + + case .none: + break + } + } + + private func runRequestAction(_ action: StateMachine.RequestAction) { + // The order of execution fail/execute request vs cancelling the request timeout timer does + // not matter in the actions here. The actions don't cause any side effects that will be + // reported back to the state machine and are not dependent on each other. + + switch action { + case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout): + if cancelTimeout { + self.cancelRequestTimeout(request.id) + } + connection.executeRequest(request.req) + + case .executeRequestsAndCancelTimeouts(let requests, let connection): + self.cancelRequestTimeouts(requests) + requests.forEach { connection.executeRequest($0.req) } + + case .failRequest(let request, let error, cancelTimeout: let cancelTimeout): + if cancelTimeout { + self.cancelRequestTimeout(request.id) + } + request.req.fail(error) + + case .failRequestsAndCancelTimeouts(let requests, let error): + self.cancelRequestTimeouts(requests) + requests.forEach { $0.req.fail(error) } + + case .scheduleRequestTimeout(let request, on: let eventLoop): + self.scheduleRequestTimeout(request, on: eventLoop) + + case .cancelRequestTimeout(let requestID): + self.cancelRequestTimeout(requestID) + + case .none: + break + } + } + + private func createConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) { + // Even though this function is called make it actually creates/establishes a connection. + // TBD: Should we rename it? To what? + self.connectionFactory.makeConnection( + for: self, + connectionID: connectionID, + http1ConnectionDelegate: self, + http2ConnectionDelegate: self, + deadline: .now() + (self.clientConfiguration.timeout.connect ?? Self.fallbackConnectTimeout), + eventLoop: eventLoop, + logger: self.logger + ) + } + + private func scheduleRequestTimeout(_ request: Request, on eventLoop: EventLoop) { + let requestID = request.id + let scheduled = eventLoop.scheduleTask(deadline: request.connectionDeadline) { + // The timer has fired. Now we need to do a couple of things: + // + // 1. Remove ourselves from the timer dictionary to not leak any data. If our + // waiter entry still exists, we need to tell the state machine, that we want + // to fail the request. + let timeoutFired = self.timerLock.withLock { + self._requestTimer.removeValue(forKey: requestID) != nil + } + + // 2. If the entry did not exists anymore, we can assume that the request was + // scheduled on another connection. The timer still fired anyhow because of a + // race. In such a situation we don't need to do anything. + guard timeoutFired else { return } + + // 3. Tell the state machine about the timeout + let action = self.stateLock.withLock { + self._state.timeoutRequest(requestID) + } + + self.run(action: action) + } + + self.timerLock.withLockVoid { + assert(self._requestTimer[requestID] == nil) + self._requestTimer[requestID] = scheduled + } + + request.req.requestWasQueued(self) + } + + private func cancelRequestTimeout(_ id: Request.ID) { + let scheduled = self.timerLock.withLock { + self._requestTimer.removeValue(forKey: id) + } + + scheduled?.cancel() + } + + private func cancelRequestTimeouts(_ requests: [Request]) { + let scheduled = self.timerLock.withLock { + requests.compactMap { + self._requestTimer.removeValue(forKey: $0.id) + } + } + scheduled.forEach { $0.cancel() } + } + + private func scheduleIdleTimerForConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) { + let scheduled = eventLoop.scheduleTask(in: self.idleConnectionTimeout) { + // there might be a race between a cancelTimer call and the triggering + // of this scheduled task. both want to acquire the lock + let timerExisted = self.timerLock.withLock { + self._idleTimer.removeValue(forKey: connectionID) != nil + } + + guard timerExisted else { return } + + let action = self.stateLock.withLock { + self._state.connectionIdleTimeout(connectionID) + } + self.run(action: action) + } + + self.timerLock.withLock { + assert(self._idleTimer[connectionID] == nil) + self._idleTimer[connectionID] = scheduled + } + } + + private func cancelIdleTimerForConnection(_ connectionID: Connection.ID) { + let cancelTimer = self.timerLock.withLock { + self._idleTimer.removeValue(forKey: connectionID) + } + + cancelTimer?.cancel() + } + + private func scheduleConnectionStartBackoffTimer( + _ connectionID: Connection.ID, + _ timeAmount: TimeAmount, + on eventLoop: EventLoop + ) { + let scheduled = eventLoop.scheduleTask(in: timeAmount) { + // there might be a race between a backoffTimer and the pool shutting down. + let timerExisted = self.timerLock.withLock { + self._backoffTimer.removeValue(forKey: connectionID) != nil + } + + guard timerExisted else { return } + + let action = self.stateLock.withLock { + self._state.connectionCreationBackoffDone(connectionID) + } + self.run(action: action) + } + + self.timerLock.withLock { + assert(self._backoffTimer[connectionID] == nil) + self._backoffTimer[connectionID] = scheduled + } + } + + private func cancelConnectionStartBackoffTimer(_ connectionID: Connection.ID) { + let backoffTimer = self.timerLock.withLock { + self._backoffTimer[connectionID] + } + + backoffTimer?.cancel() + } +} + +// MARK: - Protocol methods - + +extension HTTPConnectionPool: HTTPConnectionRequester { + func http1ConnectionCreated(_ connection: HTTP1Connection) { + let action = self.stateLock.withLock { + self._state.newHTTP1ConnectionCreated(.http1_1(connection)) + } + self.run(action: action) + } + + func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) { + preconditionFailure("Did not expect http/2 connections right now.") +// let action = self.stateLock.withLock { () -> StateMachine.Action in +// if let settings = connection.settings { +// return self._state.newHTTP2ConnectionCreated(.http2(connection), settings: settings) +// } else { +// // immidiate connection closure before we can register with state machine +// // is the only reason we don't have settings +// struct ImmidiateConnectionClose: Error {} +// return self._state.failedToCreateNewConnection(ImmidiateConnectionClose(), connectionID: connection.id) +// } +// } +// self.run(action: action) + } + + func failedToCreateHTTPConnection(_ connectionID: HTTPConnectionPool.Connection.ID, error: Error) { + let action = self.stateLock.withLock { + self._state.failedToCreateNewConnection(error, connectionID: connectionID) + } + self.run(action: action) + } +} + +extension HTTPConnectionPool: HTTP1ConnectionDelegate { + func http1ConnectionClosed(_ connection: HTTP1Connection) { + let action = self.stateLock.withLock { + self._state.connectionClosed(connection.id) + } + self.run(action: action) + } + + func http1ConnectionReleased(_ connection: HTTP1Connection) { + let action = self.stateLock.withLock { + self._state.http1ConnectionReleased(connection.id) + } + self.run(action: action) + } +} + +extension HTTPConnectionPool: HTTP2ConnectionDelegate { + func http2Connection(_ connection: HTTP2Connection, newMaxStreamSetting: Int) { + // ignore for now + } + + func http2ConnectionGoAwayReceived(_: HTTP2Connection) { + // ignore for now + } + + func http2ConnectionClosed(_: HTTP2Connection) { + // ignore for now +// let action = self.stateLock.withLock { +// self._state.connectionClosed(connection.id) +// } +// self.run(action: action) + } + + func http2ConnectionStreamClosed(_ connection: HTTP2Connection, availableStreams: Int) { + // ignore for now +// let action = self.stateLock.withLock { +// self._state.http2ConnectionStreamClosed(connection.id, availableStreams: availableStreams) +// } +// self.run(action: action) + } +} + +extension HTTPConnectionPool: HTTPRequestScheduler { + func cancelRequest(_ request: HTTPSchedulableRequest) { + let requestID = Request(request).id + let action = self.stateLock.withLock { + self._state.cancelRequest(requestID) + } + self.run(action: action) + } } extension HTTPConnectionPool { @@ -156,7 +534,7 @@ extension HTTPConnectionPool { self.req.preferredEventLoop } - var connectionDeadline: NIODeadline? { + var connectionDeadline: NIODeadline { self.req.connectionDeadline } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift index 1b399bb18..79aa1c1b3 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift @@ -15,6 +15,7 @@ import Logging import NIOCore import NIOHTTP1 +import NIOSSL /// # Protocol Overview /// @@ -140,6 +141,16 @@ protocol HTTPRequestScheduler { /// queuer and executor. The client's methods will be called synchronously on an `EventLoop` by the /// executor. For this reason it is very important that the implementation of these functions never blocks. protocol HTTPSchedulableRequest: HTTPExecutableRequest { + /// The tasks connection pool key + /// + /// Based on this key the correct connection pool will be chosen for the request + var poolKey: ConnectionPool.Key { get } + + /// An optional custom `TLSConfiguration`. + /// + /// If you want to override the default `TLSConfiguration` ensure that this property is non nil + var tlsConfiguration: TLSConfiguration? { get } + /// The task's logger var logger: Logger { get } diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 478f62a6a..596bfc57b 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -829,7 +829,7 @@ public class HTTPClient { extension HTTPClient.Configuration { /// Timeout configuration. public struct Timeout { - /// Specifies connect timeout. + /// Specifies connect timeout. If no connect timeout is given, a default 30 seconds timeout will applied. public var connect: TimeAmount? /// Specifies read timeout. public var read: TimeAmount? diff --git a/Sources/AsyncHTTPClient/RequestBag.swift b/Sources/AsyncHTTPClient/RequestBag.swift index 56d5db115..8da4ca480 100644 --- a/Sources/AsyncHTTPClient/RequestBag.swift +++ b/Sources/AsyncHTTPClient/RequestBag.swift @@ -17,6 +17,7 @@ import Logging import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 +import NIOSSL final class RequestBag { let task: HTTPClient.Task @@ -313,6 +314,14 @@ final class RequestBag { } extension RequestBag: HTTPSchedulableRequest { + var poolKey: ConnectionPool.Key { + ConnectionPool.Key(self.request) + } + + var tlsConfiguration: TLSConfiguration? { + self.request.tlsConfiguration + } + func requestWasQueued(_ scheduler: HTTPRequestScheduler) { if self.task.eventLoop.inEventLoop { self.requestWasQueued0(scheduler) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 0789575e3..db5d871fc 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -306,11 +306,15 @@ internal final class HTTPBin where let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) - private let activeConnCounterHandler: CountActiveConnectionsHandler + private let activeConnCounterHandler: ConnectionsCountHandler var activeConnections: Int { return self.activeConnCounterHandler.currentlyActiveConnections } + var createdConnections: Int { + return self.activeConnCounterHandler.createdConnections + } + var port: Int { return Int(self.serverChannel.localAddress!.port!) } @@ -343,7 +347,7 @@ internal final class HTTPBin where socketAddress = try! SocketAddress(unixDomainSocketPath: path) } - self.activeConnCounterHandler = CountActiveConnectionsHandler() + self.activeConnCounterHandler = ConnectionsCountHandler() let connectionIDAtomic = NIOAtomic.makeAtomic(value: 0) @@ -908,19 +912,25 @@ internal final class HTTPBinHandler: ChannelInboundHandler { } } -final class CountActiveConnectionsHandler: ChannelInboundHandler { +final class ConnectionsCountHandler: ChannelInboundHandler { typealias InboundIn = Channel private let activeConns = NIOAtomic.makeAtomic(value: 0) + private let createdConns = NIOAtomic.makeAtomic(value: 0) + + var createdConnections: Int { + self.createdConns.load() + } - public var currentlyActiveConnections: Int { - return self.activeConns.load() + var currentlyActiveConnections: Int { + self.activeConns.load() } func channelRead(context: ChannelHandlerContext, data: NIOAny) { let channel = self.unwrapInboundIn(data) _ = self.activeConns.add(1) + _ = self.createdConns.add(1) channel.closeFuture.whenComplete { _ in _ = self.activeConns.sub(1) } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+ManagerTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+ManagerTests+XCTest.swift new file mode 100644 index 000000000..93945f63c --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+ManagerTests+XCTest.swift @@ -0,0 +1,33 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// HTTPConnectionPool+ManagerTests+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension HTTPConnectionPool_ManagerTests { + static var allTests: [(String, (HTTPConnectionPool_ManagerTests) -> () throws -> Void)] { + return [ + ("testManagerHappyPath", testManagerHappyPath), + ("testShutdownManagerThatHasSeenNoConnections", testShutdownManagerThatHasSeenNoConnections), + ("testExecutingARequestOnAShutdownPoolManager", testExecutingARequestOnAShutdownPoolManager), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+ManagerTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+ManagerTests.swift new file mode 100644 index 000000000..c4669455e --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+ManagerTests.swift @@ -0,0 +1,125 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import NIO +import NIOHTTP1 +import XCTest + +class HTTPConnectionPool_ManagerTests: XCTestCase { + func testManagerHappyPath() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let httpBin1 = HTTPBin() + defer { XCTAssertNoThrow(try httpBin1.shutdown()) } + + let httpBin2 = HTTPBin() + defer { XCTAssertNoThrow(try httpBin2.shutdown()) } + + let server = [httpBin1, httpBin2] + + let poolManager = HTTPConnectionPool.Manager( + eventLoopGroup: eventLoopGroup, + configuration: .init(), + backgroundActivityLogger: .init(label: "test") + ) + + defer { + let promise = eventLoopGroup.next().makePromise(of: Bool.self) + poolManager.shutdown(promise: promise) + XCTAssertNoThrow(try promise.futureResult.wait()) + } + + for i in 0..<9 { + let httpBin = server[i % 2] + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(5), + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + poolManager.executeRequest(requestBag) + + XCTAssertNoThrow(try requestBag.task.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 1) + } + } + + func testShutdownManagerThatHasSeenNoConnections() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let poolManager = HTTPConnectionPool.Manager( + eventLoopGroup: eventLoopGroup, + configuration: .init(), + backgroundActivityLogger: .init(label: "test") + ) + + let eventLoop = eventLoopGroup.next() + let promise = eventLoop.makePromise(of: Bool.self) + poolManager.shutdown(promise: promise) + XCTAssertFalse(try promise.futureResult.wait()) + } + + func testExecutingARequestOnAShutdownPoolManager() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let httpBin = HTTPBin() + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + + let poolManager = HTTPConnectionPool.Manager( + eventLoopGroup: eventLoopGroup, + configuration: .init(), + backgroundActivityLogger: .init(label: "test") + ) + + let eventLoop = eventLoopGroup.next() + let promise = eventLoop.makePromise(of: Bool.self) + poolManager.shutdown(promise: promise) + XCTAssertFalse(try promise.futureResult.wait()) + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(5), + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + poolManager.executeRequest(requestBag) + + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .alreadyShutdown) + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift index f4ebd4ebb..a545e06ea 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift @@ -17,6 +17,7 @@ import Logging import NIOCore import NIOEmbedded import NIOHTTP1 +import NIOSSL import XCTest class HTTPConnectionPool_RequestQueueTests: XCTestCase { @@ -88,6 +89,8 @@ private class MockScheduledRequest: HTTPSchedulableRequest { self.requiredEventLoop = requiredEventLoop } + var poolKey: ConnectionPool.Key { preconditionFailure("Unimplemented") } + var tlsConfiguration: TLSConfiguration? { nil } var logger: Logger { preconditionFailure("Unimplemented") } var connectionDeadline: NIODeadline { preconditionFailure("Unimplemented") } var preferredEventLoop: EventLoop { preconditionFailure("Unimplemented") } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift new file mode 100644 index 000000000..da3bb5db7 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift @@ -0,0 +1,36 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// HTTPConnectionPoolTests+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension HTTPConnectionPoolTests { + static var allTests: [(String, (HTTPConnectionPoolTests) -> () throws -> Void)] { + return [ + ("testOnlyOneConnectionIsUsedForSubSequentRequests", testOnlyOneConnectionIsUsedForSubSequentRequests), + ("testConnectionsForEventLoopRequirementsAreClosed", testConnectionsForEventLoopRequirementsAreClosed), + ("testConnectionPoolGrowsToMaxConcurrentConnections", testConnectionPoolGrowsToMaxConcurrentConnections), + ("testConnectionCreationIsRetriedUntilRequestIsFailed", testConnectionCreationIsRetriedUntilRequestIsFailed), + ("testConnectionCreationIsRetriedUntilPoolIsShutdown", testConnectionCreationIsRetriedUntilPoolIsShutdown), + ("testConnectionCreationIsRetriedUntilRequestIsCancelled", testConnectionCreationIsRetriedUntilRequestIsCancelled), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests.swift new file mode 100644 index 000000000..dd13284b1 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests.swift @@ -0,0 +1,358 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import Logging +import NIOCore +import NIOPosix +import XCTest + +class HTTPConnectionPoolTests: XCTestCase { + func testOnlyOneConnectionIsUsedForSubSequentRequests() { + let httpBin = HTTPBin() + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init(), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .seconds(1)) {}.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 0) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + for _ in 0..<10 { + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoop, logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .distantFuture, + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + + XCTAssertNoThrow(try requestBag.task.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 1) + XCTAssertEqual(httpBin.createdConnections, 1) + } + } + + func testConnectionsForEventLoopRequirementsAreClosed() { + let httpBin = HTTPBin() + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init(), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .milliseconds(100)) {}.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 0) + XCTAssertEqual(httpBin.createdConnections, 10) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + for i in 0..<10 { + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .init(.testOnly_exact(channelOn: eventLoopGroup.next(), delegateOn: eventLoopGroup.next())), + task: .init(eventLoop: eventLoop, logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .distantFuture, + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + XCTAssertNoThrow(try requestBag.task.futureResult.wait()) + XCTAssertEqual(httpBin.createdConnections, i + 1) + } + } + + func testConnectionPoolGrowsToMaxConcurrentConnections() { + let httpBin = HTTPBin() + let maxConnections = 8 + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init(connectionPool: .init(idleTimeout: .milliseconds(500))), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + + XCTAssertEqual(httpBin.activeConnections, 0) + XCTAssertEqual(httpBin.createdConnections, maxConnections) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + var tasks = [EventLoopFuture]() + + for _ in 0..<1000 { + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .distantFuture, + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + tasks.append(requestBag.task.futureResult) + } + + XCTAssertNoThrow(try EventLoopFuture.whenAllSucceed(tasks, on: eventLoopGroup.next()).wait()) + XCTAssertEqual(httpBin.activeConnections, maxConnections) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .milliseconds(600)) {}.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 0) + } + + func testConnectionCreationIsRetriedUntilRequestIsFailed() { + let httpBin = HTTPBin(proxy: .simulate(authorization: "abc123")) + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:9000") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init( + proxy: .init(host: "localhost", port: httpBin.port, type: .http(.basic(credentials: "invalid"))) + ), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(5), + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .getConnectionFromPoolTimeout) + } + XCTAssertGreaterThanOrEqual(httpBin.createdConnections, 8) + XCTAssertEqual(httpBin.activeConnections, 0) + } + + func testConnectionCreationIsRetriedUntilPoolIsShutdown() { + let httpBin = HTTPBin(proxy: .simulate(authorization: "abc123")) + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:9000") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init( + proxy: .init(host: "localhost", port: httpBin.port, type: .http(.basic(credentials: "invalid"))) + ), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + + XCTAssertEqual(httpBin.createdConnections, 0) + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(5), + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .seconds(2)) {}.futureResult.wait()) + pool.shutdown() + + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .cancelled) + } + XCTAssertGreaterThanOrEqual(httpBin.createdConnections, 3) + XCTAssertNoThrow(try poolDelegate.future.wait()) + XCTAssertEqual(httpBin.activeConnections, 0) + } + + func testConnectionCreationIsRetriedUntilRequestIsCancelled() { + let httpBin = HTTPBin(proxy: .simulate(authorization: "abc123")) + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:9000") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init( + proxy: .init(host: "localhost", port: httpBin.port, type: .http(.basic(credentials: "invalid"))) + ), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(5), + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .milliseconds(100)) {}.futureResult.wait()) + requestBag.cancel() + + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .cancelled) + } + XCTAssertGreaterThanOrEqual(httpBin.createdConnections, 1) + } +} + +class TestDelegate: HTTPConnectionPoolDelegate { + private let promise: EventLoopPromise + var future: EventLoopFuture { + self.promise.futureResult + } + + init(eventLoop: EventLoop) { + self.promise = eventLoop.makePromise(of: Bool.self) + } + + func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) { + self.promise.succeed(unclean) + } +} diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index eb49a6fa6..4f3eb9389 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -16,6 +16,7 @@ import Logging import NIO import NIOHTTP1 +import NIOSSL /// A mock connection pool (not creating any actual connections) that is used to validate /// connection actions returned by the `HTTPConnectionPool.StateMachine`. @@ -576,6 +577,12 @@ class MockHTTPRequest: HTTPSchedulableRequest { // MARK: HTTPSchedulableRequest + var poolKey: ConnectionPool.Key { + preconditionFailure("Unimplemented") + } + + var tlsConfiguration: TLSConfiguration? { nil } + func requestWasQueued(_: HTTPRequestScheduler) { preconditionFailure("Unimplemented") } diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index a30d560a9..e42154c06 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -40,9 +40,11 @@ import XCTest testCase(HTTPClientNIOTSTests.allTests), testCase(HTTPClientSOCKSTests.allTests), testCase(HTTPClientTests.allTests), + testCase(HTTPConnectionPoolTests.allTests), testCase(HTTPConnectionPool_FactoryTests.allTests), testCase(HTTPConnectionPool_HTTP1ConnectionsTests.allTests), testCase(HTTPConnectionPool_HTTP1StateMachineTests.allTests), + testCase(HTTPConnectionPool_ManagerTests.allTests), testCase(HTTPConnectionPool_RequestQueueTests.allTests), testCase(HTTPRequestStateMachineTests.allTests), testCase(LRUCacheTests.allTests),