diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift new file mode 100644 index 000000000..3d0e08e51 --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -0,0 +1,628 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +extension HTTPConnectionPool { + /// Represents the state of a single HTTP/1.1 connection + private struct HTTP1ConnectionState { + enum State { + /// the connection is creating a connection. Valid transitions are to: .backingOff, .idle, and .closed + case starting + /// the connection is waiting to retry the establishing a connection. Valid transitions are to: .closed. + /// This means, the connection can be removed from the connections without cancelling external + /// state. The connection state can then be replaced by a new one. + case backingOff + /// the connection is idle for a new request. Valid transitions to: .leased and .closed + case idle(Connection, since: NIODeadline) + /// the connection is leased and running for a request. Valid transitions to: .idle and .closed + case leased(Connection) + /// the connection is closed. final state. + case closed + } + + private var state: State + let connectionID: Connection.ID + let eventLoop: EventLoop + + init(connectionID: Connection.ID, eventLoop: EventLoop) { + self.connectionID = connectionID + self.eventLoop = eventLoop + self.state = .starting + } + + var isConnecting: Bool { + switch self.state { + case .starting: + return true + case .backingOff, .closed, .idle, .leased: + return false + } + } + + var isBackingOff: Bool { + switch self.state { + case .backingOff: + return true + case .starting, .closed, .idle, .leased: + return false + } + } + + var isIdle: Bool { + switch self.state { + case .idle: + return true + case .backingOff, .starting, .leased, .closed: + return false + } + } + + var isLeased: Bool { + switch self.state { + case .leased: + return true + case .backingOff, .starting, .idle, .closed: + return false + } + } + + var idleSince: NIODeadline? { + switch self.state { + case .idle(_, since: let lastReturn): + return lastReturn + case .backingOff, .starting, .leased, .closed: + return nil + } + } + + var isClosed: Bool { + switch self.state { + case .closed: + return true + case .idle, .starting, .leased, .backingOff: + return false + } + } + + mutating func connected(_ connection: Connection) { + switch self.state { + case .starting: + self.state = .idle(connection, since: .now()) + case .backingOff, .idle, .leased, .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + /// The connection failed to start + mutating func failedToConnect() { + switch self.state { + case .starting: + self.state = .backingOff + case .backingOff, .idle, .leased, .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func lease() -> Connection { + switch self.state { + case .idle(let connection, since: _): + self.state = .leased(connection) + return connection + case .backingOff, .starting, .leased, .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func release() { + switch self.state { + case .leased(let connection): + self.state = .idle(connection, since: .now()) + case .backingOff, .starting, .idle, .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func close() -> Connection { + switch self.state { + case .idle(let connection, since: _): + self.state = .closed + return connection + case .backingOff, .starting, .leased, .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func fail() { + switch self.state { + case .starting, .backingOff, .idle, .leased: + self.state = .closed + case .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + enum CleanupAction { + case removeConnection + case keepConnection + } + + /// Cleanup the current connection for shutdown. + /// + /// This method is called, when the connections shall shutdown. Depending on the state + /// the connection is in, it adds itself to one of the arrays that are used to signal shutdown + /// intent to the underlying connections. Connections that are backing off can be easily + /// dropped (since, we only need to cancel the backoff timer), connections that are leased + /// need to be cancelled (notifying the `ChannelHandler` that we want to cancel the + /// running request), connections that are idle can be closed right away. Sadly we can't + /// cancel connection starts right now. For this reason we need to wait for them to succeed + /// or fail until we finalize the shutdown. + /// + /// - Parameter context: A cleanup context to add the connection to based on its state. + /// - Returns: A cleanup action indicating if the connection can be removed from the + /// connection list. + func cleanup(_ context: inout CleanupContext) -> CleanupAction { + switch self.state { + case .backingOff: + context.connectBackoff.append(self.connectionID) + return .removeConnection + case .starting: + return .keepConnection + case .idle(let connection, since: _): + context.close.append(connection) + return .removeConnection + case .leased(let connection): + context.cancel.append(connection) + return .keepConnection + case .closed: + preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)") + } + } + } + + /// A structure to hold the currently active HTTP/1.1 connections. + /// + /// The general purpose connection pool (pool for requests that don't have special `EventLoop` + /// requirements) will grow up until `maximumConcurrentConnections`. If requests have + /// special `EventLoop` requirements overflow connections might be opened. + /// + /// All connections live in the same `connections` array. In the front are the general purpose + /// connections. In the back (starting with the `overflowIndex`) are the connections for + /// requests with special needs. + struct HTTP1Connections { + /// The maximum number of connections in the general purpose pool. + private let maximumConcurrentConnections: Int + /// A connectionID generator. + private let generator: Connection.ID.Generator + /// The connections states + private var connections: [HTTP1ConnectionState] + /// The index after which you will find the connections for requests with `EventLoop` + /// requirements in `connections`. + private var overflowIndex: Array.Index + + init(maximumConcurrentConnections: Int, generator: Connection.ID.Generator) { + self.connections = [] + self.connections.reserveCapacity(maximumConcurrentConnections) + self.overflowIndex = self.connections.endIndex + self.maximumConcurrentConnections = maximumConcurrentConnections + self.generator = generator + } + + var stats: Stats { + var stats = Stats() + // all additions here can be unchecked, since we will have at max self.connections.count + // which itself is an Int. For this reason we will never overflow. + for connectionState in self.connections { + if connectionState.isConnecting { + stats.connecting &+= 1 + } else if connectionState.isBackingOff { + stats.backingOff &+= 1 + } else if connectionState.isLeased { + stats.leased &+= 1 + } else if connectionState.isIdle { + stats.idle &+= 1 + } + } + return stats + } + + var isEmpty: Bool { + self.connections.isEmpty + } + + var canGrow: Bool { + self.overflowIndex < self.maximumConcurrentConnections + } + + var startingGeneralPurposeConnections: Int { + var connecting = 0 + for connectionState in self.connections[0.. Int { + return self.connections[self.overflowIndex.. Connection.ID { + precondition(self.canGrow) + let connection = HTTP1ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop) + self.connections.insert(connection, at: self.overflowIndex) + self.overflowIndex = self.connections.index(after: self.overflowIndex) + return connection.connectionID + } + + mutating func createNewOverflowConnection(on eventLoop: EventLoop) -> Connection.ID { + let connection = HTTP1ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop) + self.connections.append(connection) + return connection.connectionID + } + + /// A new HTTP/1.1 connection was established. + /// + /// This will put the connection into the idle state. + /// + /// - Parameter connection: The new established connection. + /// - Returns: An index and an IdleConnectionContext to determine the next action for the now idle connection. + /// Call ``leaseConnection(at:)`` or ``closeConnection(at:)`` with the supplied index after + /// this. + mutating func newHTTP1ConnectionEstablished(_ connection: Connection) -> (Int, IdleConnectionContext) { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connection.id }) else { + preconditionFailure("There is a new connection that we didn't request!") + } + precondition(connection.eventLoop === self.connections[index].eventLoop, "Expected the new connection to be on EL") + self.connections[index].connected(connection) + let context = self.generateIdleConnectionContextForConnection(at: index) + return (index, context) + } + + /// Move the HTTP1ConnectionState to backingOff. + /// + /// - Parameter connectionID: The connectionID of the failed connection attempt + /// - Returns: The eventLoop on which to schedule the backoff timer + mutating func backoffNextConnectionAttempt(_ connectionID: Connection.ID) -> EventLoop { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + preconditionFailure("We tried to create a new connection that we know nothing about?") + } + + self.connections[index].failedToConnect() + return self.connections[index].eventLoop + } + + // MARK: Leasing and releasing + + /// Lease a connection on the preferred EventLoop + /// + /// If no connection is available on the preferred EventLoop, a connection on + /// another eventLoop might be returned, if one is available. + /// + /// - Parameter eventLoop: The preferred EventLoop for the request + /// - Returns: A connection to execute a request on. + mutating func leaseConnection(onPreferred eventLoop: EventLoop) -> Connection? { + guard let index = self.findIdleConnection(onPreferred: eventLoop) else { + return nil + } + + return self.connections[index].lease() + } + + /// Lease a connection on the required EventLoop + /// + /// If no connection is available on the required EventLoop nil is returned. + /// + /// - Parameter eventLoop: The required EventLoop for the request + /// - Returns: A connection to execute a request on. + mutating func leaseConnection(onRequired eventLoop: EventLoop) -> Connection? { + guard let index = self.findIdleConnection(onRequired: eventLoop) else { + return nil + } + + return self.connections[index].lease() + } + + mutating func leaseConnection(at index: Int) -> Connection { + self.connections[index].lease() + } + + /// A new HTTP/1.1 connection was released. + /// + /// This will put the position into the idle state. + /// + /// - Parameter connectionID: The released connection's id. + /// - Returns: An index and an IdleConnectionContext to determine the next action for the now idle connection. + /// Call ``leaseConnection(at:)`` or ``closeConnection(at:)`` with the supplied index after + /// this. If you want to park the connection no further call is required. + mutating func releaseConnection(_ connectionID: Connection.ID) -> (Int, IdleConnectionContext) { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + preconditionFailure("A connection that we don't know was released? Something is very wrong...") + } + + self.connections[index].release() + let context = self.generateIdleConnectionContextForConnection(at: index) + return (index, context) + } + + // MARK: Connection close/removal + + /// Closes the connection at the given index. This will also remove the connection right away. + mutating func closeConnection(at index: Int) -> Connection { + if index < self.overflowIndex { + self.overflowIndex = self.connections.index(before: self.overflowIndex) + } + var connectionState = self.connections.remove(at: index) + return connectionState.close() + } + + mutating func removeConnection(at index: Int) { + precondition(self.connections[index].isClosed) + if index < self.overflowIndex { + self.overflowIndex = self.connections.index(before: self.overflowIndex) + } + self.connections.remove(at: index) + } + + mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> Connection? { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + // because of a race this connection (connection close runs against trigger of timeout) + // was already removed from the state machine. + return nil + } + + guard self.connections[index].isIdle else { + // connection is not idle anymore, we may have just leased it for a request + return nil + } + + return self.closeConnection(at: index) + } + + mutating func replaceConnection(at index: Int) -> (Connection.ID, EventLoop) { + precondition(self.connections[index].isClosed) + let newConnection = HTTP1ConnectionState( + connectionID: self.generator.next(), + eventLoop: self.connections[index].eventLoop + ) + + self.connections[index] = newConnection + return (newConnection.connectionID, newConnection.eventLoop) + } + + // MARK: Connection failure + + /// Fail a connection. Call this method, if a connection suddenly closed, did not startup correctly, + /// or the backoff time is done. + /// + /// This will put the position into the closed state. + /// + /// - Parameter connectionID: The failed connection's id. + /// - Returns: An index and an IdleConnectionContext to determine the next action for the now closed connection. + /// You must call ``removeConnection(at:)`` or ``replaceConnection(at:)`` with the + /// supplied index after this. + mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext) { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + preconditionFailure("We tried to fail a new connection that we know nothing about?") + } + + let use: ConnectionUse + self.connections[index].fail() + let eventLoop = self.connections[index].eventLoop + let starting: Int + if index < self.overflowIndex { + use = .generalPurpose + starting = self.startingGeneralPurposeConnections + } else { + use = .eventLoop(eventLoop) + starting = self.startingEventLoopConnections(on: eventLoop) + } + + let context = FailedConnectionContext( + eventLoop: eventLoop, + use: use, + connectionsStartingForUseCase: starting + ) + return (index, context) + } + + // MARK: Shutdown + + mutating func shutdown() -> CleanupContext { + var cleanupContext = CleanupContext() + let initialOverflowIndex = self.overflowIndex + + self.connections = self.connections.enumerated().compactMap { index, connectionState in + switch connectionState.cleanup(&cleanupContext) { + case .removeConnection: + if index < initialOverflowIndex { + self.overflowIndex = self.connections.index(before: self.overflowIndex) + } + return nil + + case .keepConnection: + return connectionState + } + } + + return cleanupContext + } + + // MARK: - Private functions - + + private func generateIdleConnectionContextForConnection(at index: Int) -> IdleConnectionContext { + precondition(self.connections[index].isIdle) + let eventLoop = self.connections[index].eventLoop + let use: ConnectionUse + if index < self.overflowIndex { + use = .generalPurpose + } else { + use = .eventLoop(eventLoop) + } + return IdleConnectionContext(eventLoop: eventLoop, use: use) + } + + private func findIdleConnection(onPreferred preferredEL: EventLoop) -> Int? { + var eventLoopMatch: (Int, NIODeadline)? + var goodMatch: (Int, NIODeadline)? + + // To find an appropriate connection we iterate all existing connections. + // While we do this we try to find the best fitting connection for our request. + // + // A perfect match, runs on the same eventLoop and has been idle the shortest amount + // of time (returned the most recently). + // + // An okay match is not on the same eventLoop, and has been idle for the shortest + // time. + for (index, conn) in self.connections.enumerated() { + guard let connReturn = conn.idleSince else { + continue + } + + if conn.eventLoop === preferredEL { + switch eventLoopMatch { + case .none: + eventLoopMatch = (index, connReturn) + case .some((_, let existingMatchReturn)) where connReturn > existingMatchReturn: + eventLoopMatch = (index, connReturn) + default: + break + } + } else { + switch goodMatch { + case .none: + goodMatch = (index, connReturn) + case .some((_, let existingMatchReturn)): + // We don't require a specific eventLoop. For this reason we want to pick a + // matching eventLoop that has been idle the shortest. + if connReturn > existingMatchReturn { + goodMatch = (index, connReturn) + } + } + } + } + + if let (index, _) = eventLoopMatch { + return index + } + + if let (index, _) = goodMatch { + return index + } + + return nil + } + + func findIdleConnection(onRequired requiredEL: EventLoop) -> Int? { + var match: (Int, NIODeadline)? + + // To find an appropriate connection we iterate all existing connections. + // While we do this we try to find the best fitting connection for our request. + // + // A match, runs on the same eventLoop and has been idle the shortest amount of time. + for (index, conn) in self.connections.enumerated() { + // 1. Ensure we are on the correct EL. + guard conn.eventLoop === requiredEL else { + continue + } + + // 2. Ensure the connection is idle + guard let connReturn = conn.idleSince else { + continue + } + + switch match { + case .none: + match = (index, connReturn) + case .some((_, let existingMatchReturn)) where connReturn > existingMatchReturn: + // the currently iterated eventLoop has been idle for a shorter amount than + // the current best match. + match = (index, connReturn) + default: + // the currently iterated eventLoop has been idle for a longer amount than + // the current best match. We continue the iteration. + continue + } + } + + if let (index, _) = match { + return index + } + + return nil + } + } + + struct Stats { + var idle: Int = 0 + var leased: Int = 0 + var connecting: Int = 0 + var backingOff: Int = 0 + } + + /// The pool cleanup todo list. + struct CleanupContext: Equatable { + /// the connections to close right away. These are idle. + var close: [Connection] + + /// the connections that currently run a request that needs to be cancelled to close the connections + var cancel: [Connection] + + /// the connections that are backing off from connection creation + var connectBackoff: [Connection.ID] + + init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) { + self.close = close + self.cancel = cancel + self.connectBackoff = connectBackoff + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift new file mode 100644 index 000000000..d6d7d8176 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift @@ -0,0 +1,40 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension HTTPConnectionPool_HTTP1ConnectionsTests { + static var allTests: [(String, (HTTPConnectionPool_HTTP1ConnectionsTests) -> () throws -> Void)] { + return [ + ("testCreatingConnections", testCreatingConnections), + ("testCreatingConnectionAndFailing", testCreatingConnectionAndFailing), + ("testLeaseConnectionOnPreferredAndAvailableEL", testLeaseConnectionOnPreferredAndAvailableEL), + ("testLeaseConnectionOnPreferredButUnavailableEL", testLeaseConnectionOnPreferredButUnavailableEL), + ("testLeaseConnectionOnRequiredButUnavailableEL", testLeaseConnectionOnRequiredButUnavailableEL), + ("testLeaseConnectionOnRequiredAndAvailableEL", testLeaseConnectionOnRequiredAndAvailableEL), + ("testCloseConnectionIfIdle", testCloseConnectionIfIdle), + ("testCloseConnectionIfIdleButLeasedRaceCondition", testCloseConnectionIfIdleButLeasedRaceCondition), + ("testCloseConnectionIfIdleButClosedRaceCondition", testCloseConnectionIfIdleButClosedRaceCondition), + ("testShutdown", testShutdown), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift new file mode 100644 index 000000000..30d10997c --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift @@ -0,0 +1,336 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import NIOCore +import NIOEmbedded +import XCTest + +class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { + func testCreatingConnections() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + let el1 = elg.next() + let el2 = elg.next() + + // general purpose connection + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + XCTAssertEqual(connections.startingEventLoopConnections(on: el1), 0) + let conn1ID = connections.createNewConnection(on: el1) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 1) + XCTAssertEqual(connections.startingEventLoopConnections(on: el1), 0) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP1ConnectionEstablished(conn1) + XCTAssertEqual(conn1CreatedContext.use, .generalPurpose) + XCTAssert(conn1CreatedContext.eventLoop === el1) + XCTAssertEqual(connections.leaseConnection(at: conn1Index), conn1) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + + // eventLoop connection + let conn2ID = connections.createNewOverflowConnection(on: el2) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + XCTAssertEqual(connections.startingEventLoopConnections(on: el2), 1) + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el2) + let (conn2Index, conn2CreatedContext) = connections.newHTTP1ConnectionEstablished(conn2) + XCTAssertEqual(conn2CreatedContext.use, .eventLoop(el2)) + XCTAssert(conn2CreatedContext.eventLoop === el2) + XCTAssertEqual(connections.leaseConnection(at: conn2Index), conn2) + XCTAssertEqual(connections.startingEventLoopConnections(on: el2), 0) + } + + func testCreatingConnectionAndFailing() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + let el1 = elg.next() + let el2 = elg.next() + + // general purpose connection + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + XCTAssertEqual(connections.startingEventLoopConnections(on: el1), 0) + let conn1ID = connections.createNewConnection(on: el1) + XCTAssertEqual(conn1ID, 0) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 1) + XCTAssertEqual(connections.startingEventLoopConnections(on: el1), 0) + // connection failed to start. 1. backoff + let backoff1EL = connections.backoffNextConnectionAttempt(conn1ID) + XCTAssert(backoff1EL === el1) + // backoff done. 2. decide what's next + let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID) + XCTAssert(conn1FailContext.eventLoop === el1) + XCTAssertEqual(conn1FailContext.use, .generalPurpose) + XCTAssertEqual(conn1FailContext.connectionsStartingForUseCase, 0) + let (replaceConn1ID, replaceConn1EL) = connections.replaceConnection(at: conn1FailIndex) + XCTAssert(replaceConn1EL === el1) + XCTAssertEqual(replaceConn1ID, 1) + + // eventLoop connection + let conn2ID = connections.createNewOverflowConnection(on: el2) + // the replacement connection is starting + XCTAssertEqual(connections.startingGeneralPurposeConnections, 1) + XCTAssertEqual(connections.startingEventLoopConnections(on: el2), 1) + let backoff2EL = connections.backoffNextConnectionAttempt(conn2ID) + XCTAssert(backoff2EL === el2) + let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID) + XCTAssert(conn2FailContext.eventLoop === el2) + XCTAssertEqual(conn2FailContext.use, .eventLoop(el2)) + XCTAssertEqual(conn2FailContext.connectionsStartingForUseCase, 0) + connections.removeConnection(at: conn2FailIndex) + // the replacement connection is still starting + XCTAssertEqual(connections.startingGeneralPurposeConnections, 1) + } + + func testLeaseConnectionOnPreferredAndAvailableEL() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + for el in [el1, el2, el3, el4] { + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + XCTAssertEqual(connections.startingEventLoopConnections(on: el), 0) + let connID = connections.createNewConnection(on: el) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 1) + XCTAssertEqual(connections.startingEventLoopConnections(on: el), 0) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + let (_, connCreatedContext) = connections.newHTTP1ConnectionEstablished(conn) + XCTAssertEqual(connCreatedContext.use, .generalPurpose) + XCTAssert(connCreatedContext.eventLoop === el) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + } + + let connection = connections.leaseConnection(onPreferred: el1) + XCTAssertEqual(connection, .__testOnly_connection(id: 0, eventLoop: el1)) + } + + func testLeaseConnectionOnPreferredButUnavailableEL() { + let elg = EmbeddedEventLoopGroup(loops: 5) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + let el5 = elg.next() + + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + for el in [el1, el2, el3, el4] { + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + XCTAssertEqual(connections.startingEventLoopConnections(on: el), 0) + let connID = connections.createNewConnection(on: el) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 1) + XCTAssertEqual(connections.startingEventLoopConnections(on: el), 0) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + let (_, connCreatedContext) = connections.newHTTP1ConnectionEstablished(conn) + XCTAssertEqual(connCreatedContext.use, .generalPurpose) + XCTAssert(connCreatedContext.eventLoop === el) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + } + + let connection = connections.leaseConnection(onPreferred: el5) + XCTAssertEqual(connection, .__testOnly_connection(id: 3, eventLoop: el4)) + } + + func testLeaseConnectionOnRequiredButUnavailableEL() { + let elg = EmbeddedEventLoopGroup(loops: 5) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + let el5 = elg.next() + + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + for el in [el1, el2, el3, el4] { + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + XCTAssertEqual(connections.startingEventLoopConnections(on: el), 0) + let connID = connections.createNewConnection(on: el) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 1) + XCTAssertEqual(connections.startingEventLoopConnections(on: el), 0) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + let (_, connCreatedContext) = connections.newHTTP1ConnectionEstablished(conn) + XCTAssertEqual(connCreatedContext.use, .generalPurpose) + XCTAssert(connCreatedContext.eventLoop === el) + XCTAssertEqual(connections.startingGeneralPurposeConnections, 0) + } + + let connection = connections.leaseConnection(onRequired: el5) + XCTAssertEqual(connection, .none) + } + + func testLeaseConnectionOnRequiredAndAvailableEL() { + let elg = EmbeddedEventLoopGroup(loops: 2) + let el1 = elg.next() + let el2 = elg.next() + + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + for el in [el1, el1, el1, el1, el2] { + let connID = connections.createNewConnection(on: el) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + _ = connections.newHTTP1ConnectionEstablished(conn) + } + + // 1. get el from general pool, even though el is required! + guard let lease1 = connections.leaseConnection(onRequired: el1) else { + return XCTFail("Expected to get a connection at this point.") + } + // the last created connection on the correct el is the shortest amount idle. we should use this + XCTAssertEqual(lease1, .__testOnly_connection(id: 3, eventLoop: el1)) + _ = connections.releaseConnection(lease1.id) + + // 2. create specialized el connection + let connID5 = connections.createNewOverflowConnection(on: el1) + XCTAssertEqual(connections.startingEventLoopConnections(on: el1), 1) + let conn5: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID5, eventLoop: el1) + _ = connections.newHTTP1ConnectionEstablished(conn5) + XCTAssertEqual(connections.startingEventLoopConnections(on: el1), 0) + + // 3. get el from specialized pool, since it is the newest! + guard let lease2 = connections.leaseConnection(onRequired: el1) else { + return XCTFail("Expected to get a connection at this point.") + } + XCTAssertEqual(lease2, conn5) + _ = connections.releaseConnection(lease2.id) + + // 4. create another general purpose connection on the correct el + let connID6 = connections.createNewConnection(on: el1) + let conn6: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID6, eventLoop: el1) + _ = connections.newHTTP1ConnectionEstablished(conn6) + + // 5. get el from general pool, since it is the newest! + guard let lease3 = connections.leaseConnection(onRequired: el1) else { + return XCTFail("Expected to get a connection at this point.") + } + // the last created connection is the shortest amount idle. we should use this + XCTAssertEqual(lease3, conn6) + + _ = connections.releaseConnection(lease3.id) + } + + func testCloseConnectionIfIdle() { + let elg = EmbeddedEventLoopGroup(loops: 1) + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + let el1 = elg.next() + + // connection is idle + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + _ = connections.newHTTP1ConnectionEstablished(conn1) + XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), conn1) + + // connection is not idle + let conn2ID = connections.createNewConnection(on: el1) + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) + let (conn2Index, _) = connections.newHTTP1ConnectionEstablished(conn2) + XCTAssertEqual(connections.leaseConnection(at: conn2Index), conn2) + XCTAssertNil(connections.closeConnectionIfIdle(conn2ID)) + } + + func testCloseConnectionIfIdleButLeasedRaceCondition() { + let elg = EmbeddedEventLoopGroup(loops: 1) + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + let el1 = elg.next() + + // connection is idle + let connID = connections.createNewConnection(on: el1) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + _ = connections.newHTTP1ConnectionEstablished(conn) + + // connection is leased + let lease = connections.leaseConnection(onPreferred: el1) + XCTAssertEqual(lease, conn) + + // timeout arrives minimal to late + XCTAssertEqual(connections.closeConnectionIfIdle(connID), nil) + } + + func testCloseConnectionIfIdleButClosedRaceCondition() { + let elg = EmbeddedEventLoopGroup(loops: 1) + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + let el1 = elg.next() + + // connection is idle + let connID = connections.createNewConnection(on: el1) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + _ = connections.newHTTP1ConnectionEstablished(conn) + _ = connections.failConnection(connID) + + // timeout arrives minimal to late + XCTAssertEqual(connections.closeConnectionIfIdle(connID), nil) + } + + func testShutdown() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: .init()) + + for el in [el1, el2, el3, el4] { + let connID = connections.createNewConnection(on: el) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + let (_, connContext) = connections.newHTTP1ConnectionEstablished(conn) + XCTAssertEqual(connContext.use, .generalPurpose) + XCTAssert(connContext.eventLoop === el) + } + + XCTAssertEqual(connections.stats.backingOff, 0) + XCTAssertEqual(connections.stats.leased, 0) + XCTAssertEqual(connections.stats.idle, 4) + + // connection is leased + guard let lease = connections.leaseConnection(onPreferred: el1) else { + return XCTFail("Expected to be able to lease a connection") + } + XCTAssertEqual(lease, .__testOnly_connection(id: 0, eventLoop: el1)) + + XCTAssertEqual(connections.stats.leased, 1) + XCTAssertEqual(connections.stats.idle, 3) + + // start another connection that fails + let backingOffID = connections.createNewConnection(on: el1) + XCTAssert(connections.backoffNextConnectionAttempt(backingOffID) === el1) + + // start another connection + let startingID = connections.createNewConnection(on: el2) + + let context = connections.shutdown() + XCTAssertEqual(context.close.count, 3) + XCTAssertEqual(context.cancel, [lease]) + XCTAssertEqual(context.connectBackoff, [backingOffID]) + + XCTAssertEqual(connections.stats.idle, 0) + XCTAssertEqual(connections.stats.backingOff, 0) + XCTAssertEqual(connections.stats.leased, 1) + XCTAssertEqual(connections.stats.connecting, 1) + XCTAssertFalse(connections.isEmpty) + + let (releaseIndex, _) = connections.releaseConnection(lease.id) + XCTAssertEqual(connections.closeConnection(at: releaseIndex), lease) + XCTAssertFalse(connections.isEmpty) + + let (failIndex, _) = connections.failConnection(startingID) + connections.removeConnection(at: failIndex) + XCTAssertTrue(connections.isEmpty) + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift index 0b1a1b8a4..814b0d078 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +@testable import AsyncHTTPClient import Dispatch import NIOConcurrencyHelpers import NIOCore @@ -51,3 +52,16 @@ final class EmbeddedEventLoopGroup: EventLoopGroup { } } } + +extension HTTPConnectionPool.HTTP1Connections.ConnectionUse: Equatable { + public static func == (lhs: Self, rhs: Self) -> Bool { + switch (lhs, rhs) { + case (.eventLoop(let lhsEventLoop), .eventLoop(let rhsEventLoop)): + return lhsEventLoop === rhsEventLoop + case (.generalPurpose, .generalPurpose): + return true + default: + return false + } + } +} diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 8c189d15a..9f7a199a6 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -41,6 +41,7 @@ import XCTest testCase(HTTPClientSOCKSTests.allTests), testCase(HTTPClientTests.allTests), testCase(HTTPConnectionPool_FactoryTests.allTests), + testCase(HTTPConnectionPool_HTTP1ConnectionsTests.allTests), testCase(HTTPConnectionPool_RequestQueueTests.allTests), testCase(HTTPRequestStateMachineTests.allTests), testCase(LRUCacheTests.allTests),