diff --git a/.swiftformat b/.swiftformat index 3e81e26de..3ac6aa632 100644 --- a/.swiftformat +++ b/.swiftformat @@ -9,5 +9,6 @@ --patternlet inline --stripunusedargs unnamed-only --ranges nospace +--disable typeSugar # https://github.com/nicklockwood/SwiftFormat/issues/636 # rules diff --git a/Package.swift b/Package.swift index 7a79e52fa..bce6b69c5 100644 --- a/Package.swift +++ b/Package.swift @@ -25,15 +25,18 @@ let package = Package( .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.7.0"), .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.5.1"), + .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"), ], targets: [ .target( name: "AsyncHTTPClient", - dependencies: ["NIO", "NIOHTTP1", "NIOSSL", "NIOConcurrencyHelpers", "NIOHTTPCompression", "NIOFoundationCompat", "NIOTransportServices"] + dependencies: ["NIO", "NIOHTTP1", "NIOSSL", "NIOConcurrencyHelpers", "NIOHTTPCompression", + "NIOFoundationCompat", "NIOTransportServices", "Logging"] ), .testTarget( name: "AsyncHTTPClientTests", - dependencies: ["NIO", "NIOConcurrencyHelpers", "NIOSSL", "AsyncHTTPClient", "NIOFoundationCompat", "NIOTestUtils"] + dependencies: ["NIO", "NIOConcurrencyHelpers", "NIOSSL", "AsyncHTTPClient", "NIOFoundationCompat", + "NIOTestUtils", "Logging"] ), ] ) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 72a911c97..46e0b6109 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Foundation +import Logging import NIO import NIOConcurrencyHelpers import NIOHTTP1 @@ -38,8 +39,11 @@ final class ConnectionPool { /// - Warning: This lock should always be acquired *before* `HTTP1ConnectionProvider`s `lock` if used in combination with it. private let lock = Lock() - init(configuration: HTTPClient.Configuration) { + private let backgroundActivityLogger: Logger + + init(configuration: HTTPClient.Configuration, backgroundActivityLogger: Logger) { self.configuration = configuration + self.backgroundActivityLogger = backgroundActivityLogger } /// Gets the `EventLoop` associated with the given `Key` if it exists @@ -64,7 +68,12 @@ final class ConnectionPool { /// When the pool is asked for a new connection, it creates a `Key` from the url associated to the `request`. This key /// is used to determine if there already exists an associated `HTTP1ConnectionProvider` in `providers`. /// If there is, the connection provider then takes care of leasing a new connection. If a connection provider doesn't exist, it is created. - func getConnection(for request: HTTPClient.Request, preference: HTTPClient.EventLoopPreference, on eventLoop: EventLoop, deadline: NIODeadline?, setupComplete: EventLoopFuture<Void>) -> EventLoopFuture<Connection> { + func getConnection(_ request: HTTPClient.Request, + preference: HTTPClient.EventLoopPreference, + taskEventLoop: EventLoop, + deadline: NIODeadline?, + setupComplete: EventLoopFuture<Void>, + logger: Logger) -> EventLoopFuture<Connection> { let key = Key(request) let provider: HTTP1ConnectionProvider = self.lock.withLock { @@ -72,13 +81,17 @@ final class ConnectionPool { return existing } else { // Connection provider will be created with `pending = 1` - let provider = HTTP1ConnectionProvider(key: key, eventLoop: eventLoop, configuration: self.configuration, pool: self) + let provider = HTTP1ConnectionProvider(key: key, + eventLoop: taskEventLoop, + configuration: self.configuration, + pool: self, + backgroundActivityLogger: self.backgroundActivityLogger) self.providers[key] = provider return provider } } - return provider.getConnection(preference: preference, setupComplete: setupComplete) + return provider.getConnection(preference: preference, setupComplete: setupComplete, logger: logger) } func delete(_ provider: HTTP1ConnectionProvider) { @@ -182,21 +195,21 @@ class Connection { /// Release this `Connection` to its associated `HTTP1ConnectionProvider`. /// /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline. - func release(closing: Bool) { + func release(closing: Bool, logger: Logger) { assert(self.channel.eventLoop.inEventLoop) - self.provider.release(connection: self, closing: closing) + self.provider.release(connection: self, closing: closing, logger: logger) } /// Called when channel exceeds idle time in pool. - func timeout() { + func timeout(logger: Logger) { assert(self.channel.eventLoop.inEventLoop) - self.provider.timeout(connection: self) + self.provider.timeout(connection: self, logger: logger) } /// Called when channel goes inactive while in the pool. - func remoteClosed() { + func remoteClosed(logger: Logger) { assert(self.channel.eventLoop.inEventLoop) - self.provider.remoteClosed(connection: self) + self.provider.remoteClosed(connection: self, logger: logger) } func cancel() -> EventLoopFuture<Void> { @@ -209,9 +222,10 @@ class Connection { } /// Sets idle timeout handler and channel inactivity listener. - func setIdleTimeout(timeout: TimeAmount?) { + func setIdleTimeout(timeout: TimeAmount?, logger: Logger) { _ = self.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: timeout), position: .first).flatMap { _ in - self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self)) + self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self, + logger: logger)) } } @@ -275,6 +289,8 @@ class HTTP1ConnectionProvider { var state: ConnectionsState + private let backgroundActivityLogger: Logger + /// Creates a new `HTTP1ConnectionProvider` /// /// - parameters: @@ -282,63 +298,96 @@ class HTTP1ConnectionProvider { /// - configuration: The client configuration used globally by all requests /// - initialConnection: The initial connection the pool initializes this provider with /// - pool: The pool this provider belongs to - init(key: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, pool: ConnectionPool) { + /// - backgroundActivityLogger: The logger used to log activity in the background, ie. not associated with a + /// request. + init(key: ConnectionPool.Key, + eventLoop: EventLoop, + configuration: HTTPClient.Configuration, + pool: ConnectionPool, + backgroundActivityLogger: Logger) { self.eventLoop = eventLoop self.configuration = configuration self.key = key self.pool = pool self.closePromise = eventLoop.makePromise() self.state = .init(eventLoop: eventLoop) + self.backgroundActivityLogger = backgroundActivityLogger } deinit { self.state.assertInvariants() } - private func execute(_ action: Action) { + private func execute(_ action: Action, logger: Logger) { switch action { case .lease(let connection, let waiter): // if connection is became inactive, we create a new one. connection.cancelIdleTimeout().whenComplete { _ in if connection.isActiveEstimation { + logger.trace("leasing existing connection", + metadata: ["ahc-connection": "\(connection)"]) waiter.promise.succeed(connection) } else { + logger.trace("opening fresh connection (found matching but inactive connection)", + metadata: ["ahc-dead-connection": "\(connection)"]) self.makeChannel(preference: waiter.preference).whenComplete { result in - self.connect(result, waiter: waiter, replacing: connection) + self.connect(result, waiter: waiter, replacing: connection, logger: logger) } } } case .create(let waiter): + logger.trace("opening fresh connection (no connections to reuse available)") self.makeChannel(preference: waiter.preference).whenComplete { result in - self.connect(result, waiter: waiter) + self.connect(result, waiter: waiter, logger: logger) } case .replace(let connection, let waiter): connection.cancelIdleTimeout().flatMap { connection.close() }.whenComplete { _ in + logger.trace("opening fresh connection (replacing exising connection)", + metadata: ["ahc-old-connection": "\(connection)", + "ahc-waiter": "\(waiter)"]) self.makeChannel(preference: waiter.preference).whenComplete { result in - self.connect(result, waiter: waiter, replacing: connection) + self.connect(result, waiter: waiter, replacing: connection, logger: logger) } } case .park(let connection): - connection.setIdleTimeout(timeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) + logger.trace("parking connection", + metadata: ["ahc-connection": "\(connection)"]) + connection.setIdleTimeout(timeout: self.configuration.maximumAllowedIdleTimeInConnectionPool, + logger: self.backgroundActivityLogger) case .closeProvider: + logger.debug("closing provider", + metadata: ["ahc-provider": "\(self)"]) self.closeAndDelete() case .none: break case .parkAnd(let connection, let action): - connection.setIdleTimeout(timeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) - self.execute(action) + logger.trace("parking connection & doing further action", + metadata: ["ahc-connection": "\(connection)", + "ahc-action": "\(action)"]) + connection.setIdleTimeout(timeout: self.configuration.maximumAllowedIdleTimeInConnectionPool, + logger: self.backgroundActivityLogger) + self.execute(action, logger: logger) case .closeAnd(let connection, let action): + logger.trace("closing connection & doing further action", + metadata: ["ahc-connection": "\(connection)", + "ahc-action": "\(action)"]) connection.channel.close(promise: nil) - self.execute(action) + self.execute(action, logger: logger) case .cancel(let connection, let close): + logger.trace("cancelling connection", + metadata: ["ahc-connection": "\(connection)", + "ahc-close": "\(close)"]) connection.cancel().whenComplete { _ in if close { self.closeAndDelete() } } case .fail(let waiter, let error): + logger.debug("failing connection for waiter", + metadata: ["ahc-waiter": "\(waiter)", + "ahc-error": "\(error)"]) waiter.promise.fail(error) } } @@ -350,22 +399,29 @@ class HTTP1ConnectionProvider { } } - func getConnection(preference: HTTPClient.EventLoopPreference, setupComplete: EventLoopFuture<Void>) -> EventLoopFuture<Connection> { + func getConnection(preference: HTTPClient.EventLoopPreference, + setupComplete: EventLoopFuture<Void>, + logger: Logger) -> EventLoopFuture<Connection> { let waiter = Waiter(promise: self.eventLoop.makePromise(), setupComplete: setupComplete, preference: preference) let action: Action = self.lock.withLock { self.state.acquire(waiter: waiter) } - self.execute(action) + self.execute(action, logger: logger) return waiter.promise.futureResult } - func connect(_ result: Result<Channel, Error>, waiter: Waiter, replacing closedConnection: Connection? = nil) { + func connect(_ result: Result<Channel, Error>, + waiter: Waiter, + replacing closedConnection: Connection? = nil, + logger: Logger) { let action: Action switch result { case .success(let channel): + logger.trace("successfully created connection", + metadata: ["ahc-connection": "\(channel)"]) let connection = Connection(channel: channel, provider: self) action = self.lock.withLock { if let closedConnection = closedConnection { @@ -375,17 +431,21 @@ class HTTP1ConnectionProvider { } waiter.promise.succeed(connection) case .failure(let error): + logger.debug("connection attempt failed", + metadata: ["ahc-error": "\(error)"]) action = self.lock.withLock { self.state.connectFailed() } waiter.promise.fail(error) } waiter.setupComplete.whenComplete { _ in - self.execute(action) + self.execute(action, logger: logger) } } - func release(connection: Connection, closing: Bool) { + func release(connection: Connection, closing: Bool, logger: Logger) { + logger.debug("releasing connection, request complete", + metadata: ["ahc-closing": "\(closing)"]) let action: Action = self.lock.withLock { self.state.release(connection: connection, closing: closing) } @@ -396,31 +456,31 @@ class HTTP1ConnectionProvider { case .park, .closeProvider: // Since both `.park` and `.deleteProvider` are terminal in terms of execution, // we can execute them immediately - self.execute(action) + self.execute(action, logger: logger) case .cancel, .closeAnd, .create, .fail, .lease, .parkAnd, .replace: // This is needed to start a new stack, otherwise, since this is called on a previous // future completion handler chain, it will be growing indefinitely until the connection is closed. // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. connection.channel.eventLoop.execute { - self.execute(action) + self.execute(action, logger: logger) } } } - func remoteClosed(connection: Connection) { + func remoteClosed(connection: Connection, logger: Logger) { let action: Action = self.lock.withLock { self.state.remoteClosed(connection: connection) } - self.execute(action) + self.execute(action, logger: logger) } - func timeout(connection: Connection) { + func timeout(connection: Connection, logger: Logger) { let action: Action = self.lock.withLock { self.state.timeout(connection: connection) } - self.execute(action) + self.execute(action, logger: logger) } private func closeAndDelete() { @@ -525,17 +585,19 @@ class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler let connection: Connection var eventSent: Bool + let logger: Logger - init(connection: Connection) { + init(connection: Connection, logger: Logger) { self.connection = connection self.eventSent = false + self.logger = logger } // this is needed to detect when remote end closes connection while connection is in the pool idling func channelInactive(context: ChannelHandlerContext) { if !self.eventSent { self.eventSent = true - self.connection.remoteClosed() + self.connection.remoteClosed(logger: self.logger) } } @@ -543,7 +605,7 @@ class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { if !self.eventSent { self.eventSent = true - self.connection.timeout() + self.connection.timeout(logger: self.logger) } } else { context.fireUserInboundEventTriggered(event) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index a119ce90d..ee0f2613e 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Foundation +import Logging import NIO import NIOConcurrencyHelpers import NIOHTTP1 @@ -21,6 +22,21 @@ import NIOSSL import NIOTLS import NIOTransportServices +extension Logger { + private func requestInfo(_ request: HTTPClient.Request) -> Logger.Metadata.Value { + return "\(request.method) \(request.url)" + } + + func attachingRequestInformation(_ request: HTTPClient.Request, requestID: Int) -> Logger { + var modified = self + modified[metadataKey: "ahc-prev-request"] = nil + modified[metadataKey: "ahc-request-id"] = "\(requestID)" + return modified + } +} + +let globalRequestID = NIOAtomic<Int>.makeAtomic(value: 0) + /// HTTPClient class provides API for request execution. /// /// Example: @@ -54,12 +70,28 @@ public class HTTPClient { var state: State private let stateLock = Lock() + internal static let loggingDisabled = Logger(label: "AHC-do-not-log", factory: { _ in NoOpLogHandler() }) + + /// Create an `HTTPClient` with specified `EventLoopGroup` provider and configuration. + /// + /// - parameters: + /// - eventLoopGroupProvider: Specify how `EventLoopGroup` will be created. + /// - configuration: Client configuration. + public convenience init(eventLoopGroupProvider: EventLoopGroupProvider, + configuration: Configuration = Configuration()) { + self.init(eventLoopGroupProvider: eventLoopGroupProvider, + configuration: configuration, + backgroundActivityLogger: HTTPClient.loggingDisabled) + } + /// Create an `HTTPClient` with specified `EventLoopGroup` provider and configuration. /// /// - parameters: /// - eventLoopGroupProvider: Specify how `EventLoopGroup` will be created. /// - configuration: Client configuration. - public init(eventLoopGroupProvider: EventLoopGroupProvider, configuration: Configuration = Configuration()) { + public required init(eventLoopGroupProvider: EventLoopGroupProvider, + configuration: Configuration = Configuration(), + backgroundActivityLogger: Logger) { self.eventLoopGroupProvider = eventLoopGroupProvider switch self.eventLoopGroupProvider { case .shared(let group): @@ -76,7 +108,8 @@ public class HTTPClient { #endif } self.configuration = configuration - self.pool = ConnectionPool(configuration: configuration) + self.pool = ConnectionPool(configuration: configuration, + backgroundActivityLogger: backgroundActivityLogger) self.state = .upAndRunning } @@ -194,9 +227,19 @@ public class HTTPClient { /// - url: Remote URL. /// - deadline: Point in time by which the request must complete. public func get(url: String, deadline: NIODeadline? = nil) -> EventLoopFuture<Response> { + return self.get(url: url, deadline: deadline, logger: HTTPClient.loggingDisabled) + } + + /// Execute `GET` request using specified URL. + /// + /// - parameters: + /// - url: Remote URL. + /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. + public func get(url: String, deadline: NIODeadline? = nil, logger: Logger) -> EventLoopFuture<Response> { do { let request = try Request(url: url, method: .GET) - return self.execute(request: request, deadline: deadline) + return self.execute(request: request, deadline: deadline, logger: logger) } catch { return self.eventLoopGroup.next().makeFailedFuture(error) } @@ -209,9 +252,20 @@ public class HTTPClient { /// - body: Request body. /// - deadline: Point in time by which the request must complete. public func post(url: String, body: Body? = nil, deadline: NIODeadline? = nil) -> EventLoopFuture<Response> { + return self.post(url: url, body: body, deadline: deadline, logger: HTTPClient.loggingDisabled) + } + + /// Execute `POST` request using specified URL. + /// + /// - parameters: + /// - url: Remote URL. + /// - body: Request body. + /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. + public func post(url: String, body: Body? = nil, deadline: NIODeadline? = nil, logger: Logger) -> EventLoopFuture<Response> { do { let request = try HTTPClient.Request(url: url, method: .POST, body: body) - return self.execute(request: request, deadline: deadline) + return self.execute(request: request, deadline: deadline, logger: logger) } catch { return self.eventLoopGroup.next().makeFailedFuture(error) } @@ -223,10 +277,22 @@ public class HTTPClient { /// - url: Remote URL. /// - body: Request body. /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. public func patch(url: String, body: Body? = nil, deadline: NIODeadline? = nil) -> EventLoopFuture<Response> { + return self.post(url: url, body: body, deadline: deadline, logger: HTTPClient.loggingDisabled) + } + + /// Execute `PATCH` request using specified URL. + /// + /// - parameters: + /// - url: Remote URL. + /// - body: Request body. + /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. + public func patch(url: String, body: Body? = nil, deadline: NIODeadline? = nil, logger: Logger) -> EventLoopFuture<Response> { do { let request = try HTTPClient.Request(url: url, method: .PATCH, body: body) - return self.execute(request: request, deadline: deadline) + return self.execute(request: request, deadline: deadline, logger: logger) } catch { return self.eventLoopGroup.next().makeFailedFuture(error) } @@ -239,9 +305,20 @@ public class HTTPClient { /// - body: Request body. /// - deadline: Point in time by which the request must complete. public func put(url: String, body: Body? = nil, deadline: NIODeadline? = nil) -> EventLoopFuture<Response> { + return self.put(url: url, body: body, deadline: deadline, logger: HTTPClient.loggingDisabled) + } + + /// Execute `PUT` request using specified URL. + /// + /// - parameters: + /// - url: Remote URL. + /// - body: Request body. + /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. + public func put(url: String, body: Body? = nil, deadline: NIODeadline? = nil, logger: Logger) -> EventLoopFuture<Response> { do { let request = try HTTPClient.Request(url: url, method: .PUT, body: body) - return self.execute(request: request, deadline: deadline) + return self.execute(request: request, deadline: deadline, logger: logger) } catch { return self.eventLoopGroup.next().makeFailedFuture(error) } @@ -253,9 +330,18 @@ public class HTTPClient { /// - url: Remote URL. /// - deadline: The time when the request must have been completed by. public func delete(url: String, deadline: NIODeadline? = nil) -> EventLoopFuture<Response> { + return self.delete(url: url, deadline: deadline, logger: HTTPClient.loggingDisabled) + } + + /// Execute `DELETE` request using specified URL. + /// + /// - parameters: + /// - url: Remote URL. + /// - deadline: The time when the request must have been completed by. + public func delete(url: String, deadline: NIODeadline? = nil, logger: Logger) -> EventLoopFuture<Response> { do { let request = try Request(url: url, method: .DELETE) - return self.execute(request: request, deadline: deadline) + return self.execute(request: request, deadline: deadline, logger: logger) } catch { return self.eventLoopGroup.next().makeFailedFuture(error) } @@ -267,8 +353,18 @@ public class HTTPClient { /// - request: HTTP request to execute. /// - deadline: Point in time by which the request must complete. public func execute(request: Request, deadline: NIODeadline? = nil) -> EventLoopFuture<Response> { + return self.execute(request: request, deadline: deadline, logger: HTTPClient.loggingDisabled) + } + + /// Execute arbitrary HTTP request using specified URL. + /// + /// - parameters: + /// - request: HTTP request to execute. + /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. + public func execute(request: Request, deadline: NIODeadline? = nil, logger: Logger) -> EventLoopFuture<Response> { let accumulator = ResponseAccumulator(request: request) - return self.execute(request: request, delegate: accumulator, deadline: deadline).futureResult + return self.execute(request: request, delegate: accumulator, deadline: deadline, logger: logger).futureResult } /// Execute arbitrary HTTP request using specified URL. @@ -278,8 +374,25 @@ public class HTTPClient { /// - eventLoop: NIO Event Loop preference. /// - deadline: Point in time by which the request must complete. public func execute(request: Request, eventLoop: EventLoopPreference, deadline: NIODeadline? = nil) -> EventLoopFuture<Response> { + return self.execute(request: request, + eventLoop: eventLoop, + deadline: deadline, + logger: HTTPClient.loggingDisabled) + } + + /// Execute arbitrary HTTP request and handle response processing using provided delegate. + /// + /// - parameters: + /// - request: HTTP request to execute. + /// - eventLoop: NIO Event Loop preference. + /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. + public func execute(request: Request, + eventLoop eventLoopPreference: EventLoopPreference, + deadline: NIODeadline? = nil, + logger: Logger?) -> EventLoopFuture<Response> { let accumulator = ResponseAccumulator(request: request) - return self.execute(request: request, delegate: accumulator, eventLoop: eventLoop, deadline: deadline).futureResult + return self.execute(request: request, delegate: accumulator, eventLoop: eventLoopPreference, deadline: deadline, logger: logger).futureResult } /// Execute arbitrary HTTP request and handle response processing using provided delegate. @@ -291,7 +404,21 @@ public class HTTPClient { public func execute<Delegate: HTTPClientResponseDelegate>(request: Request, delegate: Delegate, deadline: NIODeadline? = nil) -> Task<Delegate.Response> { - return self.execute(request: request, delegate: delegate, eventLoop: .indifferent, deadline: deadline) + return self.execute(request: request, delegate: delegate, deadline: deadline, logger: HTTPClient.loggingDisabled) + } + + /// Execute arbitrary HTTP request and handle response processing using provided delegate. + /// + /// - parameters: + /// - request: HTTP request to execute. + /// - delegate: Delegate to process response parts. + /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. + public func execute<Delegate: HTTPClientResponseDelegate>(request: Request, + delegate: Delegate, + deadline: NIODeadline? = nil, + logger: Logger) -> Task<Delegate.Response> { + return self.execute(request: request, delegate: delegate, eventLoop: .indifferent, deadline: deadline, logger: logger) } /// Execute arbitrary HTTP request and handle response processing using provided delegate. @@ -301,10 +428,31 @@ public class HTTPClient { /// - delegate: Delegate to process response parts. /// - eventLoop: NIO Event Loop preference. /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. public func execute<Delegate: HTTPClientResponseDelegate>(request: Request, delegate: Delegate, eventLoop eventLoopPreference: EventLoopPreference, deadline: NIODeadline? = nil) -> Task<Delegate.Response> { + return self.execute(request: request, + delegate: delegate, + eventLoop: eventLoopPreference, + deadline: deadline, + logger: HTTPClient.loggingDisabled) + } + + /// Execute arbitrary HTTP request and handle response processing using provided delegate. + /// + /// - parameters: + /// - request: HTTP request to execute. + /// - delegate: Delegate to process response parts. + /// - eventLoop: NIO Event Loop preference. + /// - deadline: Point in time by which the request must complete. + public func execute<Delegate: HTTPClientResponseDelegate>(request: Request, + delegate: Delegate, + eventLoop eventLoopPreference: EventLoopPreference, + deadline: NIODeadline? = nil, + logger originalLogger: Logger?) -> Task<Delegate.Response> { + let logger = (originalLogger ?? HTTPClient.loggingDisabled).attachingRequestInformation(request, requestID: globalRequestID.add(1)) let taskEL: EventLoop switch eventLoopPreference.preference { case .indifferent: @@ -318,13 +466,19 @@ public class HTTPClient { case .testOnly_exact(_, delegateOn: let delegateEL): taskEL = delegateEL } + logger.trace("selected EventLoop for task given the preference", + metadata: ["ahc-eventloop": "\(taskEL)", + "ahc-el-preference": "\(eventLoopPreference)"]) let failedTask: Task<Delegate.Response>? = self.stateLock.withLock { switch state { case .upAndRunning: return nil case .shuttingDown, .shutDown: - return Task<Delegate.Response>.failedTask(eventLoop: taskEL, error: HTTPClientError.alreadyShutdown) + logger.debug("client is shutting down, failing request") + return Task<Delegate.Response>.failedTask(eventLoop: taskEL, + error: HTTPClientError.alreadyShutdown, + logger: logger) } } @@ -349,11 +503,21 @@ public class HTTPClient { redirectHandler = nil } - let task = Task<Delegate.Response>(eventLoop: taskEL) + let task = Task<Delegate.Response>(eventLoop: taskEL, logger: logger) let setupComplete = taskEL.makePromise(of: Void.self) - let connection = self.pool.getConnection(for: request, preference: eventLoopPreference, on: taskEL, deadline: deadline, setupComplete: setupComplete.futureResult) - + let connection = self.pool.getConnection(request, + preference: eventLoopPreference, + taskEventLoop: taskEL, + deadline: deadline, + setupComplete: setupComplete.futureResult, + logger: logger) connection.flatMap { connection -> EventLoopFuture<Void> in + logger.debug("got connection for request", + metadata: ["ahc-connection": "\(connection)", + "ahc-request": "\(request.method) \(request.url)", + "ahc-channel-el": "\(connection.channel.eventLoop)", + "ahc-task-el": "\(taskEL)"]) + let channel = connection.channel let future: EventLoopFuture<Void> if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) { @@ -367,7 +531,8 @@ public class HTTPClient { kind: request.kind, delegate: delegate, redirectHandler: redirectHandler, - ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown) + ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown, + logger: logger) return channel.pipeline.addHandler(taskHandler) }.flatMap { task.setConnection(connection) @@ -386,7 +551,7 @@ public class HTTPClient { return channel.eventLoop.makeSucceededFuture(()) } }.flatMapError { error in - connection.release(closing: true) + connection.release(closing: true, logger: logger) return channel.eventLoop.makeFailedFuture(error) } }.always { _ in @@ -421,7 +586,7 @@ public class HTTPClient { /// `HTTPClient` configuration. public struct Configuration { /// TLS configuration, defaults to `TLSConfiguration.forClient()`. - public var tlsConfiguration: TLSConfiguration? + public var tlsConfiguration: Optional<TLSConfiguration> /// Enables following 3xx redirects automatically, defaults to `false`. /// /// Following redirects are supported: @@ -436,7 +601,7 @@ public class HTTPClient { /// Default client timeout, defaults to no timeouts. public var timeout: Timeout /// Timeout of pooled connections - public var maximumAllowedIdleTimeInConnectionPool: TimeAmount? + public var maximumAllowedIdleTimeInConnectionPool: Optional<TimeAmount> /// Upstream proxy, defaults to no proxy. public var proxy: Proxy? /// Enables automatic body decompression. Supported algorithms are gzip and deflate. @@ -484,13 +649,30 @@ public class HTTPClient { proxy: Proxy? = nil, ignoreUncleanSSLShutdown: Bool = false, decompression: Decompression = .disabled) { - self.tlsConfiguration = TLSConfiguration.forClient(certificateVerification: certificateVerification) - self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration() - self.timeout = timeout - self.maximumAllowedIdleTimeInConnectionPool = maximumAllowedIdleTimeInConnectionPool - self.proxy = proxy - self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown - self.decompression = decompression + self.init(tlsConfiguration: TLSConfiguration.forClient(certificateVerification: certificateVerification), + redirectConfiguration: redirectConfiguration, + timeout: timeout, + maximumAllowedIdleTimeInConnectionPool: maximumAllowedIdleTimeInConnectionPool, + proxy: proxy, + ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown, + decompression: decompression) + } + + public init(certificateVerification: CertificateVerification, + redirectConfiguration: RedirectConfiguration? = nil, + timeout: Timeout = Timeout(), + maximumAllowedIdleTimeInConnectionPool: TimeAmount = .seconds(60), + proxy: Proxy? = nil, + ignoreUncleanSSLShutdown: Bool = false, + decompression: Decompression = .disabled, + backgroundActivityLogger: Logger?) { + self.init(tlsConfiguration: TLSConfiguration.forClient(certificateVerification: certificateVerification), + redirectConfiguration: redirectConfiguration, + timeout: timeout, + maximumAllowedIdleTimeInConnectionPool: maximumAllowedIdleTimeInConnectionPool, + proxy: proxy, + ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown, + decompression: decompression) } public init(certificateVerification: CertificateVerification, diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 77220f068..3dc32bf84 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Foundation +import Logging import NIO import NIOConcurrencyHelpers import NIOFoundationCompat @@ -533,17 +534,19 @@ extension HTTPClient { var connection: Connection? var cancelled: Bool let lock: Lock + let logger: Logger // We are okay to store the logger here because a Task is for only ond request. - init(eventLoop: EventLoop) { + init(eventLoop: EventLoop, logger: Logger) { self.eventLoop = eventLoop self.promise = eventLoop.makePromise() self.completion = self.promise.futureResult.map { _ in } self.cancelled = false self.lock = Lock() + self.logger = logger } - static func failedTask(eventLoop: EventLoop, error: Error) -> Task<Response> { - let task = self.init(eventLoop: eventLoop) + static func failedTask(eventLoop: EventLoop, error: Error, logger: Logger) -> Task<Response> { + let task = self.init(eventLoop: eventLoop, logger: logger) task.promise.fail(error) return task } @@ -585,13 +588,18 @@ extension HTTPClient { } } - func succeed<Delegate: HTTPClientResponseDelegate>(promise: EventLoopPromise<Response>?, with value: Response, delegateType: Delegate.Type, closing: Bool) { - self.releaseAssociatedConnection(delegateType: delegateType, closing: closing).whenSuccess { + func succeed<Delegate: HTTPClientResponseDelegate>(promise: EventLoopPromise<Response>?, + with value: Response, + delegateType: Delegate.Type, + closing: Bool) { + self.releaseAssociatedConnection(delegateType: delegateType, + closing: closing).whenSuccess { promise?.succeed(value) } } - func fail<Delegate: HTTPClientResponseDelegate>(with error: Error, delegateType: Delegate.Type) { + func fail<Delegate: HTTPClientResponseDelegate>(with error: Error, + delegateType: Delegate.Type) { if let connection = self.connection { self.releaseAssociatedConnection(delegateType: delegateType, closing: true) .whenSuccess { @@ -601,13 +609,14 @@ extension HTTPClient { } } - func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type, closing: Bool) -> EventLoopFuture<Void> { + func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type, + closing: Bool) -> EventLoopFuture<Void> { if let connection = self.connection { // remove read timeout handler return connection.removeHandler(IdleStateHandler.self).flatMap { connection.removeHandler(TaskHandler<Delegate>.self) }.map { - connection.release(closing: closing) + connection.release(closing: closing, logger: self.logger) }.flatMapError { error in fatalError("Couldn't remove taskHandler: \(error)") } @@ -639,6 +648,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann let delegate: Delegate let redirectHandler: RedirectHandler<Delegate.Response>? let ignoreUncleanSSLShutdown: Bool + let logger: Logger // We are okay to store the logger here because a TaskHandler is just for one request. var state: State = .idle var pendingRead = false @@ -656,12 +666,14 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann kind: HTTPClient.Request.Kind, delegate: Delegate, redirectHandler: RedirectHandler<Delegate.Response>?, - ignoreUncleanSSLShutdown: Bool) { + ignoreUncleanSSLShutdown: Bool, + logger: Logger) { self.task = task self.delegate = delegate self.redirectHandler = redirectHandler self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown self.kind = kind + self.logger = logger } } @@ -717,7 +729,10 @@ extension TaskHandler { do { let result = try body(self.task) - self.task.succeed(promise: promise, with: result, delegateType: Delegate.self, closing: self.closing) + self.task.succeed(promise: promise, + with: result, + delegateType: Delegate.self, + closing: self.closing) } catch { self.task.fail(with: error, delegateType: Delegate.self) } diff --git a/Sources/AsyncHTTPClient/NoOpLogHandler.swift b/Sources/AsyncHTTPClient/NoOpLogHandler.swift new file mode 100644 index 000000000..26419547b --- /dev/null +++ b/Sources/AsyncHTTPClient/NoOpLogHandler.swift @@ -0,0 +1,40 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2020 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging + +internal struct NoOpLogHandler: LogHandler { + func log(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?, file: String, function: String, line: UInt) {} + + subscript(metadataKey _: String) -> Logger.Metadata.Value? { + get { + return nil + } + set {} + } + + var metadata: Logger.Metadata { + get { + return [:] + } + set {} + } + + var logLevel: Logger.Level { + get { + return .critical + } + set {} + } +} diff --git a/Sources/AsyncHTTPClient/StringConvertibleInstances.swift b/Sources/AsyncHTTPClient/StringConvertibleInstances.swift new file mode 100644 index 000000000..1039dc3f1 --- /dev/null +++ b/Sources/AsyncHTTPClient/StringConvertibleInstances.swift @@ -0,0 +1,31 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2020 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +extension Connection: CustomStringConvertible { + var description: String { + return "\(self.channel)" + } +} + +extension HTTP1ConnectionProvider.Waiter: CustomStringConvertible { + var description: String { + return "HTTP1ConnectionProvider.Waiter(\(self.preference))" + } +} + +extension HTTPClient.EventLoopPreference: CustomStringConvertible { + public var description: String { + return "\(self.preference)" + } +} diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index ef7958b6a..16ab7c8f9 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -24,12 +24,13 @@ import NIOTransportServices import XCTest class ConnectionPoolTests: XCTestCase { + var eventLoop: EmbeddedEventLoop! + var http1ConnectionProvider: HTTP1ConnectionProvider! + struct TempError: Error {} func testPending() { - let eventLoop = EmbeddedEventLoop() - - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) @@ -51,9 +52,7 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Acquire Tests func testAcquireWhenEmpty() { - let eventLoop = EmbeddedEventLoop() - - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) @@ -62,7 +61,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - let action = state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .create(let waiter): waiter.promise.fail(TempError()) @@ -79,31 +78,29 @@ class ConnectionPoolTests: XCTestCase { } func testAcquireWhenAvailable() throws { - let eventLoop = EmbeddedEventLoop() let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(connection) snapshot.openedConnectionsCount = 1 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .lease(let connection, let waiter): waiter.promise.succeed(connection) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -111,34 +108,32 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup, since we don't call release + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } } func testAcquireWhenUnavailable() throws { - let eventLoop = EmbeddedEventLoop() - - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.openedConnectionsCount = 8 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(8, snapshot.openedConnectionsCount) - let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .none: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(1, snapshot.waiters.count) @@ -149,18 +144,16 @@ class ConnectionPoolTests: XCTestCase { } // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) - _ = try provider.close().wait() + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } // MARK: - Acquire on Specific EL Tests func testAcquireWhenEmptySpecificEL() { - let eventLoop = EmbeddedEventLoop() - - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) @@ -169,7 +162,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - let action = state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) switch action { case .create(let waiter): waiter.promise.fail(TempError()) @@ -188,14 +181,13 @@ class ConnectionPoolTests: XCTestCase { func testAcquireWhenAvailableSpecificEL() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(connection) snapshot.openedConnectionsCount = 1 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -203,12 +195,12 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.acquire(waiter: .init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) switch action { case .lease(let connection, let waiter): waiter.promise.succeed(connection) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -216,26 +208,25 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup, since we don't call release + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } } func testAcquireReplace() throws { - let eventLoop = EmbeddedEventLoop() let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(connection) snapshot.openedConnectionsCount = 8 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -243,12 +234,12 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(8, snapshot.openedConnectionsCount) - let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) switch action { case .replace(_, let waiter): waiter.promise.fail(TempError()) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -259,21 +250,18 @@ class ConnectionPoolTests: XCTestCase { } // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) - _ = try provider.close().wait() + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } func testAcquireWhenUnavailableSpecificEL() throws { - let eventLoop = EmbeddedEventLoop() - - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.openedConnectionsCount = 8 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -281,10 +269,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(8, snapshot.openedConnectionsCount) - let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) switch action { case .none: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(1, snapshot.waiters.count) @@ -295,48 +283,45 @@ class ConnectionPoolTests: XCTestCase { } // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) - _ = try provider.close().wait() + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } // MARK: - Acquire Errors Tests func testAcquireWhenClosed() { - let eventLoop = EmbeddedEventLoop() - - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = state.testsOnly_getInternalState() snapshot.state = .closed state.testsOnly_setInternalState(snapshot) XCTAssertFalse(state.enqueue()) - let promise = eventLoop.makePromise(of: Connection.self) - let action = state.acquire(waiter: .init(promise: promise, setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + let promise = self.eventLoop.makePromise(of: Connection.self) + let action = state.acquire(waiter: .init(promise: promise, setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .fail(let waiter, let error): waiter.promise.fail(error) default: XCTFail("Unexpected action: \(action)") } + print(state.testsOnly_getInternalState()) } // MARK: - Release Tests func testReleaseAliveConnectionEmptyQueue() throws { - let channel = ActiveChannel() - - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + let channel = ActiveChannel(eventLoop: self.eventLoop) + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -344,35 +329,37 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .park: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - // cleanp - provider.closePromise.succeed(()) + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + snapshot.openedConnectionsCount = 0 + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } } func testReleaseAliveButClosingConnectionEmptyQueue() throws { - let channel = ActiveChannel() + let channel = ActiveChannel(eventLoop: self.eventLoop) - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -380,18 +367,15 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: true) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) switch action { case .closeProvider: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - - // cleanup - provider.closePromise.succeed(()) default: XCTFail("Unexpected action: \(action)") } @@ -400,15 +384,14 @@ class ConnectionPoolTests: XCTestCase { func testReleaseInactiveConnectionEmptyQueue() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -416,18 +399,15 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: true) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) switch action { case .closeProvider: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - - // cleanup - provider.closePromise.succeed(()) default: XCTFail("Unexpected action: \(action)") } @@ -436,17 +416,16 @@ class ConnectionPoolTests: XCTestCase { func testReleaseInactiveConnectionEmptyQueueHasConnections() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) - let available = Connection(channel: channel, provider: provider) + let available = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -454,10 +433,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(2, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: true) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) switch action { case .none: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -465,25 +444,27 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup - provider.closePromise.succeed(()) + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + snapshot.openedConnectionsCount = 0 + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } } func testReleaseAliveConnectionHasWaiter() throws { - let channel = ActiveChannel() + let channel = ActiveChannel(eventLoop: self.eventLoop) - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -491,11 +472,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): // XCTAssertTrue(connection.isInUse) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -503,9 +484,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -514,16 +497,15 @@ class ConnectionPoolTests: XCTestCase { func testReleaseInactiveConnectionHasWaitersNoConnections() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -531,10 +513,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: true) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) switch action { case .create(let waiter): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -542,10 +524,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.fail(TempError()) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -554,19 +537,18 @@ class ConnectionPoolTests: XCTestCase { func testReleaseInactiveConnectionHasWaitersHasConnections() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - let available = Connection(channel: channel, provider: provider) + let available = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -574,10 +556,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(2, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -587,8 +569,7 @@ class ConnectionPoolTests: XCTestCase { // cleanup waiter.promise.succeed(connection) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -597,18 +578,17 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Release on Specific EL Tests func testReleaseAliveConnectionSameELHasWaiterSpecificEL() throws { - let channel = ActiveChannel() + let channel = ActiveChannel(eventLoop: self.eventLoop) - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -616,10 +596,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -630,27 +610,29 @@ class ConnectionPoolTests: XCTestCase { // cleanup waiter.promise.succeed(connection) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } } func testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL() throws { - let channel = ActiveChannel() - let eventLoop = EmbeddedEventLoop() - - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + let differentEL = EmbeddedEventLoop() + defer { + XCTAssertNoThrow(try differentEL.syncShutdownGracefully()) + } + let channel = ActiveChannel(eventLoop: differentEL) // Channel on different EL, that's important for the test. + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(of: Connection.self), + setupComplete: self.eventLoop.makeSucceededFuture(()), + preference: .delegateAndChannel(on: self.eventLoop))) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -658,10 +640,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .parkAnd(let connection, .create(let waiter)): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -672,30 +654,28 @@ class ConnectionPoolTests: XCTestCase { // cleanup waiter.promise.succeed(connection) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } } func testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL() throws { - let channel = ActiveChannel() + let channel = ActiveChannel(eventLoop: self.eventLoop) let otherChannel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - let available = Connection(channel: otherChannel, provider: provider) + let available = Connection(channel: otherChannel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -703,10 +683,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(2, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .parkAnd(let connection, .lease(let replacement, let waiter)): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(replacement))) XCTAssertEqual(1, snapshot.availableConnections.count) @@ -718,30 +698,28 @@ class ConnectionPoolTests: XCTestCase { // cleanup waiter.promise.succeed(replacement) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } } func testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL() throws { - let channel = ActiveChannel() + let channel = ActiveChannel(eventLoop: self.eventLoop) let otherChannel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 8 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - let available = Connection(channel: channel, provider: provider) + let available = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -749,10 +727,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(8, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .replace(let connection, let waiter): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -763,8 +741,7 @@ class ConnectionPoolTests: XCTestCase { // cleanup waiter.promise.fail(TempError()) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -774,19 +751,18 @@ class ConnectionPoolTests: XCTestCase { let channel = EmbeddedChannel() let otherChannel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - let available = Connection(channel: otherChannel, provider: provider) + let available = Connection(channel: otherChannel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -794,10 +770,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(2, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertTrue(connection === available) XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(0, snapshot.availableConnections.count) @@ -809,8 +785,7 @@ class ConnectionPoolTests: XCTestCase { // cleanup waiter.promise.succeed(connection) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -820,19 +795,18 @@ class ConnectionPoolTests: XCTestCase { let channel = EmbeddedChannel() let otherChannel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.leasedConnections.insert(ConnectionKey(connection)) snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - let available = Connection(channel: channel, provider: provider) + let available = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -840,10 +814,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(2, snapshot.openedConnectionsCount) - let action = provider.state.release(connection: connection, closing: false) + let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .create(let waiter): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -853,8 +827,7 @@ class ConnectionPoolTests: XCTestCase { // cleanup waiter.promise.fail(TempError()) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -863,13 +836,10 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Next Waiter Tests func testNextWaiterEmptyQueue() throws { - let channel = ActiveChannel() - - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -877,36 +847,31 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - let action = provider.state.processNextWaiter() + let action = self.http1ConnectionProvider.state.processNextWaiter() switch action { case .closeProvider: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - - // cleanup - provider.closePromise.succeed(()) default: XCTFail("Unexpected action: \(action)") } } func testNextWaiterEmptyQueueHasConnections() throws { - let channel = ActiveChannel() - - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + let channel = ActiveChannel(eventLoop: self.eventLoop) + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - let available = Connection(channel: channel, provider: provider) + let available = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -914,10 +879,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.processNextWaiter() + let action = self.http1ConnectionProvider.state.processNextWaiter() switch action { case .none: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -925,7 +890,12 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup - provider.closePromise.succeed(()) + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + XCTAssertNoThrow(try available.close().wait()) + snapshot.availableConnections.removeAll() + snapshot.openedConnectionsCount = 0 + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -934,17 +904,16 @@ class ConnectionPoolTests: XCTestCase { func testNextWaiterHasWaitersHasConnections() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - let available = Connection(channel: channel, provider: provider) + let available = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -952,10 +921,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.processNextWaiter() + let action = self.http1ConnectionProvider.state.processNextWaiter() switch action { case .lease(let connection, let waiter): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -964,10 +933,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -976,17 +946,16 @@ class ConnectionPoolTests: XCTestCase { func testNextWaiterHasWaitersHasSameELConnectionsSpecificEL() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) - let available = Connection(channel: channel, provider: provider) + let available = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -994,10 +963,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.processNextWaiter() + let action = self.http1ConnectionProvider.state.processNextWaiter() switch action { case .lease(let connection, let waiter): - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -1006,10 +975,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -1017,19 +987,17 @@ class ConnectionPoolTests: XCTestCase { func testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL() throws { let channel = EmbeddedChannel() - let eventLoop = EmbeddedEventLoop() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() snapshot.pending = 0 snapshot.openedConnectionsCount = 1 - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) - let available = Connection(channel: channel, provider: provider) + let available = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(available) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -1037,10 +1005,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.processNextWaiter() + let action = self.http1ConnectionProvider.state.processNextWaiter() switch action { case .create(let waiter): - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -1048,10 +1016,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(2, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.fail(TempError()) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } @@ -1062,15 +1031,14 @@ class ConnectionPoolTests: XCTestCase { func testTimeoutLeasedConnection() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.pending = 0 snapshot.openedConnectionsCount = 1 snapshot.leasedConnections.insert(ConnectionKey(connection)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -1078,18 +1046,15 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.timeout(connection: connection) + let action = self.http1ConnectionProvider.state.timeout(connection: connection) switch action { case .none: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - provider.closePromise.succeed(()) default: XCTFail("Unexpected action: \(action)") } @@ -1098,15 +1063,14 @@ class ConnectionPoolTests: XCTestCase { func testTimeoutAvailableConnection() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.pending = 0 snapshot.openedConnectionsCount = 1 snapshot.availableConnections.append(connection) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -1114,7 +1078,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.timeout(connection: connection) + let action = self.http1ConnectionProvider.state.timeout(connection: connection) switch action { case .closeAnd(_, let after): switch after { @@ -1123,15 +1087,12 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - - // cleanup - provider.closePromise.succeed(()) default: XCTFail("Unexpected action: \(action)") } @@ -1140,15 +1101,14 @@ class ConnectionPoolTests: XCTestCase { func testRemoteClosedLeasedConnection() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.pending = 0 snapshot.openedConnectionsCount = 1 snapshot.leasedConnections.insert(ConnectionKey(connection)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -1156,18 +1116,15 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.remoteClosed(connection: connection) + let action = self.http1ConnectionProvider.state.remoteClosed(connection: connection) switch action { case .none: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - provider.closePromise.succeed(()) default: XCTFail("Unexpected action: \(action)") } @@ -1176,15 +1133,14 @@ class ConnectionPoolTests: XCTestCase { func testRemoteClosedAvailableConnection() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.pending = 0 snapshot.openedConnectionsCount = 1 snapshot.availableConnections.append(connection) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -1192,18 +1148,15 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.remoteClosed(connection: connection) + let action = self.http1ConnectionProvider.state.remoteClosed(connection: connection) switch action { case .closeProvider: - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - - // cleanup - provider.closePromise.succeed(()) default: XCTFail("Unexpected action: \(action)") } @@ -1212,16 +1165,14 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Connection Tests func testConnectionReleaseActive() throws { - let channel = ActiveChannel() - - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + let channel = ActiveChannel(eventLoop: self.eventLoop) + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.openedConnectionsCount = 1 snapshot.leasedConnections.insert(ConnectionKey(connection)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -1229,10 +1180,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - connection.release(closing: false) + connection.release(closing: false, logger: HTTPClient.loggingDisabled) // XCTAssertFalse(connection.isInUse) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -1240,22 +1191,22 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.pending = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } func testConnectionReleaseInactive() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.openedConnectionsCount = 1 snapshot.leasedConnections.insert(ConnectionKey(connection)) - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -1263,9 +1214,9 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - connection.release(closing: true) + connection.release(closing: true, logger: HTTPClient.loggingDisabled) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -1273,22 +1224,22 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.pending = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } func testConnectionRemoteCloseRelease() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(connection) snapshot.openedConnectionsCount = 1 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -1296,9 +1247,9 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - connection.remoteClosed() + connection.remoteClosed(logger: HTTPClient.loggingDisabled) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -1306,22 +1257,22 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.pending = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } func testConnectionTimeoutRelease() throws { let channel = EmbeddedChannel() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(connection) snapshot.openedConnectionsCount = 1 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -1329,9 +1280,9 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - connection.timeout() + connection.timeout(logger: HTTPClient.loggingDisabled) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -1339,23 +1290,21 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.pending = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } func testAcquireAvailableBecomesUnavailable() throws { - let eventLoop = EmbeddedEventLoop() - let channel = ActiveChannel() + let channel = ActiveChannel(eventLoop: self.eventLoop) + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) - var snapshot = provider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: provider) + let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) snapshot.availableConnections.append(connection) snapshot.openedConnectionsCount = 1 - provider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) @@ -1363,13 +1312,13 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .lease(let connection, let waiter): // Since this connection is already in use, this should be a no-op and state should not have changed from normal lease - connection.timeout() + connection.timeout(logger: HTTPClient.loggingDisabled) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertTrue(connection.isActiveEstimation) XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(0, snapshot.availableConnections.count) @@ -1379,10 +1328,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // This is unrecoverable, but in this case we create a new connection, so state again should not change, even though release will be called - // This is important to prevent provider deletion since connection is released and there could be 0 waiters - connection.remoteClosed() + // This is important to preventself.http1ConnectionProvider deletion since connection is released and there could be 0 waiters + connection.remoteClosed(logger: HTTPClient.loggingDisabled) - snapshot = provider.state.testsOnly_getInternalState() + snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -1393,16 +1342,89 @@ class ConnectionPoolTests: XCTestCase { waiter.promise.succeed(connection) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) snapshot.openedConnectionsCount = 0 - provider.state.testsOnly_setInternalState(snapshot) - provider.closePromise.succeed(()) + self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } } + + override func setUp() { + XCTAssertNil(self.eventLoop) + XCTAssertNil(self.http1ConnectionProvider) + self.eventLoop = EmbeddedEventLoop() + XCTAssertNoThrow(self.http1ConnectionProvider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), + eventLoop: self.eventLoop, + configuration: .init(), + pool: .init(configuration: .init(), + backgroundActivityLogger: HTTPClient.loggingDisabled), + backgroundActivityLogger: HTTPClient.loggingDisabled)) + } + + override func tearDown() { + XCTAssertNotNil(self.eventLoop) + XCTAssertNotNil(self.http1ConnectionProvider) + /* BEGIN workaround for #232, this whole block is to be replaced by the commented out line below */ + // not closing the provider here (https://github.com/swift-server/async-http-client/issues/232) + var state = self.http1ConnectionProvider.state.testsOnly_getInternalState() + if state.pending == 1, state.waiters.isEmpty, state.leasedConnections.isEmpty, state.openedConnectionsCount == 0 { + state.pending = 0 + self.http1ConnectionProvider.state.testsOnly_setInternalState(state) + } + self.http1ConnectionProvider.closePromise.succeed(()) + /* END workaround for #232 */ + XCTAssertNoThrow(try self.http1ConnectionProvider.close().wait()) + XCTAssertNoThrow(try self.eventLoop.syncShutdownGracefully()) + self.eventLoop = nil + self.http1ConnectionProvider = nil + } } -class ActiveChannel: Channel { +class ActiveChannel: Channel, ChannelCore { + struct NotImplementedError: Error {} + + func localAddress0() throws -> SocketAddress { + throw NotImplementedError() + } + + func remoteAddress0() throws -> SocketAddress { + throw NotImplementedError() + } + + func register0(promise: EventLoopPromise<Void>?) { + promise?.fail(NotImplementedError()) + } + + func bind0(to: SocketAddress, promise: EventLoopPromise<Void>?) { + promise?.fail(NotImplementedError()) + } + + func connect0(to: SocketAddress, promise: EventLoopPromise<Void>?) { + promise?.fail(NotImplementedError()) + } + + func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) { + promise?.fail(NotImplementedError()) + } + + func flush0() {} + + func read0() {} + + func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) { + promise?.succeed(()) + } + + func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) { + promise?.fail(NotImplementedError()) + } + + func channelRead0(_: NIOAny) {} + + func errorCaught0(error: Error) {} + var allocator: ByteBufferAllocator var closeFuture: EventLoopFuture<Void> var eventLoop: EventLoop @@ -1413,14 +1435,14 @@ class ActiveChannel: Channel { var isWritable: Bool = true var isActive: Bool = true - init() { + init(eventLoop: EmbeddedEventLoop) { self.allocator = ByteBufferAllocator() - self.eventLoop = EmbeddedEventLoop() + self.eventLoop = eventLoop self.closeFuture = self.eventLoop.makeSucceededFuture(()) } var _channelCore: ChannelCore { - preconditionFailure("Not implemented") + return self } var pipeline: ChannelPipeline { diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 98085cf14..77582e3cc 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -39,14 +39,15 @@ class HTTPClientInternalTests: XCTestCase { func testHTTPPartsHandler() throws { let channel = EmbeddedChannel() let recorder = RecordingHandler<HTTPClientResponsePart, HTTPClientRequestPart>() - let task = Task<Void>(eventLoop: channel.eventLoop) + let task = Task<Void>(eventLoop: channel.eventLoop, logger: HTTPClient.loggingDisabled) try channel.pipeline.addHandler(recorder).wait() try channel.pipeline.addHandler(TaskHandler(task: task, kind: .host, delegate: TestHTTPDelegate(), redirectHandler: nil, - ignoreUncleanSSLShutdown: false)).wait() + ignoreUncleanSSLShutdown: false, + logger: HTTPClient.loggingDisabled)).wait() var request = try Request(url: "http://localhost/get") request.headers.add(name: "X-Test-Header", value: "X-Test-Value") @@ -69,14 +70,15 @@ class HTTPClientInternalTests: XCTestCase { func testBadHTTPRequest() throws { let channel = EmbeddedChannel() let recorder = RecordingHandler<HTTPClientResponsePart, HTTPClientRequestPart>() - let task = Task<Void>(eventLoop: channel.eventLoop) + let task = Task<Void>(eventLoop: channel.eventLoop, logger: HTTPClient.loggingDisabled) XCTAssertNoThrow(try channel.pipeline.addHandler(recorder).wait()) XCTAssertNoThrow(try channel.pipeline.addHandler(TaskHandler(task: task, kind: .host, delegate: TestHTTPDelegate(), redirectHandler: nil, - ignoreUncleanSSLShutdown: false)).wait()) + ignoreUncleanSSLShutdown: false, + logger: HTTPClient.loggingDisabled)).wait()) var request = try Request(url: "http://localhost/get") request.headers.add(name: "X-Test-Header", value: "X-Test-Value") @@ -91,12 +93,13 @@ class HTTPClientInternalTests: XCTestCase { func testHTTPPartsHandlerMultiBody() throws { let channel = EmbeddedChannel() let delegate = TestHTTPDelegate() - let task = Task<Void>(eventLoop: channel.eventLoop) + let task = Task<Void>(eventLoop: channel.eventLoop, logger: HTTPClient.loggingDisabled) let handler = TaskHandler(task: task, kind: .host, delegate: delegate, redirectHandler: nil, - ignoreUncleanSSLShutdown: false) + ignoreUncleanSSLShutdown: false, + logger: HTTPClient.loggingDisabled) try channel.pipeline.addHandler(handler).wait() @@ -573,11 +576,12 @@ class HTTPClientInternalTests: XCTestCase { // This is pretty evil but we literally just get hold of a connection to get to the channel to be able to // observe when the server closing the connection is known to the client. let el = group.next() - XCTAssertNoThrow(maybeConnection = try client.pool.getConnection(for: .init(url: url), + XCTAssertNoThrow(maybeConnection = try client.pool.getConnection(Request(url: url), preference: .indifferent, - on: el, + taskEventLoop: el, deadline: nil, - setupComplete: el.makeSucceededFuture(())).wait()) + setupComplete: el.makeSucceededFuture(()), + logger: HTTPClient.loggingDisabled).wait()) guard let connection = maybeConnection else { XCTFail("couldn't get connection") return @@ -585,7 +589,7 @@ class HTTPClientInternalTests: XCTestCase { // And let's also give the connection back :). try connection.channel.eventLoop.submit { - connection.release(closing: false) + connection.release(closing: false, logger: HTTPClient.loggingDisabled) }.wait() XCTAssertEqual(0, sharedStateServerHandler.requestNumber.load()) @@ -622,11 +626,12 @@ class HTTPClientInternalTests: XCTestCase { body: nil) var maybeConnection: Connection? let el = client.eventLoopGroup.next() - XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(for: req, + XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(req, preference: .indifferent, - on: el, + taskEventLoop: el, deadline: nil, - setupComplete: el.makeSucceededFuture(())).wait()) + setupComplete: el.makeSucceededFuture(()), + logger: HTTPClient.loggingDisabled).wait()) guard let connection = maybeConnection else { XCTFail("couldn't make connection") throw NoChannelError() @@ -634,7 +639,7 @@ class HTTPClientInternalTests: XCTestCase { let channel = connection.channel try! channel.eventLoop.submit { - connection.release(closing: true) + connection.release(closing: true, logger: HTTPClient.loggingDisabled) }.wait() return (web, channel) }) @@ -707,11 +712,12 @@ class HTTPClientInternalTests: XCTestCase { // Let's start by getting a connection so we can mess with the Channel :). var maybeConnection: Connection? let el = client.eventLoopGroup.next() - XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(for: req, + XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(req, preference: .indifferent, - on: el, + taskEventLoop: el, deadline: nil, - setupComplete: el.makeSucceededFuture(())).wait()) + setupComplete: el.makeSucceededFuture(()), + logger: HTTPClient.loggingDisabled).wait()) guard let connection = maybeConnection else { XCTFail("couldn't make connection") return @@ -725,7 +731,7 @@ class HTTPClientInternalTests: XCTestCase { sawTheClosePromise: sawTheClosePromise), position: .first).wait()) try! connection.channel.eventLoop.submit { - connection.release(closing: false) + connection.release(closing: false, logger: HTTPClient.loggingDisabled) }.wait() XCTAssertNoThrow(try client.execute(request: req).wait()) @@ -742,11 +748,12 @@ class HTTPClientInternalTests: XCTestCase { // When asking for a connection again, we should _not_ get the same one back because we did most of the close, // similar to what the SSLHandler would do. let el2 = client.eventLoopGroup.next() - let connection2Future = client.pool.getConnection(for: req, + let connection2Future = client.pool.getConnection(req, preference: .indifferent, - on: el2, + taskEventLoop: el2, deadline: nil, - setupComplete: el2.makeSucceededFuture(())) + setupComplete: el2.makeSucceededFuture(()), + logger: HTTPClient.loggingDisabled) doActualCloseNowPromise.succeed(()) XCTAssertNoThrow(try maybeConnection = connection2Future.wait()) @@ -757,7 +764,7 @@ class HTTPClientInternalTests: XCTestCase { XCTAssert(connection !== connection2) try! connection2.channel.eventLoop.submit { - connection2.release(closing: true) + connection2.release(closing: true, logger: HTTPClient.loggingDisabled) }.wait() XCTAssertTrue(connection2.channel.isActive) } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 96d0d3fa4..a7eee57b5 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -14,6 +14,7 @@ import AsyncHTTPClient import Foundation +import Logging import NIO import NIOConcurrencyHelpers import NIOHTTP1 @@ -752,6 +753,60 @@ extension EventLoopFuture { } } +struct CollectEverythingLogHandler: LogHandler { + var metadata: Logger.Metadata = [:] + var logLevel: Logger.Level = .info + let logStore: LogStore + + class LogStore { + struct Entry { + var level: Logger.Level + var message: String + var metadata: [String: String] + } + + var lock = Lock() + var logs: [Entry] = [] + + var allEntries: [Entry] { + get { + return self.lock.withLock { self.logs } + } + set { + self.lock.withLock { self.logs = newValue } + } + } + + func append(level: Logger.Level, message: Logger.Message, metadata: Logger.Metadata?) { + self.lock.withLock { + self.logs.append(Entry(level: level, + message: message.description, + metadata: metadata?.mapValues { $0.description } ?? [:])) + } + } + } + + init(logStore: LogStore) { + self.logStore = logStore + } + + func log(level: Logger.Level, + message: Logger.Message, + metadata: Logger.Metadata?, + file: String, function: String, line: UInt) { + self.logStore.append(level: level, message: message, metadata: self.metadata.merging(metadata ?? [:]) { $1 }) + } + + subscript(metadataKey key: String) -> Logger.Metadata.Value? { + get { + return self.metadata[key] + } + set { + self.metadata[key] = newValue + } + } +} + private let cert = """ -----BEGIN CERTIFICATE----- MIICmDCCAYACCQCPC8JDqMh1zzANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJ1 diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index b9a4af3e0..7d61f805e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -105,6 +105,10 @@ extension HTTPClientTests { ("testWeHandleUsReceivingACloseHeaderCorrectly", testWeHandleUsReceivingACloseHeaderCorrectly), ("testWeHandleUsSendingACloseHeaderAmongstOtherConnectionHeadersCorrectly", testWeHandleUsSendingACloseHeaderAmongstOtherConnectionHeadersCorrectly), ("testWeHandleUsReceivingACloseHeaderAmongstOtherConnectionHeadersCorrectly", testWeHandleUsReceivingACloseHeaderAmongstOtherConnectionHeadersCorrectly), + ("testLoggingCorrectlyAttachesRequestInformation", testLoggingCorrectlyAttachesRequestInformation), + ("testNothingIsLoggedAtInfoOrHigher", testNothingIsLoggedAtInfoOrHigher), + ("testAllMethodsLog", testAllMethodsLog), + ("testClosingIdleConnectionsInPoolLogsInTheBackground", testClosingIdleConnectionsInPoolLogsInTheBackground), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 5e1a403a1..e259db776 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -16,6 +16,7 @@ #if canImport(Network) import Network #endif +import Logging import NIO import NIOConcurrencyHelpers import NIOFoundationCompat @@ -33,6 +34,7 @@ class HTTPClientTests: XCTestCase { var serverGroup: EventLoopGroup! var defaultHTTPBin: HTTPBin! var defaultClient: HTTPClient! + var backgroundLogStore: CollectEverythingLogHandler.LogStore! var defaultHTTPBinURLPrefix: String { return "http://localhost:\(self.defaultHTTPBin.port)/" @@ -43,16 +45,25 @@ class HTTPClientTests: XCTestCase { XCTAssertNil(self.serverGroup) XCTAssertNil(self.defaultHTTPBin) XCTAssertNil(self.defaultClient) + XCTAssertNil(self.backgroundLogStore) + self.clientGroup = getDefaultEventLoopGroup(numberOfThreads: 1) self.serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) self.defaultHTTPBin = HTTPBin() - self.defaultClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) + self.backgroundLogStore = CollectEverythingLogHandler.LogStore() + var backgroundLogger = Logger(label: "\(#function)", factory: { _ in + CollectEverythingLogHandler(logStore: self.backgroundLogStore!) + }) + backgroundLogger.logLevel = .trace + self.defaultClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup), + backgroundActivityLogger: backgroundLogger) } override func tearDown() { - XCTAssertNotNil(self.defaultClient) - XCTAssertNoThrow(try self.defaultClient.syncShutdown()) - self.defaultClient = nil + if let defaultClient = self.defaultClient { + XCTAssertNoThrow(try defaultClient.syncShutdown()) + self.defaultClient = nil + } XCTAssertNotNil(self.defaultHTTPBin) XCTAssertNoThrow(try self.defaultHTTPBin.shutdown()) @@ -65,6 +76,9 @@ class HTTPClientTests: XCTestCase { XCTAssertNotNil(self.serverGroup) XCTAssertNoThrow(try self.serverGroup.syncShutdownGracefully()) self.serverGroup = nil + + XCTAssertNotNil(self.backgroundLogStore) + self.backgroundLogStore = nil } func testRequestURI() throws { @@ -1793,4 +1807,197 @@ class HTTPClientTests: XCTestCase { XCTAssertEqual(stats2.connectionNumber, stats3.connectionNumber) } } + + func testLoggingCorrectlyAttachesRequestInformation() { + let logStore = CollectEverythingLogHandler.LogStore() + + var loggerYolo001: Logger = Logger(label: "\(#function)", factory: { _ in + CollectEverythingLogHandler(logStore: logStore) + }) + loggerYolo001.logLevel = .trace + loggerYolo001[metadataKey: "yolo-request-id"] = "yolo-001" + var loggerACME002: Logger = Logger(label: "\(#function)", factory: { _ in + CollectEverythingLogHandler(logStore: logStore) + }) + loggerACME002.logLevel = .trace + loggerACME002[metadataKey: "acme-request-id"] = "acme-002" + + guard let request1 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get"), + let request2 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "stats"), + let request3 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "ok") else { + XCTFail("bad stuff, can't even make request structures") + return + } + + // === Request 1 (Yolo001) + XCTAssertNoThrow(try self.defaultClient.execute(request: request1, + eventLoop: .indifferent, + deadline: nil, + logger: loggerYolo001).wait()) + let logsAfterReq1 = logStore.allEntries + logStore.allEntries = [] + + // === Request 2 (Yolo001) + XCTAssertNoThrow(try self.defaultClient.execute(request: request2, + eventLoop: .indifferent, + deadline: nil, + logger: loggerYolo001).wait()) + let logsAfterReq2 = logStore.allEntries + logStore.allEntries = [] + + // === Request 3 (ACME002) + XCTAssertNoThrow(try self.defaultClient.execute(request: request3, + eventLoop: .indifferent, + deadline: nil, + logger: loggerACME002).wait()) + let logsAfterReq3 = logStore.allEntries + logStore.allEntries = [] + + // === Assertions + XCTAssertGreaterThan(logsAfterReq1.count, 0) + XCTAssertGreaterThan(logsAfterReq2.count, 0) + XCTAssertGreaterThan(logsAfterReq3.count, 0) + + XCTAssert(logsAfterReq1.allSatisfy { entry in + if let httpRequestMetadata = entry.metadata["ahc-request-id"], + let yoloRequestID = entry.metadata["yolo-request-id"] { + XCTAssertNil(entry.metadata["acme-request-id"]) + XCTAssertEqual("yolo-001", yoloRequestID) + XCTAssertNotNil(Int(httpRequestMetadata)) + return true + } else { + XCTFail("log message doesn't contain the right IDs: \(entry)") + return false + } + }) + XCTAssert(logsAfterReq1.contains { entry in + entry.message == "opening fresh connection (no connections to reuse available)" + }) + + XCTAssert(logsAfterReq2.allSatisfy { entry in + if let httpRequestMetadata = entry.metadata["ahc-request-id"], + let yoloRequestID = entry.metadata["yolo-request-id"] { + XCTAssertNil(entry.metadata["acme-request-id"]) + XCTAssertEqual("yolo-001", yoloRequestID) + XCTAssertNotNil(Int(httpRequestMetadata)) + return true + } else { + XCTFail("log message doesn't contain the right IDs: \(entry)") + return false + } + }) + XCTAssert(logsAfterReq2.contains { entry in + entry.message.starts(with: "leasing existing connection") + }) + + XCTAssert(logsAfterReq3.allSatisfy { entry in + if let httpRequestMetadata = entry.metadata["ahc-request-id"], + let acmeRequestID = entry.metadata["acme-request-id"] { + XCTAssertNil(entry.metadata["yolo-request-id"]) + XCTAssertEqual("acme-002", acmeRequestID) + XCTAssertNotNil(Int(httpRequestMetadata)) + return true + } else { + XCTFail("log message doesn't contain the right IDs: \(entry)") + return false + } + }) + XCTAssert(logsAfterReq3.contains { entry in + entry.message.starts(with: "leasing existing connection") + }) + } + + func testNothingIsLoggedAtInfoOrHigher() { + let logStore = CollectEverythingLogHandler.LogStore() + + var logger: Logger = Logger(label: "\(#function)", factory: { _ in + CollectEverythingLogHandler(logStore: logStore) + }) + logger.logLevel = .info + + guard let request1 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get"), + let request2 = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "stats") else { + XCTFail("bad stuff, can't even make request structures") + return + } + + // === Request 1 + XCTAssertNoThrow(try self.defaultClient.execute(request: request1, + eventLoop: .indifferent, + deadline: nil, + logger: logger).wait()) + XCTAssertEqual(0, logStore.allEntries.count) + + // === Request 2 + XCTAssertNoThrow(try self.defaultClient.execute(request: request2, + eventLoop: .indifferent, + deadline: nil, + logger: logger).wait()) + XCTAssertEqual(0, logStore.allEntries.count) + + XCTAssertEqual(0, self.backgroundLogStore.allEntries.count) + } + + func testAllMethodsLog() { + func checkExpectationsWithLogger<T>(type: String, _ body: (Logger, String) throws -> T) throws -> T { + let logStore = CollectEverythingLogHandler.LogStore() + + var logger: Logger = Logger(label: "\(#function)", factory: { _ in + CollectEverythingLogHandler(logStore: logStore) + }) + logger.logLevel = .trace + logger[metadataKey: "req"] = "yo-\(type)" + + let url = self.defaultHTTPBinURLPrefix + "not-found/request/\(type))" + let result = try body(logger, url) + + XCTAssertGreaterThan(logStore.allEntries.count, 0) + logStore.allEntries.forEach { entry in + XCTAssertEqual("yo-\(type)", entry.metadata["req"] ?? "n/a") + XCTAssertNotNil(Int(entry.metadata["ahc-request-id"] ?? "n/a")) + } + return result + } + + XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "GET") { logger, url in + try self.defaultClient.get(url: url, logger: logger).wait() + }.status)) + + XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "PUT") { logger, url in + try self.defaultClient.put(url: url, logger: logger).wait() + }.status)) + + XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "POST") { logger, url in + try self.defaultClient.post(url: url, logger: logger).wait() + }.status)) + + XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "DELETE") { logger, url in + try self.defaultClient.delete(url: url, logger: logger).wait() + }.status)) + + XCTAssertNoThrow(XCTAssertEqual(.notFound, try checkExpectationsWithLogger(type: "PATCH") { logger, url in + try self.defaultClient.patch(url: url, logger: logger).wait() + }.status)) + + // No background activity expected here. + XCTAssertEqual(0, self.backgroundLogStore.allEntries.count) + } + + func testClosingIdleConnectionsInPoolLogsInTheBackground() { + XCTAssertNoThrow(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "/get").wait()) + + XCTAssertNoThrow(try self.defaultClient.syncShutdown()) + + XCTAssertGreaterThanOrEqual(self.backgroundLogStore.allEntries.count, 0) + XCTAssert(self.backgroundLogStore.allEntries.contains { entry in + entry.message == "closing provider" + }) + XCTAssert(self.backgroundLogStore.allEntries.allSatisfy { entry in + entry.metadata["ahc-request-id"] == nil && + entry.metadata["ahc-request"] == nil && + entry.metadata["ahc-provider"] != nil + }) + + self.defaultClient = nil // so it doesn't get shut down again. + } } diff --git a/docs/logging-design.md b/docs/logging-design.md new file mode 100644 index 000000000..02ef721fa --- /dev/null +++ b/docs/logging-design.md @@ -0,0 +1,70 @@ +# Design of the way AsyncHTTPClient logs + +<details> + <summary>The logging is strictly separated between request activity & background activity.</summary> + AsyncHTTPClient is very much a request-driven library. Almost all work happens when you invoke a request, say `httpClient.get(someURL)`. To preserve the metadata you may have attached to your current `Logger`, we accept a `logger: Logger` parameter on each request. For example to so a `GET` request with logging use the following code. + +```swift +httpClient.get(someURL, logger: myLogger) +``` + + Apart from the request-driven work, AsyncHTTPClient does do some very limited amount of background work, for example expiring connections that stayed unused in the connection pool for too long. Logs associated with the activity from background tasks can be seen only if you attach a `Logger` in `HTTPClient`'s initialiser like below. + +```swift +HTTPClient(eventLoopGroupProvider: .shared(group), + backgroundActivityLogger: self.myBackgroundLogger) +``` + +The rationale for the strict separation is the correct propagation of the `Logger`'s `metadata`. You are likely to attach request specific information to a `Logger` before passing it to one of AsyncHTTPClient's request methods. This metadata will then be correctly attached to all log messages that occur from AsyncHTTPClient processing this request. + +If AsyncHTTPClient does some work in the background (like closing a connection that was long idle) however you likely do _not_ want the request-specific information from some previous request to be attached to those messages. Therefore, those messages get logged with the `backgroundActivityLogger` passed to HTTPClient's initialiser. +</details> +<details> + <summary>Unless you explicitly pass AsyncHTTPClient a `Logger` instance, nothing is ever logged.</summary> + AsyncHTTPClient is useful in many places where you wouldn't want to log, for example a command line HTTP client. Also, we do not want to change its default behaviour in a minor release. +</details> +<details> + <summary>Nothing is logged at level `info` or higher, unless something is really wrong that cannot be communicated through the API.</summary> + Fundamentally, AsyncHTTPClient performs a simple task, it makes a HTTP request and communicates the outcome back via its API. In normal usage, we would not expect people to want AsyncHTTPClient to log. In certain scenarios, for example when debugging why a request takes longer than expected it may however be useful to get information about AsyncHTTPClient's connection pool. That is when enabling logging may become useful. +</details> +<details> + <summary>Each request will get a globally unique request ID (`ahc-request-id`) that will be attached (as metadata) to each log message relevant to a request.</summary> + When many concurrent requests are active, it can be challenging to figure out which log message is associated with which request. To facilitate this task, AsyncHTTPClient will add a metadata field `ahc-request-id` to each log message so you can first find the request ID that is causing issues and then filter only messages with that ID. +</details> +<details> + <summary>Your `Logger` metadata is preserved.</summary> + AsyncHTTPClient accepts a `Logger` on every request method. This means that all the metadata you have attached, will be present on log messages issued by AsyncHTTPClient. + + For example, if you attach `["my-system-req-uuid": "84B453E0-0DFD-4B4B-BF22-3434812C9015"]` and then do two requests using AsyncHTTPClient, both of those requests will carry `"my-system-req-uuid` as well as AsyncHTTPClient's `ahc-request-id`. This allows you to filter all HTTP request made from one of your system's requests whilst still disambiguating the HTTP requests (they will have different `ahc-request-id`s. +</details> +<details> + <summary>Instead of accepting one `Logger` instance per `HTTPClient` instance, each request method can accept a `Logger`.</summary> + This allows AsyncHTTPClient to preserve your metadata and add its own metadata such as `ahc-request-id`. +</details> +<details> + <summary>All logs use the [structured logging](https://www.sumologic.com/glossary/structured-logging/) pattern, i.e. only static log messages and accompanying key/value metadata are used.</summary> + None of the log messages issued by AsyncHTTPClient will use String interpolation which means they will always be the exact same message. + + For example when AsyncHTTPClient wants to tell you it got an actual network connection to perform a request on, it will give the logger the following pieces of information: + + - message: `got connection for request` + - metadata (the values are example): + - `ahc-request-id`: `0` + - `ahc-connection`: `SocketChannel { BaseSocket { fd=15 }, active = true, localAddress = Optional([IPv4]127.0.0.1/127.0.0.1:54459), remoteAddress = Optional([IPv4]127.0.0.1/127.0.0.1:54457) }` + + As you can see above, the log message doesn't actually contain the request or the network connection. Both of those pieces of information are in the `metadata`. + + The rationale is that many people use log aggregation systems where it is very useful to aggregate, search and group by log message, or specific metadata values. This is greatly simplified by using a constant string (relatively stable) string and explicitly marked metadata values which make it easy to filter by. +</details> +<details> + <summary>`debug` should be enough to diagnose most problems but information that can be correlated is usually skipped.</summary> + When crafting log messages, it's often hard to strike a balance between logging everything and logging just enough. A rule of thumb is that you have to assume someone may be running with `logLevel = .debug` in production. So it can't be too much. Yet `.trace` can log everything you would need to know when debugging a tricky implementation issue. We assume nobody is running in production with `logLevel = .trace`. + + The problem with logging everything is that logging itself becomes very slow. We want logging in `debug` level to still be reasonably performant and therefore avoid logging information that can be correlated from other log messages. + + For example, AsyncHTTPClient may tell you in two log messages that it `got a connection` (from the connection pool) and a little later that it's `parking connection` (in the connection pool). Just like all messages, both of them will have an associated `ahc-request-id` which makes it possible to correlate the two log messages. The message that logs that we actually got a network connection will also include information about this network connection. The message that we're now parking the connection however _will not_. The information which connection is being parked can be found by filtering all other log messages with the same `ahc-request-id`. +</details> +<details> + <summary>In `trace`, AsyncHTTPClient may log _a lot_.</summary> + In the `.trace` log level, AsyncHTTPClient basically logs all the information that it has handily available. The frugality considerations we take in `.debug` do not apply here. We just want to log as much information as possible. This is useful almost exclusively for local debugging and should almost certainly not be sent into a log aggregation system where the information might be persisted for a long time. This also means, handing AsyncHTTPClient a logger in `logLevel = .trace` may have a fairly serious performance impact. +</details>