diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index c29b32a7a..13f8149f7 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -375,6 +375,11 @@ extension HTTPConnectionPool { self.connections[index].lease() } + func parkConnection(at index: Int) -> (Connection.ID, EventLoop) { + precondition(self.connections[index].isIdle) + return (self.connections[index].connectionID, self.connections[index].eventLoop) + } + /// A new HTTP/1.1 connection was released. /// /// This will put the position into the idle state. @@ -446,12 +451,13 @@ extension HTTPConnectionPool { /// This will put the position into the closed state. /// /// - Parameter connectionID: The failed connection's id. - /// - Returns: An index and an IdleConnectionContext to determine the next action for the now closed connection. + /// - Returns: An optional index and an IdleConnectionContext to determine the next action for the closed connection. /// You must call ``removeConnection(at:)`` or ``replaceConnection(at:)`` with the - /// supplied index after this. - mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext) { + /// supplied index after this. If nil is returned the connection was closed by the state machine and was + /// therefore already removed. + mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { - preconditionFailure("We tried to fail a new connection that we know nothing about?") + return nil } let use: ConnectionUse @@ -607,22 +613,4 @@ extension HTTPConnectionPool { var connecting: Int = 0 var backingOff: Int = 0 } - - /// The pool cleanup todo list. - struct CleanupContext: Equatable { - /// the connections to close right away. These are idle. - var close: [Connection] - - /// the connections that currently run a request that needs to be cancelled to close the connections - var cancel: [Connection] - - /// the connections that are backing off from connection creation - var connectBackoff: [Connection.ID] - - init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) { - self.close = close - self.cancel = cancel - self.connectBackoff = connectBackoff - } - } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift new file mode 100644 index 000000000..17582293d --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -0,0 +1,462 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO + +extension HTTPConnectionPool { + struct HTTP1StateMachine { + enum State: Equatable { + case running + case shuttingDown(unclean: Bool) + case shutDown + } + + typealias Action = HTTPConnectionPool.StateMachine.Action + + private var connections: HTTP1Connections + private var failedConsecutiveConnectionAttempts: Int = 0 + + private var requests: RequestQueue + private var state: State = .running + + init(idGenerator: Connection.ID.Generator, maximumConcurrentConnections: Int) { + self.connections = HTTP1Connections( + maximumConcurrentConnections: maximumConcurrentConnections, + generator: idGenerator + ) + + self.requests = RequestQueue() + } + + // MARK: - Events - + + mutating func executeRequest(_ request: Request) -> Action { + switch self.state { + case .running: + if let eventLoop = request.requiredEventLoop { + return self.executeRequestOnRequiredEventLoop(request, eventLoop: eventLoop) + } else { + return self.executeRequestOnPreferredEventLoop(request, eventLoop: request.preferredEventLoop) + } + case .shuttingDown, .shutDown: + // it is fairly unlikely that this condition is met, since the ConnectionPoolManager + // also fails new requests immediately, if it is shutting down. However there might + // be race conditions in which a request passes through a running connection pool + // manager, but hits a connection pool that is already shutting down. + // + // (Order in one lock does not guarantee order in the next lock!) + return .init( + request: .failRequest(request, HTTPClientError.alreadyShutdown, cancelTimeout: false), + connection: .none + ) + } + } + + private mutating func executeRequestOnPreferredEventLoop(_ request: Request, eventLoop: EventLoop) -> Action { + if let connection = self.connections.leaseConnection(onPreferred: eventLoop) { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .cancelTimeoutTimer(connection.id) + ) + } + + // No matter what we do now, the request will need to wait! + self.requests.push(request) + let requestAction: StateMachine.RequestAction = .scheduleRequestTimeout( + for: request, + on: eventLoop + ) + + if !self.connections.canGrow { + // all connections are busy and there is no room for more connections, we need to wait! + return .init(request: requestAction, connection: .none) + } + + // if we are not at max connections, we may want to create a new connection + if self.connections.startingGeneralPurposeConnections >= self.requests.generalPurposeCount { + // If there are at least as many connections starting as we have request queued, we + // don't need to create a new connection. we just need to wait. + return .init(request: requestAction, connection: .none) + } + + // There are not enough connections starting for the current waiting request count. We + // should create a new one. + let newConnectionID = self.connections.createNewConnection(on: eventLoop) + + return .init( + request: requestAction, + connection: .createConnection(newConnectionID, on: eventLoop) + ) + } + + private mutating func executeRequestOnRequiredEventLoop(_ request: Request, eventLoop: EventLoop) -> Action { + if let connection = self.connections.leaseConnection(onRequired: eventLoop) { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .cancelTimeoutTimer(connection.id) + ) + } + + // No matter what we do now, the request will need to wait! + self.requests.push(request) + let requestAction: StateMachine.RequestAction = .scheduleRequestTimeout( + for: request, + on: eventLoop + ) + + let starting = self.connections.startingEventLoopConnections(on: eventLoop) + let waiting = self.requests.count(for: eventLoop) + + if starting >= waiting { + // There are already as many connections starting as we need for the waiting + // requests. A new connection doesn't need to be created. + return .init(request: requestAction, connection: .none) + } + + // There are not enough connections starting for the number of requests in the queue. + // We should create a new connection. + let newConnectionID = self.connections.createNewOverflowConnection(on: eventLoop) + + return .init( + request: requestAction, + connection: .createConnection(newConnectionID, on: eventLoop) + ) + } + + mutating func newHTTP1ConnectionEstablished(_ connection: Connection) -> Action { + self.failedConsecutiveConnectionAttempts = 0 + let (index, context) = self.connections.newHTTP1ConnectionEstablished(connection) + return self.nextActionForIdleConnection(at: index, context: context) + } + + mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { + self.failedConsecutiveConnectionAttempts += 1 + + switch self.state { + case .running: + // We don't care how many waiting requests we have at this point, we will schedule a + // retry. More tasks, may appear until the backoff has completed. The final + // decision about the retry will be made in `connectionCreationBackoffDone(_:)` + let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID) + + let backoff = self.calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts) + return .init( + request: .none, + connection: .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop) + ) + + case .shuttingDown: + guard let (index, context) = self.connections.failConnection(connectionID) else { + preconditionFailure("Failed to create a connection that is unknown to us?") + } + return self.nextActionForFailedConnection(at: index, context: context) + + case .shutDown: + preconditionFailure("The pool is already shutdown all connections must already been torn down") + } + } + + mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action { + assert(self.connections.stats.backingOff >= 1, "At least this connection is currently in backoff") + // The naming of `failConnection` is a little confusing here. All it does is moving the + // connection state from `.backingOff` to `.closed` here. It also returns the + // connection's index. + guard let (index, context) = self.connections.failConnection(connectionID) else { + preconditionFailure("Backing off a connection that is unknown to us?") + } + // In `nextActionForFailedConnection` a decision will be made whether the failed + // connection should be replaced or removed. + return self.nextActionForFailedConnection(at: index, context: context) + } + + mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { + guard let connection = self.connections.closeConnectionIfIdle(connectionID) else { + // because of a race this connection (connection close runs against trigger of timeout) + // was already removed from the state machine. + return .none + } + + precondition(self.state == .running, "If we are shutting down, we must not have any idle connections") + + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + } + + mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action { + let (index, context) = self.connections.releaseConnection(connectionID) + return self.nextActionForIdleConnection(at: index, context: context) + } + + /// A connection has been unexpectedly closed + mutating func connectionClosed(_ connectionID: Connection.ID) -> Action { + guard let (index, context) = self.connections.failConnection(connectionID) else { + // When a connection close is initiated by the connection pool, the connection will + // still report its close to the state machine. In those cases we must ignore the + // event. + return .none + } + return self.nextActionForFailedConnection(at: index, context: context) + } + + mutating func timeoutRequest(_ requestID: Request.ID) -> Action { + // 1. check requests in queue + if let request = self.requests.remove(requestID) { + return .init( + request: .failRequest(request, HTTPClientError.getConnectionFromPoolTimeout, cancelTimeout: false), + connection: .none + ) + } + + // 2. This point is reached, because the request may have already been scheduled. A + // connection might have become available shortly before the request timeout timer + // fired. + return .none + } + + mutating func cancelRequest(_ requestID: Request.ID) -> Action { + // 1. check requests in queue + if self.requests.remove(requestID) != nil { + return .init( + request: .cancelRequestTimeout(requestID), + connection: .none + ) + } + + // 2. This is point is reached, because the request may already have been forwarded to + // an idle connection. In this case the connection will need to handle the + // cancellation. + return .none + } + + mutating func shutdown() -> Action { + precondition(self.state == .running, "Shutdown must only be called once") + + // If we have remaining request queued, we should fail all of them with a cancelled + // error. + let waitingRequests = self.requests.removeAll() + + var requestAction: StateMachine.RequestAction = .none + if !waitingRequests.isEmpty { + requestAction = .failRequestsAndCancelTimeouts(waitingRequests, HTTPClientError.cancelled) + } + + // clean up the connections, we can cleanup now! + let cleanupContext = self.connections.shutdown() + + // If there aren't any more connections, everything is shutdown + let isShutdown: StateMachine.ConnectionAction.IsShutdown + let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty) + if self.connections.isEmpty { + self.state = .shutDown + isShutdown = .yes(unclean: unclean) + } else { + self.state = .shuttingDown(unclean: unclean) + isShutdown = .no + } + + return .init( + request: requestAction, + connection: .cleanupConnections(cleanupContext, isShutdown: isShutdown) + ) + } + + // MARK: - Private Methods - + + // MARK: Idle connection management + + private mutating func nextActionForIdleConnection( + at index: Int, + context: HTTP1Connections.IdleConnectionContext + ) -> Action { + switch self.state { + case .running: + switch context.use { + case .generalPurpose: + return self.nextActionForIdleGeneralPurposeConnection(at: index, context: context) + case .eventLoop: + return self.nextActionForIdleEventLoopConnection(at: index, context: context) + } + case .shuttingDown(let unclean): + assert(self.requests.isEmpty) + let connection = self.connections.closeConnection(at: index) + if self.connections.isEmpty { + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean)) + ) + } + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + + case .shutDown: + preconditionFailure("It the pool is already shutdown, all connections must have been torn down.") + } + } + + private mutating func nextActionForIdleGeneralPurposeConnection( + at index: Int, + context: HTTP1Connections.IdleConnectionContext + ) -> Action { + // 1. Check if there are waiting requests in the general purpose queue + if let request = self.requests.popFirst(for: nil) { + return .init( + request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true), + connection: .none + ) + } + + // 2. Check if there are waiting requests in the matching eventLoop queue + if let request = self.requests.popFirst(for: context.eventLoop) { + return .init( + request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true), + connection: .none + ) + } + + // 3. Create a timeout timer to ensure the connection is closed if it is idle for too + // long. + let (connectionID, eventLoop) = self.connections.parkConnection(at: index) + return .init( + request: .none, + connection: .scheduleTimeoutTimer(connectionID, on: eventLoop) + ) + } + + private mutating func nextActionForIdleEventLoopConnection( + at index: Int, + context: HTTP1Connections.IdleConnectionContext + ) -> Action { + // Check if there are waiting requests in the matching eventLoop queue + if let request = self.requests.popFirst(for: context.eventLoop) { + return .init( + request: .executeRequest(request, self.connections.leaseConnection(at: index), cancelTimeout: true), + connection: .none + ) + } + + // TBD: What do we want to do, if there are more requests in the general purpose queue? + // For now, we don't care. The general purpose connections will pick those up + // eventually. + // + // If there is no more eventLoop bound work, we close the eventLoop bound connections. + // We don't park them. + return .init( + request: .none, + connection: .closeConnection(self.connections.closeConnection(at: index), isShutdown: .no) + ) + } + + // MARK: Failed/Closed connection management + + private mutating func nextActionForFailedConnection( + at index: Int, + context: HTTP1Connections.FailedConnectionContext + ) -> Action { + switch self.state { + case .running: + switch context.use { + case .generalPurpose: + return self.nextActionForFailedGeneralPurposeConnection(at: index, context: context) + case .eventLoop: + return self.nextActionForFailedEventLoopConnection(at: index, context: context) + } + + case .shuttingDown(let unclean): + assert(self.requests.isEmpty) + self.connections.removeConnection(at: index) + if self.connections.isEmpty { + return .init( + request: .none, + connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean)) + ) + } + return .none + + case .shutDown: + preconditionFailure("If the pool is already shutdown, all connections must have been torn down.") + } + } + + private mutating func nextActionForFailedGeneralPurposeConnection( + at index: Int, + context: HTTP1Connections.FailedConnectionContext + ) -> Action { + if context.connectionsStartingForUseCase < self.requests.generalPurposeCount { + // if we have more requests queued up, than we have starting connections, we should + // create a new connection + let (newConnectionID, newEventLoop) = self.connections.replaceConnection(at: index) + return .init( + request: .none, + connection: .createConnection(newConnectionID, on: newEventLoop) + ) + } + self.connections.removeConnection(at: index) + return .none + } + + private mutating func nextActionForFailedEventLoopConnection( + at index: Int, + context: HTTP1Connections.FailedConnectionContext + ) -> Action { + if context.connectionsStartingForUseCase < self.requests.count(for: context.eventLoop) { + // if we have more requests queued up, than we have starting connections, we should + // create a new connection + let (newConnectionID, newEventLoop) = self.connections.replaceConnection(at: index) + return .init( + request: .none, + connection: .createConnection(newConnectionID, on: newEventLoop) + ) + } + self.connections.removeConnection(at: index) + return .none + } + + private func calculateBackoff(failedAttempt attempts: Int) -> TimeAmount { + // Our backoff formula is: 100ms * 1.25^(attempts - 1) that is capped of at 1minute + // This means for: + // - 1 failed attempt : 100ms + // - 5 failed attempts: ~300ms + // - 10 failed attempts: ~930ms + // - 15 failed attempts: ~2.84s + // - 20 failed attempts: ~8.67s + // - 25 failed attempts: ~26s + // - 29 failed attempts: ~60s (max out) + + let start = Double(TimeAmount.milliseconds(100).nanoseconds) + let backoffNanoseconds = Int64(start * pow(1.25, Double(attempts - 1))) + + let backoff: TimeAmount = min(.nanoseconds(backoffNanoseconds), .seconds(60)) + + // Calculate a 3% jitter range + let jitterRange = (backoff.nanoseconds / 100) * 3 + // Pick a random element from the range +/- jitter range. + let jitter: TimeAmount = .nanoseconds((-jitterRange...jitterRange).randomElement()!) + let jitteredBackoff = backoff + jitter + return jitteredBackoff + } + } +} + +extension HTTPConnectionPool.HTTP1StateMachine: CustomStringConvertible { + var description: String { + let stats = self.connections.stats + let queued = self.requests.count + + return "connections: [connecting: \(stats.connecting) | backoff: \(stats.backingOff) | leased: \(stats.leased) | idle: \(stats.idle)], queued: \(queued)" + } +} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift index ca7af66fb..c58e18818 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -33,11 +33,12 @@ extension HTTPConnectionPool { self.count == 0 } - func count(for eventLoop: EventLoop?) -> Int { - if let eventLoop = eventLoop { - return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0 - } - return self.generalPurposeQueue.count + var generalPurposeCount: Int { + self.generalPurposeQueue.count + } + + func count(for eventLoop: EventLoop) -> Int { + self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0 } func isEmpty(for eventLoop: EventLoop?) -> Bool { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift new file mode 100644 index 000000000..b2b79c8df --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -0,0 +1,220 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO +import NIOHTTP1 + +extension HTTPConnectionPool { + struct StateMachine { + struct Action { + let request: RequestAction + let connection: ConnectionAction + + init(request: RequestAction, connection: ConnectionAction) { + self.request = request + self.connection = connection + } + + static let none: Action = Action(request: .none, connection: .none) + } + + enum ConnectionAction { + enum IsShutdown: Equatable { + case yes(unclean: Bool) + case no + } + + case createConnection(Connection.ID, on: EventLoop) + case scheduleBackoffTimer(Connection.ID, backoff: TimeAmount, on: EventLoop) + + case scheduleTimeoutTimer(Connection.ID, on: EventLoop) + case cancelTimeoutTimer(Connection.ID) + + case closeConnection(Connection, isShutdown: IsShutdown) + case cleanupConnections(CleanupContext, isShutdown: IsShutdown) + + case none + } + + enum RequestAction { + case executeRequest(Request, Connection, cancelTimeout: Bool) + case executeRequestsAndCancelTimeouts([Request], Connection) + + case failRequest(Request, Error, cancelTimeout: Bool) + case failRequestsAndCancelTimeouts([Request], Error) + + case scheduleRequestTimeout(for: Request, on: EventLoop) + case cancelRequestTimeout(Request.ID) + + case none + } + + enum HTTPVersionState { + case http1(HTTP1StateMachine) + } + + var state: HTTPVersionState + var isShuttingDown: Bool = false + + let eventLoopGroup: EventLoopGroup + let maximumConcurrentHTTP1Connections: Int + + init(eventLoopGroup: EventLoopGroup, idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int) { + self.maximumConcurrentHTTP1Connections = maximumConcurrentHTTP1Connections + let http1State = HTTP1StateMachine( + idGenerator: idGenerator, + maximumConcurrentConnections: maximumConcurrentHTTP1Connections + ) + self.state = .http1(http1State) + self.eventLoopGroup = eventLoopGroup + } + + mutating func executeRequest(_ request: Request) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.executeRequest(request) + self.state = .http1(http1StateMachine) + return action + } + } + + mutating func newHTTP1ConnectionCreated(_ connection: Connection) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.newHTTP1ConnectionEstablished(connection) + self.state = .http1(http1StateMachine) + return action + } + } + + mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.failedToCreateNewConnection( + error, + connectionID: connectionID + ) + self.state = .http1(http1StateMachine) + return action + } + } + + mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.connectionCreationBackoffDone(connectionID) + self.state = .http1(http1StateMachine) + return action + } + } + + /// A request has timed out. + /// + /// This is different to a request being cancelled. If a request times out, we need to fail the + /// request, but don't need to cancel the timer (it already triggered). If a request is cancelled + /// we don't need to fail it but we need to cancel its timeout timer. + mutating func timeoutRequest(_ requestID: Request.ID) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.timeoutRequest(requestID) + self.state = .http1(http1StateMachine) + return action + } + } + + /// A request was cancelled. + /// + /// This is different to a request timing out. If a request is cancelled we don't need to fail it but we + /// need to cancel its timeout timer. If a request times out, we need to fail the request, but don't + /// need to cancel the timer (it already triggered). + mutating func cancelRequest(_ requestID: Request.ID) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.cancelRequest(requestID) + self.state = .http1(http1StateMachine) + return action + } + } + + mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.connectionIdleTimeout(connectionID) + self.state = .http1(http1StateMachine) + return action + } + } + + /// A connection has been closed + mutating func connectionClosed(_ connectionID: Connection.ID) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.connectionClosed(connectionID) + self.state = .http1(http1StateMachine) + return action + } + } + + mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action { + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.http1ConnectionReleased(connectionID) + self.state = .http1(http1StateMachine) + return action + } + } + + mutating func shutdown() -> Action { + precondition(!self.isShuttingDown, "Shutdown must only be called once") + + self.isShuttingDown = true + + switch self.state { + case .http1(var http1StateMachine): + let action = http1StateMachine.shutdown() + self.state = .http1(http1StateMachine) + return action + } + } + } +} + +extension HTTPConnectionPool { + /// The pool cleanup todo list. + struct CleanupContext: Equatable { + /// the connections to close right away. These are idle. + var close: [Connection] + + /// the connections that currently run a request that needs to be cancelled to close the connections + var cancel: [Connection] + + /// the connections that are backing off from connection creation + var connectBackoff: [Connection.ID] + + init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) { + self.close = close + self.cancel = cancel + self.connectBackoff = connectBackoff + } + } +} + +extension HTTPConnectionPool.StateMachine: CustomStringConvertible { + var description: String { + switch self.state { + case .http1(let http1): + return ".http1(\(http1))" + } + } +} diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 8ab715d60..478f62a6a 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -925,6 +925,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { case tlsHandshakeTimeout case serverOfferedUnsupportedApplicationProtocol(String) case requestStreamCancelled + case getConnectionFromPoolTimeout } private var code: Code @@ -997,4 +998,11 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { /// The remote server responded with a status code >= 300, before the full request was sent. The request stream /// was therefore cancelled public static let requestStreamCancelled = HTTPClientError(code: .requestStreamCancelled) + + /// Aquiring a HTTP connection from the connection pool timed out. + /// + /// This can have multiple reasons: + /// - A connection could not be created within the timout period. + /// - Tasks are not processed fast enough on the existing connections, to process all waiters in time + public static let getConnectionFromPoolTimeout = HTTPClientError(code: .getConnectionFromPoolTimeout) } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift index 30d10997c..8123657dd 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift @@ -68,7 +68,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { let backoff1EL = connections.backoffNextConnectionAttempt(conn1ID) XCTAssert(backoff1EL === el1) // backoff done. 2. decide what's next - let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID) + guard let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID) else { + return XCTFail("Expected that the connection is remembered") + } XCTAssert(conn1FailContext.eventLoop === el1) XCTAssertEqual(conn1FailContext.use, .generalPurpose) XCTAssertEqual(conn1FailContext.connectionsStartingForUseCase, 0) @@ -83,7 +85,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { XCTAssertEqual(connections.startingEventLoopConnections(on: el2), 1) let backoff2EL = connections.backoffNextConnectionAttempt(conn2ID) XCTAssert(backoff2EL === el2) - let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID) + guard let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID) else { + return XCTFail("Expected that the connection is remembered") + } XCTAssert(conn2FailContext.eventLoop === el2) XCTAssertEqual(conn2FailContext.use, .eventLoop(el2)) XCTAssertEqual(conn2FailContext.connectionsStartingForUseCase, 0) @@ -329,7 +333,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { XCTAssertEqual(connections.closeConnection(at: releaseIndex), lease) XCTAssertFalse(connections.isEmpty) - let (failIndex, _) = connections.failConnection(startingID) + guard let (failIndex, _) = connections.failConnection(startingID) else { + return XCTFail("Expected that the connection is remembered") + } connections.removeConnection(at: failIndex) XCTAssertTrue(connections.isEmpty) } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests+XCTest.swift new file mode 100644 index 000000000..590f078ab --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests+XCTest.swift @@ -0,0 +1,42 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// HTTPConnectionPool+HTTP1StateTests+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension HTTPConnectionPool_HTTP1StateMachineTests { + static var allTests: [(String, (HTTPConnectionPool_HTTP1StateMachineTests) -> () throws -> Void)] { + return [ + ("testCreatingAndFailingConnections", testCreatingAndFailingConnections), + ("testConnectionFailureBackoff", testConnectionFailureBackoff), + ("testCancelRequestWorks", testCancelRequestWorks), + ("testExecuteOnShuttingDownPool", testExecuteOnShuttingDownPool), + ("testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder", testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder), + ("testBestConnectionIsPicked", testBestConnectionIsPicked), + ("testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests", testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests), + ("testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests", testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests), + ("testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests", testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests), + ("testParkedConnectionTimesOut", testParkedConnectionTimesOut), + ("testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately", testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately), + ("testParkedConnectionTimesOutButIsAlsoClosedByRemote", testParkedConnectionTimesOutButIsAlsoClosedByRemote), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift new file mode 100644 index 000000000..f781349b9 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift @@ -0,0 +1,590 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import NIO +import NIOHTTP1 +import XCTest + +class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { + func testCreatingAndFailingConnections() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + eventLoopGroup: elg, + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 8 + ) + + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + + // for the first eight requests, the pool should try to create new connections. + + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let connectionID, let connectionEL) = action.connection else { + return XCTFail("Unexpected connection action") + } + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + XCTAssert(connectionEL === mockRequest.eventLoop) + + XCTAssertNoThrow(try connections.createConnection(connectionID, on: connectionEL)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + // the next eight requests should only be queued. + + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .none = action.connection else { + return XCTFail("Unexpected connection action") + } + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + // timeout all queued requests except for two + + // fail all connection attempts + while let randomConnectionID = connections.randomStartingConnection() { + struct SomeError: Error, Equatable {} + + XCTAssertNoThrow(try connections.failConnectionCreation(randomConnectionID)) + let action = state.failedToCreateNewConnection(SomeError(), connectionID: randomConnectionID) + + // After a failed connection attempt, must not fail a request. Instead we should retry + // to create the connection with a backoff and a small jitter. The request should only + // be failed, once the connection setup timeout is hit or the request reaches it + // deadline. + + XCTAssertEqual(action.request, .none) + + guard case .scheduleBackoffTimer(randomConnectionID, backoff: _, on: _) = action.connection else { + return XCTFail("Unexpected request action: \(action.request)") + } + + XCTAssertNoThrow(try connections.startConnectionBackoffTimer(randomConnectionID)) + } + + // cancel all queued requests + while let request = queuer.timeoutRandomRequest() { + let cancelAction = state.cancelRequest(request) + XCTAssertEqual(cancelAction.connection, .none) + XCTAssertEqual(cancelAction.request, .cancelRequestTimeout(request)) + } + + // connection backoff done + while let connectionID = connections.randomBackingOffConnection() { + XCTAssertNoThrow(try connections.connectionBackoffTimerDone(connectionID)) + let backoffAction = state.connectionCreationBackoffDone(connectionID) + XCTAssertEqual(backoffAction.connection, .none) + XCTAssertEqual(backoffAction.request, .none) + } + + XCTAssert(queuer.isEmpty) + XCTAssert(connections.isEmpty) + } + + func testConnectionFailureBackoff() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + eventLoopGroup: elg, + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 2 + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let action = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = action.connection else { + return XCTFail("Unexpected connection action: \(action.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + let failedConnect1 = state.failedToCreateNewConnection(HTTPClientError.connectTimeout, connectionID: connectionID) + XCTAssertEqual(failedConnect1.request, .none) + guard case .scheduleBackoffTimer(connectionID, let backoffTimeAmount1, _) = failedConnect1.connection else { + return XCTFail("Unexpected connection action: \(failedConnect1.connection)") + } + + // 2. connection attempt + let backoffDoneAction = state.connectionCreationBackoffDone(connectionID) + XCTAssertEqual(backoffDoneAction.request, .none) + guard case .createConnection(let newConnectionID, on: let newEventLoop) = backoffDoneAction.connection else { + return XCTFail("Unexpected connection action: \(backoffDoneAction.connection)") + } + XCTAssertGreaterThan(newConnectionID, connectionID) + XCTAssert(connectionEL === newEventLoop) // XCTAssertIdentical not available on Linux + + let failedConnect2 = state.failedToCreateNewConnection(HTTPClientError.connectTimeout, connectionID: newConnectionID) + XCTAssertEqual(failedConnect2.request, .none) + guard case .scheduleBackoffTimer(newConnectionID, let backoffTimeAmount2, _) = failedConnect2.connection else { + return XCTFail("Unexpected connection action: \(failedConnect2.connection)") + } + + XCTAssertNotEqual(backoffTimeAmount2, backoffTimeAmount1) + + // 3. request times out + let failRequest = state.timeoutRequest(request.id) + guard case .failRequest(let requestToFail, let requestError, cancelTimeout: false) = failRequest.request else { + return XCTFail("Unexpected request action: \(action.request)") + } + XCTAssert(requestToFail.__testOnly_wrapped_request() === mockRequest) // XCTAssertIdentical not available on Linux + XCTAssertEqual(requestError as? HTTPClientError, .getConnectionFromPoolTimeout) + XCTAssertEqual(failRequest.connection, .none) + + // 4. retry connection, but no more queued requests. + XCTAssertEqual(state.connectionCreationBackoffDone(newConnectionID), .none) + } + + func testCancelRequestWorks() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + eventLoopGroup: elg, + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 2 + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else { + return XCTFail("Unexpected connection action: \(executeAction.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + // 2. cancel request + + let cancelAction = state.cancelRequest(request.id) + XCTAssertEqual(cancelAction.request, .cancelRequestTimeout(request.id)) + XCTAssertEqual(cancelAction.connection, .none) + + // 3. request timeout triggers to late + XCTAssertEqual(state.timeoutRequest(request.id), .none, "To late timeout is ignored") + + // 4. succeed connection attempt + let connectedAction = state.newHTTP1ConnectionCreated(.__testOnly_connection(id: connectionID, eventLoop: connectionEL)) + XCTAssertEqual(connectedAction.request, .none, "Request must not be executed") + XCTAssertEqual(connectedAction.connection, .scheduleTimeoutTimer(connectionID, on: connectionEL)) + } + + func testExecuteOnShuttingDownPool() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + eventLoopGroup: elg, + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 2 + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else { + return XCTFail("Unexpected connection action: \(executeAction.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + // 2. connection succeeds + let connection: HTTPConnectionPool.Connection = .__testOnly_connection(id: connectionID, eventLoop: connectionEL) + let connectedAction = state.newHTTP1ConnectionCreated(connection) + guard case .executeRequest(request, connection, cancelTimeout: true) = connectedAction.request else { + return XCTFail("Unexpected request action: \(connectedAction.request)") + } + XCTAssert(request.__testOnly_wrapped_request() === mockRequest) // XCTAssertIdentical not available on Linux + XCTAssertEqual(connectedAction.connection, .none) + + // 3. shutdown + let shutdownAction = state.shutdown() + XCTAssertEqual(.none, shutdownAction.request) + guard case .cleanupConnections(let cleanupContext, isShutdown: .no) = shutdownAction.connection else { + return XCTFail("Unexpected connection action: \(shutdownAction.connection)") + } + + XCTAssertEqual(cleanupContext.cancel.count, 1) + XCTAssertEqual(cleanupContext.cancel.first?.id, connectionID) + XCTAssertEqual(cleanupContext.close, []) + XCTAssertEqual(cleanupContext.connectBackoff, []) + + // 4. execute another request + let finalMockRequest = MockHTTPRequest(eventLoop: elg.next()) + let finalRequest = HTTPConnectionPool.Request(finalMockRequest) + let failAction = state.executeRequest(finalRequest) + XCTAssertEqual(failAction.connection, .none) + XCTAssertEqual(failAction.request, .failRequest(finalRequest, HTTPClientError.alreadyShutdown, cancelTimeout: false)) + + // 5. close open connection + let closeAction = state.connectionClosed(connectionID) + XCTAssertEqual(closeAction.connection, .cleanupConnections(.init(), isShutdown: .yes(unclean: true))) + XCTAssertEqual(closeAction.request, .none) + } + + func testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http1(elg: elg, numberOfConnections: 8) else { + return XCTFail("Test setup failed") + } + + XCTAssertEqual(connections.parked, 8) + + // Add eight requests to fill all connections + for _ in 0..<8 { + let eventLoop = elg.next() + guard let expectedConnection = connections.newestParkedConnection(for: eventLoop) ?? connections.newestParkedConnection else { + return XCTFail("Expected to still have connections available") + } + + let mockRequest = MockHTTPRequest(eventLoop: eventLoop) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + XCTAssertEqual(action.connection, .cancelTimeoutTimer(expectedConnection.id)) + guard case .executeRequest(let returnedRequest, expectedConnection, cancelTimeout: false) = action.request else { + return XCTFail("Expected to execute a request next, but got: \(action.request)") + } + + XCTAssert(mockRequest === returnedRequest.__testOnly_wrapped_request()) + + XCTAssertNoThrow(try connections.activateConnection(expectedConnection.id)) + XCTAssertNoThrow(try connections.execute(mockRequest, on: expectedConnection)) + } + + // Add 100 requests to fill request queue + var queuedRequestsOrder = CircularBuffer() + var queuer = MockRequestQueuer() + for _ in 0..<100 { + let eventLoop = elg.next() + let mockRequest = MockHTTPRequest(eventLoop: eventLoop, requiresEventLoopForChannel: false) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + queuedRequestsOrder.append(request.id) + } + + while let connection = connections.randomLeasedConnection() { + XCTAssertNoThrow(try connections.finishExecution(connection.id)) + let action = state.http1ConnectionReleased(connection.id) + + switch action.connection { + case .scheduleTimeoutTimer(connection.id, on: let timerEL): + // if all queued requests are processed, the connection will be parked + XCTAssert(queuedRequestsOrder.isEmpty) + XCTAssertEqual(action.request, .none) + XCTAssert(connection.eventLoop === timerEL) + XCTAssertNoThrow(try connections.parkConnection(connection.id)) + case .none: + guard case .executeRequest(let request, connection, cancelTimeout: true) = action.request else { + return XCTFail("Unexpected request action: \(action.request)") + } + XCTAssertEqual(request.id, queuedRequestsOrder.popFirst()) + let mockRequest = request.__testOnly_wrapped_request() + XCTAssertNoThrow(try connections.execute(queuer.get(request.id, request: mockRequest), on: connection)) + + default: + XCTFail("Unexpected connection action: \(action)") + } + } + + XCTAssertEqual(connections.parked, 8) + XCTAssert(queuer.isEmpty) + } + + func testBestConnectionIsPicked() { + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 64) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http1(elg: elg, numberOfConnections: 8) else { + return XCTFail("Test setup failed") + } + + for index in 1...300 { + // Every iteration we start with eight parked connections + XCTAssertEqual(connections.parked, 8) + + var reqEventLoop: EventLoop = elg.next() + for _ in 0..<((0..<63).randomElement()!) { + // pick a random eventLoop for the next request + reqEventLoop = elg.next() + } + + // 10% of the cases enforce the eventLoop + let elRequired = (0..<10).randomElement().flatMap { $0 == 0 ? true : false }! + let mockRequest = MockHTTPRequest(eventLoop: reqEventLoop, requiresEventLoopForChannel: elRequired) + let request = HTTPConnectionPool.Request(mockRequest) + + let action = state.executeRequest(request) + + switch action.connection { + case .createConnection(let connectionID, on: let connEventLoop): + XCTAssertTrue(elRequired) + XCTAssertNil(connections.newestParkedConnection(for: reqEventLoop)) + XCTAssert(connEventLoop === reqEventLoop) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: reqEventLoop)) + + let connection: HTTPConnectionPool.Connection = .__testOnly_connection(id: connectionID, eventLoop: connEventLoop) + let createdAction = state.newHTTP1ConnectionCreated(connection) + XCTAssertEqual(createdAction.request, .executeRequest(request, connection, cancelTimeout: true)) + XCTAssertEqual(createdAction.connection, .none) + + let doneAction = state.http1ConnectionReleased(connectionID) + XCTAssertEqual(doneAction.request, .none) + XCTAssertEqual(doneAction.connection, .closeConnection(connection, isShutdown: .no)) + XCTAssertEqual(state.connectionClosed(connectionID), .none) + + case .cancelTimeoutTimer(let connectionID): + guard let expectedConnection = connections.newestParkedConnection(for: reqEventLoop) ?? connections.newestParkedConnection else { + return XCTFail("Expected to have connections available") + } + + if elRequired { + XCTAssert(expectedConnection.eventLoop === reqEventLoop) + } + + XCTAssertEqual(connectionID, expectedConnection.id, "Request is scheduled on the connection we expected") + XCTAssertNoThrow(try connections.activateConnection(connectionID)) + + guard case .executeRequest(let request, let connection, cancelTimeout: false) = action.request else { + return XCTFail("Expected to execute a request, but got: \(action.request)") + } + XCTAssertEqual(connection, expectedConnection) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: connection)) + XCTAssertNoThrow(try connections.finishExecution(connection.id)) + + XCTAssertEqual(state.http1ConnectionReleased(connection.id), + .init(request: .none, connection: .scheduleTimeoutTimer(connection.id, on: connection.eventLoop))) + XCTAssertNoThrow(try connections.parkConnection(connectionID)) + + default: + XCTFail("Unexpected connection action in iteration \(index): \(action.connection)") + } + } + + XCTAssertEqual(connections.parked, 8) + } + + func testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests() { + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http1(elg: elg, numberOfConnections: 8) else { + return XCTFail("Test setup failed") + } + + XCTAssertEqual(connections.parked, 8) + + // close a leased connection == abort + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + guard let connectionToAbort = connections.newestParkedConnection else { + return XCTFail("Expected to have a parked connection") + } + let action = state.executeRequest(request) + XCTAssertEqual(action.connection, .cancelTimeoutTimer(connectionToAbort.id)) + XCTAssertNoThrow(try connections.activateConnection(connectionToAbort.id)) + XCTAssertEqual(action.request, .executeRequest(request, connectionToAbort, cancelTimeout: false)) + XCTAssertNoThrow(try connections.execute(mockRequest, on: connectionToAbort)) + XCTAssertEqual(connections.parked, 7) + XCTAssertEqual(connections.used, 1) + XCTAssertNoThrow(try connections.abortConnection(connectionToAbort.id)) + XCTAssertEqual(state.connectionClosed(connectionToAbort.id), .none) + XCTAssertEqual(connections.parked, 7) + XCTAssertEqual(connections.used, 0) + } + + func testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests() { + let elg = EmbeddedEventLoopGroup(loops: 1) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http1(elg: elg, numberOfConnections: 8) else { + return XCTFail("Test setup failed") + } + + XCTAssertEqual(connections.parked, 8) + + // close a parked connection + guard let connectionToClose = connections.randomParkedConnection() else { + return XCTFail("Expected to have a parked connection") + } + XCTAssertNoThrow(try connections.closeConnection(connectionToClose)) + XCTAssertEqual(state.connectionClosed(connectionToClose.id), .none) + XCTAssertEqual(connections.parked, 7) + } + + func testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests() { + let elg = EmbeddedEventLoopGroup(loops: 8) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http1(elg: elg, numberOfConnections: 8) else { + return XCTFail("Test setup failed") + } + + XCTAssertEqual(connections.parked, 8) + + // Add eight requests to fill all connections + for _ in 0..<8 { + let eventLoop = elg.next() + guard let expectedConnection = connections.newestParkedConnection(for: eventLoop) ?? connections.newestParkedConnection else { + return XCTFail("Expected to still have connections available") + } + + let mockRequest = MockHTTPRequest(eventLoop: eventLoop) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + XCTAssertEqual(action.connection, .cancelTimeoutTimer(expectedConnection.id)) + XCTAssertEqual(action.request, .executeRequest(request, expectedConnection, cancelTimeout: false)) + + XCTAssertNoThrow(try connections.activateConnection(expectedConnection.id)) + XCTAssertNoThrow(try connections.execute(mockRequest, on: expectedConnection)) + } + + // Add 100 requests to fill request queue + var queuedRequestsOrder = CircularBuffer() + var queuer = MockRequestQueuer() + for _ in 0..<100 { + let eventLoop = elg.next() + + let mockRequest = MockHTTPRequest(eventLoop: eventLoop, requiresEventLoopForChannel: false) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + XCTAssertEqual(.none, action.connection) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + queuedRequestsOrder.append(request.id) + } + + while let closedConnection = connections.randomLeasedConnection() { + XCTAssertNoThrow(try connections.abortConnection(closedConnection.id)) + XCTAssertEqual(connections.parked, 0) + let action = state.connectionClosed(closedConnection.id) + + switch action.connection { + case .createConnection(let newConnectionID, on: let eventLoop): + XCTAssertEqual(action.request, .none) + XCTAssertNoThrow(try connections.createConnection(newConnectionID, on: eventLoop)) + XCTAssertEqual(connections.starting, 1) + + var maybeNewConnection: HTTPConnectionPool.Connection? + XCTAssertNoThrow(maybeNewConnection = try connections.succeedConnectionCreationHTTP1(newConnectionID)) + guard let newConnection = maybeNewConnection else { return XCTFail("Expected to get a new connection") } + let afterRecreationAction = state.newHTTP1ConnectionCreated(newConnection) + XCTAssertEqual(afterRecreationAction.connection, .none) + guard case .executeRequest(let request, newConnection, cancelTimeout: true) = afterRecreationAction.request else { + return XCTFail("Unexpected request action: \(action.request)") + } + + XCTAssertEqual(request.id, queuedRequestsOrder.popFirst()) + XCTAssertNoThrow(try connections.execute(queuer.get(request.id, request: request.__testOnly_wrapped_request()), on: newConnection)) + + case .none: + XCTAssert(queuer.isEmpty) + default: + XCTFail("Unexpected connection action: \(action.connection)") + } + } + } + + func testParkedConnectionTimesOut() { + let elg = EmbeddedEventLoopGroup(loops: 1) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http1(elg: elg, numberOfConnections: 1) else { + return XCTFail("Test setup failed") + } + + guard let connection = connections.randomParkedConnection() else { + return XCTFail("Expected to have one parked connection") + } + + let action = state.connectionIdleTimeout(connection.id) + XCTAssertEqual(action.connection, .closeConnection(connection, isShutdown: .no)) + XCTAssertEqual(action.request, .none) + XCTAssertNoThrow(try connections.closeConnection(connection)) + } + + func testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately() { + let elg = EmbeddedEventLoopGroup(loops: 8) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http1(elg: elg, numberOfConnections: 8) else { + return XCTFail("Test setup failed") + } + + XCTAssertEqual(connections.parked, 8) + let action = state.shutdown() + XCTAssertEqual(.none, action.request) + + guard case .cleanupConnections(let closeContext, isShutdown: .yes(unclean: false)) = action.connection else { + return XCTFail("Unexpected connection event: \(action.connection)") + } + + XCTAssertEqual(closeContext.close.count, 8) + + for connection in closeContext.close { + XCTAssertNoThrow(try connections.closeConnection(connection)) + } + + XCTAssertEqual(connections.count, 0) + } + + func testParkedConnectionTimesOutButIsAlsoClosedByRemote() { + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + guard var (connections, state) = try? MockConnectionPool.http1(elg: elg, numberOfConnections: 1) else { + return XCTFail("Test setup failed") + } + + guard let connection = connections.randomParkedConnection() else { + return XCTFail("Expected to have one parked connection") + } + + // triggered by remote peer + XCTAssertNoThrow(try connections.abortConnection(connection.id)) + XCTAssertEqual(state.connectionClosed(connection.id), .none) + + // triggered by timer + XCTAssertEqual(state.connectionIdleTimeout(connection.id), .none) + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift index 90bb43a2c..f4ebd4ebb 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift @@ -29,7 +29,7 @@ class HTTPConnectionPool_RequestQueueTests: XCTestCase { XCTAssertFalse(queue.isEmpty) XCTAssertFalse(queue.isEmpty(for: nil)) XCTAssertEqual(queue.count, 1) - XCTAssertEqual(queue.count(for: nil), 1) + XCTAssertEqual(queue.generalPurposeCount, 1) let req2 = MockScheduledRequest(requiredEventLoop: nil) let req2ID = queue.push(.init(req2)) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift index 814b0d078..716a13358 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift @@ -53,6 +53,12 @@ final class EmbeddedEventLoopGroup: EventLoopGroup { } } +extension HTTPConnectionPool.Request: Equatable { + public static func == (lhs: Self, rhs: Self) -> Bool { + return lhs.id == rhs.id + } +} + extension HTTPConnectionPool.HTTP1Connections.ConnectionUse: Equatable { public static func == (lhs: Self, rhs: Self) -> Bool { switch (lhs, rhs) { @@ -65,3 +71,55 @@ extension HTTPConnectionPool.HTTP1Connections.ConnectionUse: Equatable { } } } + +extension HTTPConnectionPool.StateMachine.ConnectionAction: Equatable { + public static func == (lhs: Self, rhs: Self) -> Bool { + switch (lhs, rhs) { + case (.createConnection(let lhsConnID, on: let lhsEL), .createConnection(let rhsConnID, on: let rhsEL)): + return lhsConnID == rhsConnID && lhsEL === rhsEL + case (.scheduleBackoffTimer(let lhsConnID, let lhsBackoff, on: let lhsEL), .scheduleBackoffTimer(let rhsConnID, let rhsBackoff, on: let rhsEL)): + return lhsConnID == rhsConnID && lhsBackoff == rhsBackoff && lhsEL === rhsEL + case (.scheduleTimeoutTimer(let lhsConnID, on: let lhsEL), .scheduleTimeoutTimer(let rhsConnID, on: let rhsEL)): + return lhsConnID == rhsConnID && lhsEL === rhsEL + case (.cancelTimeoutTimer(let lhsConnID), .cancelTimeoutTimer(let rhsConnID)): + return lhsConnID == rhsConnID + case (.closeConnection(let lhsConn, isShutdown: let lhsShut), .closeConnection(let rhsConn, isShutdown: let rhsShut)): + return lhsConn == rhsConn && lhsShut == rhsShut + case (.cleanupConnections(let lhsContext, isShutdown: let lhsShut), .cleanupConnections(let rhsContext, isShutdown: let rhsShut)): + return lhsContext == rhsContext && lhsShut == rhsShut + case (.none, .none): + return true + default: + return false + } + } +} + +extension HTTPConnectionPool.StateMachine.RequestAction: Equatable { + public static func == (lhs: Self, rhs: Self) -> Bool { + switch (lhs, rhs) { + case (.executeRequest(let lhsReq, let lhsConn, let lhsReqID), .executeRequest(let rhsReq, let rhsConn, let rhsReqID)): + return lhsReq == rhsReq && lhsConn == rhsConn && lhsReqID == rhsReqID + case (.executeRequestsAndCancelTimeouts(let lhsReqs, let lhsConn), .executeRequestsAndCancelTimeouts(let rhsReqs, let rhsConn)): + return lhsReqs.elementsEqual(rhsReqs, by: { $0 == $1 }) && lhsConn == rhsConn + case (.failRequest(let lhsReq, _, cancelTimeout: let lhsReqID), .failRequest(let rhsReq, _, cancelTimeout: let rhsReqID)): + return lhsReq == rhsReq && lhsReqID == rhsReqID + case (.failRequestsAndCancelTimeouts(let lhsReqs, _), .failRequestsAndCancelTimeouts(let rhsReqs, _)): + return lhsReqs.elementsEqual(rhsReqs, by: { $0 == $1 }) + case (.scheduleRequestTimeout(for: let lhsReq, on: let lhsEL), .scheduleRequestTimeout(for: let rhsReq, on: let rhsEL)): + return lhsReq == rhsReq && lhsEL === rhsEL + case (.cancelRequestTimeout(let lhsReqID), .cancelRequestTimeout(let rhsReqID)): + return lhsReqID == rhsReqID + case (.none, .none): + return true + default: + return false + } + } +} + +extension HTTPConnectionPool.StateMachine.Action: Equatable { + public static func == (lhs: Self, rhs: Self) -> Bool { + lhs.connection == rhs.connection && lhs.request == rhs.request + } +} diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index acee1fa53..eb49a6fa6 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -467,6 +467,79 @@ extension MockConnectionPool { self.connections.removeValue(forKey: connectionID) return connectionID } + + enum SetupError: Error { + case totalNumberOfConnectionsMustBeLowerThanIdle + case expectedConnectionToBeCreated + case expectedRequestToBeAddedToQueue + case expectedPreviouslyQueuedRequestToBeRunNow + case expectedNoConnectionAction + case expectedConnectionToBeParked + } + + static func http1( + elg: EventLoopGroup, + on eventLoop: EventLoop? = nil, + numberOfConnections: Int, + maxNumberOfConnections: Int = 8 + ) throws -> (Self, HTTPConnectionPool.StateMachine) { + var state = HTTPConnectionPool.StateMachine( + eventLoopGroup: elg, + idGenerator: .init(), + maximumConcurrentHTTP1Connections: maxNumberOfConnections + ) + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + + for _ in 0..