diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index c38479778..b0317eef4 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -189,6 +189,30 @@ extension HTTPConnectionPool { preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)") } } + + enum MigrateAction { + case removeConnection + case keepConnection + } + + func migrateToHTTP2(_ context: inout HTTP1Connections.HTTP1ToHTTP2MigrationContext) -> MigrateAction { + switch self.state { + case .starting: + context.starting.append((self.connectionID, self.eventLoop)) + return .removeConnection + case .backingOff: + context.backingOff.append((self.connectionID, self.eventLoop)) + return .removeConnection + case .idle(let connection, since: _): + // Idle connections can be removed right away + context.close.append(connection) + return .removeConnection + case .leased: + return .keepConnection + case .closed: + preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)") + } + } } /// A structure to hold the currently active HTTP/1.1 connections. @@ -298,6 +322,12 @@ extension HTTPConnectionPool { var connectionsStartingForUseCase: Int } + struct HTTP1ToHTTP2MigrationContext { + var backingOff: [(Connection.ID, EventLoop)] = [] + var starting: [(Connection.ID, EventLoop)] = [] + var close: [Connection] = [] + } + // MARK: Connection creation mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID { @@ -485,6 +515,21 @@ extension HTTPConnectionPool { return (index, context) } + // MARK: Migration + + mutating func migrateToHTTP2() -> HTTP1ToHTTP2MigrationContext { + var migrationContext = HTTP1ToHTTP2MigrationContext() + self.connections.removeAll { connection in + switch connection.migrateToHTTP2(&migrationContext) { + case .removeConnection: + return true + case .keepConnection: + return false + } + } + return migrationContext + } + // MARK: Shutdown mutating func shutdown() -> CleanupContext { @@ -610,12 +655,12 @@ extension HTTPConnectionPool { return nil } - } - struct Stats { - var idle: Int = 0 - var leased: Int = 0 - var connecting: Int = 0 - var backingOff: Int = 0 + struct Stats { + var idle: Int = 0 + var leased: Int = 0 + var connecting: Int = 0 + var backingOff: Int = 0 + } } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index bd1eaeff1..0790f70db 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -24,12 +24,12 @@ extension HTTPConnectionPool { typealias Action = HTTPConnectionPool.StateMachine.Action - private var connections: HTTP1Connections + private(set) var connections: HTTP1Connections private var failedConsecutiveConnectionAttempts: Int = 0 /// the error from the last connection creation private var lastConnectFailure: Error? - private var requests: RequestQueue + private(set) var requests: RequestQueue private var state: State = .running init(idGenerator: Connection.ID.Generator, maximumConcurrentConnections: Int) { @@ -41,7 +41,7 @@ extension HTTPConnectionPool { self.requests = RequestQueue() } - // MARK: - Events - + // MARK: - Events mutating func executeRequest(_ request: Request) -> Action { switch self.state { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 6573ea368..6e82773ab 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -49,6 +49,16 @@ extension HTTPConnectionPool { } } + /// A connection is established and can potentially execute requests if not all streams are leased + var isActive: Bool { + switch self.state { + case .active: + return true + case .starting, .backingOff, .draining, .closed: + return false + } + } + /// A request can be scheduled on the connection var isAvailable: Bool { switch self.state { @@ -326,6 +336,11 @@ extension HTTPConnectionPool { // MARK: Connection creation + /// true if one ore more connections are active + var hasActiveConnections: Bool { + self.connections.contains { $0.isActive } + } + /// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool { self.connections.contains { $0.canOrWillBeAbleToExecuteRequests } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift new file mode 100644 index 000000000..5a79f93c5 --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -0,0 +1,470 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore +import NIOHTTP2 + +extension HTTPConnectionPool { + struct HTTP2StateMachine { + typealias Action = HTTPConnectionPool.StateMachine.Action + typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction + typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction + + private enum State: Equatable { + case running + case shuttingDown(unclean: Bool) + case shutDown + } + + private var lastConnectFailure: Error? + private var failedConsecutiveConnectionAttempts = 0 + + private var connections: HTTP2Connections + private var http1Connections: HTTP1Connections? + + private var requests: RequestQueue + + private let idGenerator: Connection.ID.Generator + + private var state: State = .running + + init( + idGenerator: Connection.ID.Generator + ) { + self.idGenerator = idGenerator + self.requests = RequestQueue() + + self.connections = HTTP2Connections(generator: idGenerator) + } + + mutating func migrateConnectionsFromHTTP1( + connections http1Connections: HTTP1Connections, + requests: RequestQueue + ) -> Action { + precondition(self.http1Connections == nil) + precondition(self.connections.isEmpty) + precondition(self.requests.isEmpty) + + var http1Connections = http1Connections // make http1Connections mutable + let context = http1Connections.migrateToHTTP2() + self.connections.migrateConnections( + starting: context.starting, + backingOff: context.backingOff + ) + + if !http1Connections.isEmpty { + self.http1Connections = http1Connections + } + + self.requests = requests + + // TODO: Close all idle connections from context.close + // TODO: Start new http2 connections for + // TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap) + + return .none + } + + mutating func executeRequest(_ request: Request) -> Action { + switch self.state { + case .running: + if let eventLoop = request.requiredEventLoop { + return self.executeRequest(request, onRequired: eventLoop) + } else { + return self.executeRequest(request, onPreferred: request.preferredEventLoop) + } + case .shutDown, .shuttingDown: + // it is fairly unlikely that this condition is met, since the ConnectionPoolManager + // also fails new requests immediately, if it is shutting down. However there might + // be race conditions in which a request passes through a running connection pool + // manager, but hits a connection pool that is already shutting down. + // + // (Order in one lock does not guarantee order in the next lock!) + return .init( + request: .failRequest(request, HTTPClientError.alreadyShutdown, cancelTimeout: false), + connection: .none + ) + } + } + + private mutating func executeRequest( + _ request: Request, + onRequired eventLoop: EventLoop + ) -> Action { + if let (connection, context) = self.connections.leaseStream(onRequired: eventLoop) { + /// 1. we have a stream available and can execute the request immediately + if context.wasIdle { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .cancelTimeoutTimer(connection.id) + ) + } else { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .none + ) + } + } + /// 2. No available stream so we definitely need to wait until we have one + self.requests.push(request) + + if self.connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop) { + /// 3. we already have a connection, we just need to wait until until it becomes available + return .init( + request: .scheduleRequestTimeout(for: request, on: eventLoop), + connection: .none + ) + } else { + /// 4. we do *not* have a connection, need to create a new one and wait until it is connected. + let connectionId = self.connections.createNewConnection(on: eventLoop) + return .init( + request: .scheduleRequestTimeout(for: request, on: eventLoop), + connection: .createConnection(connectionId, on: eventLoop) + ) + } + } + + private mutating func executeRequest( + _ request: Request, + onPreferred eventLoop: EventLoop + ) -> Action { + if let (connection, context) = self.connections.leaseStream(onPreferred: eventLoop) { + /// 1. we have a stream available and can execute the request immediately + if context.wasIdle { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .cancelTimeoutTimer(connection.id) + ) + } else { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .none + ) + } + } + /// 2. No available stream so we definitely need to wait until we have one + self.requests.push(request) + + if self.connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests { + /// 3. we already have a connection, we just need to wait until until it becomes available + return .init( + request: .scheduleRequestTimeout(for: request, on: eventLoop), + connection: .none + ) + } else { + /// 4. we do *not* have a connection, need to create a new one and wait until it is connected. + let connectionId = self.connections.createNewConnection(on: eventLoop) + return .init( + request: .scheduleRequestTimeout(for: request, on: eventLoop), + connection: .createConnection(connectionId, on: eventLoop) + ) + } + } + + mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> Action { + self.failedConsecutiveConnectionAttempts = 0 + self.lastConnectFailure = nil + let (index, context) = self.connections.newHTTP2ConnectionEstablished( + connection, + maxConcurrentStreams: maxConcurrentStreams + ) + return self.nextActionForAvailableConnection(at: index, context: context) + } + + private mutating func nextActionForAvailableConnection( + at index: Int, + context: HTTP2Connections.AvailableConnectionContext + ) -> Action { + switch self.state { + case .running: + // We prioritise requests with a required event loop over those without a requirement. + // This can cause starvation for request without a required event loop. + // We should come up with a better algorithm in the future. + + var requestsToExecute = self.requests.popFirst(max: context.availableStreams, for: context.eventLoop) + let remainingAvailableStreams = context.availableStreams - requestsToExecute.count + // use the remaining available streams for requests without a required event loop + requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil) + let (connection, _) = self.connections.leaseStreams(at: index, count: requestsToExecute.count) + + let requestAction = { () -> RequestAction in + if requestsToExecute.isEmpty { + return .none + } else { + return .executeRequestsAndCancelTimeouts(requestsToExecute, connection) + } + }() + + let connectionAction = { () -> ConnectionAction in + if context.isIdle, requestsToExecute.isEmpty { + return .scheduleTimeoutTimer(connection.id, on: context.eventLoop) + } else { + return .none + } + }() + + return .init( + request: requestAction, + connection: connectionAction + ) + case .shuttingDown(let unclean): + guard context.isIdle else { + return .none + } + + let connection = self.connections.closeConnection(at: index) + if self.connections.isEmpty { + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean)) + ) + } + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + case .shutDown: + preconditionFailure("It the pool is already shutdown, all connections must have been torn down.") + } + } + + mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { + let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + return self.nextActionForAvailableConnection(at: index, context: context) + } + + mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action { + let context = self.connections.goAwayReceived(connectionID) + return self.nextActionForClosingConnection(on: context.eventLoop) + } + + mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action { + guard let (index, context) = self.connections.failConnection(connectionID) else { + // When a connection close is initiated by the connection pool, the connection will + // still report its close to the state machine. In those cases we must ignore the + // event. + return .none + } + return self.nextActionForFailedConnection(at: index, on: context.eventLoop) + } + + private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action { + switch self.state { + case .running: + let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) + guard hasPendingRequest else { + return .none + } + + let (newConnectionID, previousEventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index) + precondition(previousEventLoop === eventLoop) + + return .init( + request: .none, + connection: .createConnection(newConnectionID, on: eventLoop) + ) + case .shuttingDown(let unclean): + assert(self.requests.isEmpty) + self.connections.removeConnection(at: index) + if self.connections.isEmpty { + return .init( + request: .none, + connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean)) + ) + } + return .none + + case .shutDown: + preconditionFailure("If the pool is already shutdown, all connections must have been torn down.") + } + } + + private mutating func nextActionForClosingConnection(on eventLoop: EventLoop) -> Action { + switch self.state { + case .running: + let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) + guard hasPendingRequest else { + return .none + } + + let newConnectionID = self.connections.createNewConnection(on: eventLoop) + + return .init( + request: .none, + connection: .createConnection(newConnectionID, on: eventLoop) + ) + case .shutDown, .shuttingDown: + return .none + } + } + + mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { + let (index, context) = self.connections.releaseStream(connectionID) + return self.nextActionForAvailableConnection(at: index, context: context) + } + + mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { + self.failedConsecutiveConnectionAttempts += 1 + self.lastConnectFailure = error + + let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID) + let backoff = calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts) + return .init(request: .none, connection: .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)) + } + + mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action { + // The naming of `failConnection` is a little confusing here. All it does is moving the + // connection state from `.backingOff` to `.closed` here. It also returns the + // connection's index. + guard let (index, context) = self.connections.failConnection(connectionID) else { + preconditionFailure("Backing off a connection that is unknown to us?") + } + return self.nextActionForFailedConnection(at: index, on: context.eventLoop) + } + + mutating func timeoutRequest(_ requestID: Request.ID) -> Action { + // 1. check requests in queue + if let request = self.requests.remove(requestID) { + var error: Error = HTTPClientError.getConnectionFromPoolTimeout + if let lastError = self.lastConnectFailure { + error = lastError + } else if !self.connections.hasActiveConnections { + error = HTTPClientError.connectTimeout + } + return .init( + request: .failRequest(request, error, cancelTimeout: false), + connection: .none + ) + } + + // 2. This point is reached, because the request may have already been scheduled. A + // connection might have become available shortly before the request timeout timer + // fired. + return .none + } + + mutating func cancelRequest(_ requestID: Request.ID) -> Action { + // 1. check requests in queue + if self.requests.remove(requestID) != nil { + return .init( + request: .cancelRequestTimeout(requestID), + connection: .none + ) + } + + // 2. This is point is reached, because the request may already have been forwarded to + // an idle connection. In this case the connection will need to handle the + // cancellation. + return .none + } + + mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { + guard let connection = connections.closeConnectionIfIdle(connectionID) else { + // because of a race this connection (connection close runs against trigger of timeout) + // was already removed from the state machine. + return .none + } + + precondition(self.state == .running, "If we are shutting down, we must not have any idle connections") + + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + } + + mutating func http1ConnectionClosed(_ connectionID: Connection.ID) -> Action { + guard let index = self.http1Connections?.failConnection(connectionID)?.0 else { + return .none + } + self.http1Connections!.removeConnection(at: index) + if self.http1Connections!.isEmpty { + self.http1Connections = nil + } + switch self.state { + case .running: + return .none + case .shuttingDown(let unclean): + if self.http1Connections == nil, self.connections.isEmpty { + return .init( + request: .none, + connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean)) + ) + } else { + return .none + } + case .shutDown: + preconditionFailure("If the pool is already shutdown, all connections must have been torn down.") + } + } + + mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action { + // It is save to bang the http1Connections here. If we get this callback but we don't have + // http1 connections something has gone terribly wrong. + let (index, _) = self.http1Connections!.releaseConnection(connectionID) + // Any http1 connection that becomes idle should be closed right away after the transition + // to http2. + let connection = self.http1Connections!.closeConnection(at: index) + guard self.http1Connections!.isEmpty else { + return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) + } + // if there are no more http1Connections, we can remove the struct. + self.http1Connections = nil + + // we must also check, if we are shutting down. Was this maybe out last connection? + switch self.state { + case .running: + return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) + case .shuttingDown(let unclean): + if self.connections.isEmpty { + // if the http2connections are empty as well, there are no more connections. Shutdown completed. + return .init(request: .none, connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))) + } else { + return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) + } + case .shutDown: + preconditionFailure("If the pool is already shutdown, all connections must have been torn down.") + } + } + + mutating func shutdown() -> Action { + // If we have remaining request queued, we should fail all of them with a cancelled + // error. + let waitingRequests = self.requests.removeAll() + + var requestAction: StateMachine.RequestAction = .none + if !waitingRequests.isEmpty { + requestAction = .failRequestsAndCancelTimeouts(waitingRequests, HTTPClientError.cancelled) + } + + // clean up the connections, we can cleanup now! + let cleanupContext = self.connections.shutdown() + + // If there aren't any more connections, everything is shutdown + let isShutdown: StateMachine.ConnectionAction.IsShutdown + let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections == nil) + if self.connections.isEmpty, self.http1Connections == nil { + isShutdown = .yes(unclean: unclean) + self.state = .shutDown + } else { + isShutdown = .no + self.state = .shuttingDown(unclean: unclean) + } + return .init( + request: requestAction, + connection: .cleanupConnections(cleanupContext, isShutdown: isShutdown) + ) + } + } +} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift index c58e18818..74707e3f3 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -70,6 +70,21 @@ extension HTTPConnectionPool { } } + /// removes up to `max` requests from the queue for the given `eventLoop` and returns them. + /// - Parameters: + /// - max: maximum number of requests to pop + /// - eventLoop: required event loop of the request + /// - Returns: requests for the given `eventLoop` + mutating func popFirst(max: Int, for eventLoop: EventLoop? = nil) -> [Request] { + if let eventLoop = eventLoop { + return self.withEventLoopQueue(for: eventLoop.id) { queue in + queue.popFirst(max: max) + } + } else { + return self.generalPurposeQueue.popFirst(max: max) + } + } + mutating func remove(_ requestID: Request.ID) -> Request? { if let eventLoopID = requestID.eventLoopID { return self.withEventLoopQueue(for: eventLoopID) { queue in @@ -118,3 +133,24 @@ extension HTTPConnectionPool { } } } + +extension CircularBuffer { + /// Removes up to `max` elements from the beginning of the + /// `CircularBuffer` and returns them. + /// + /// Calling this method may invalidate any existing indices for use with this + /// `CircularBuffer`. + /// + /// - Parameter max: The number of elements to remove. + /// `max` must be greater than or equal to zero. + /// - Returns: removed elements + /// + /// - Complexity: O(*k*), where *k* is the number of elements removed. + fileprivate mutating func popFirst(max: Int) -> [Element] { + precondition(max >= 0) + let elementCountToRemove = Swift.min(max, self.count) + let array = Array(self[self.startIndex.. () throws -> Void)] { + return [ + ("testCreatingOfConnection", testCreatingOfConnection), + ("testConnectionFailureBackoff", testConnectionFailureBackoff), + ("testCancelRequestWorks", testCancelRequestWorks), + ("testExecuteOnShuttingDownPool", testExecuteOnShuttingDownPool), + ("testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1", testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1), + ("testSchedulingAndCancelingOfIdleTimeout", testSchedulingAndCancelingOfIdleTimeout), + ("testConnectionTimeout", testConnectionTimeout), + ("testConnectionEstablishmentFailure", testConnectionEstablishmentFailure), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift new file mode 100644 index 000000000..bbef727e0 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -0,0 +1,438 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import NIOCore +import NIOEmbedded +import NIOHTTP1 +import NIOPosix +import XCTest + +private typealias Action = HTTPConnectionPool.StateMachine.Action +private typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction +private typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction + +class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { + func testCreatingOfConnection() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: .init()) + + /// first request should create a new connection + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let executeAction = state.executeRequest(request) + + guard case .createConnection(let connID, let eventLoop) = executeAction.connection else { + return XCTFail("Unexpected connection action \(executeAction.connection)") + } + XCTAssertTrue(eventLoop === el1) + + XCTAssertEqual(executeAction.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + + XCTAssertNoThrow(try connections.createConnection(connID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + + /// subsequent requests should not create a connection + for _ in 0..<9 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + /// connection establishment should result in 5 request executions because we set max concurrent streams to 5 + var maybeConn: HTTPConnectionPool.Connection? + XCTAssertNoThrow(maybeConn = try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 5)) + guard let conn = maybeConn else { + return XCTFail("unexpected throw") + } + let action = state.newHTTP2ConnectionEstablished(conn, maxConcurrentStreams: 5) + + XCTAssertEqual(action.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else { + return XCTFail("Unexpected request action \(action.request)") + } + XCTAssertEqual(requests.count, 5) + + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + } + + /// closing a stream while we have requests queued should result in one request execution action + for _ in 0..<5 { + let action = state.http2ConnectionStreamClosed(connID) + XCTAssertEqual(action.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else { + return XCTFail("Unexpected request action \(action.request)") + } + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.cancel(request.id)) + } + } + XCTAssertTrue(queuer.isEmpty) + + /// closing streams without any queued requests shouldn't do anything if it's *not* the last stream + for _ in 0..<4 { + let action = state.http2ConnectionStreamClosed(connID) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .none) + } + + /// 4 streams are available and therefore request should be executed immediately + for _ in 0..<4 { + let mockRequest = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .executeRequest(request, conn, cancelTimeout: false)) + } + + /// closing streams without any queued requests shouldn't do anything if it's *not* the last stream + for _ in 0..<4 { + let action = state.http2ConnectionStreamClosed(connID) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .none) + } + + /// closing the last stream should schedule a idle timeout + let streamCloseAction = state.http2ConnectionStreamClosed(connID) + XCTAssertEqual(streamCloseAction.request, .none) + XCTAssertEqual(streamCloseAction.connection, .scheduleTimeoutTimer(connID, on: el1)) + + /// shutdown should only close one connection + let shutdownAction = state.shutdown() + XCTAssertEqual(shutdownAction.request, .none) + XCTAssertEqual(shutdownAction.connection, .cleanupConnections( + .init( + close: [conn], + cancel: [], + connectBackoff: [] + ), + isShutdown: .yes(unclean: false) + )) + } + + func testConnectionFailureBackoff() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.HTTP2StateMachine( + idGenerator: .init() + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let action = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = action.connection else { + return XCTFail("Unexpected connection action: \(action.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + let failedConnect1 = state.failedToCreateNewConnection(HTTPClientError.connectTimeout, connectionID: connectionID) + XCTAssertEqual(failedConnect1.request, .none) + guard case .scheduleBackoffTimer(connectionID, let backoffTimeAmount1, _) = failedConnect1.connection else { + return XCTFail("Unexpected connection action: \(failedConnect1.connection)") + } + + // 2. connection attempt + let backoffDoneAction = state.connectionCreationBackoffDone(connectionID) + XCTAssertEqual(backoffDoneAction.request, .none) + guard case .createConnection(let newConnectionID, on: let newEventLoop) = backoffDoneAction.connection else { + return XCTFail("Unexpected connection action: \(backoffDoneAction.connection)") + } + XCTAssertGreaterThan(newConnectionID, connectionID) + XCTAssert(connectionEL === newEventLoop) // XCTAssertIdentical not available on Linux + + let failedConnect2 = state.failedToCreateNewConnection(HTTPClientError.connectTimeout, connectionID: newConnectionID) + XCTAssertEqual(failedConnect2.request, .none) + guard case .scheduleBackoffTimer(newConnectionID, let backoffTimeAmount2, _) = failedConnect2.connection else { + return XCTFail("Unexpected connection action: \(failedConnect2.connection)") + } + + XCTAssertNotEqual(backoffTimeAmount2, backoffTimeAmount1) + + // 3. request times out + let failRequest = state.timeoutRequest(request.id) + guard case .failRequest(let requestToFail, let requestError, cancelTimeout: false) = failRequest.request else { + return XCTFail("Unexpected request action: \(action.request)") + } + XCTAssert(requestToFail.__testOnly_wrapped_request() === mockRequest) // XCTAssertIdentical not available on Linux + XCTAssertEqual(requestError as? HTTPClientError, .connectTimeout) + XCTAssertEqual(failRequest.connection, .none) + + // 4. retry connection, but no more queued requests. + XCTAssertEqual(state.connectionCreationBackoffDone(newConnectionID), .none) + } + + func testCancelRequestWorks() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.HTTP2StateMachine( + idGenerator: .init() + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else { + return XCTFail("Unexpected connection action: \(executeAction.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + // 2. cancel request + let cancelAction = state.cancelRequest(request.id) + XCTAssertEqual(cancelAction.request, .cancelRequestTimeout(request.id)) + XCTAssertEqual(cancelAction.connection, .none) + + // 3. request timeout triggers to late + XCTAssertEqual(state.timeoutRequest(request.id), .none, "To late timeout is ignored") + + // 4. succeed connection attempt + let connectedAction = state.newHTTP2ConnectionEstablished( + .__testOnly_connection(id: connectionID, eventLoop: connectionEL), + maxConcurrentStreams: 100 + ) + XCTAssertEqual(connectedAction.request, .none, "Request must not be executed") + XCTAssertEqual(connectedAction.connection, .scheduleTimeoutTimer(connectionID, on: connectionEL)) + } + + func testExecuteOnShuttingDownPool() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.HTTP2StateMachine( + idGenerator: .init() + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else { + return XCTFail("Unexpected connection action: \(executeAction.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + // 2. connection succeeds + let connection: HTTPConnectionPool.Connection = .__testOnly_connection(id: connectionID, eventLoop: connectionEL) + let connectedAction = state.newHTTP2ConnectionEstablished(connection, maxConcurrentStreams: 100) + guard case .executeRequestsAndCancelTimeouts([request], connection) = connectedAction.request else { + return XCTFail("Unexpected request action: \(connectedAction.request)") + } + XCTAssert(request.__testOnly_wrapped_request() === mockRequest) // XCTAssertIdentical not available on Linux + XCTAssertEqual(connectedAction.connection, .none) + + // 3. shutdown + let shutdownAction = state.shutdown() + XCTAssertEqual(.none, shutdownAction.request) + guard case .cleanupConnections(let cleanupContext, isShutdown: .no) = shutdownAction.connection else { + return XCTFail("Unexpected connection action: \(shutdownAction.connection)") + } + + XCTAssertEqual(cleanupContext.cancel.count, 1) + XCTAssertEqual(cleanupContext.cancel.first?.id, connectionID) + XCTAssertEqual(cleanupContext.close, []) + XCTAssertEqual(cleanupContext.connectBackoff, []) + + // 4. execute another request + let finalMockRequest = MockHTTPRequest(eventLoop: elg.next()) + let finalRequest = HTTPConnectionPool.Request(finalMockRequest) + let failAction = state.executeRequest(finalRequest) + XCTAssertEqual(failAction.connection, .none) + XCTAssertEqual(failAction.request, .failRequest(finalRequest, HTTPClientError.alreadyShutdown, cancelTimeout: false)) + + // 5. close open connection + let closeAction = state.http2ConnectionClosed(connectionID) + XCTAssertEqual(closeAction.connection, .cleanupConnections(.init(), isShutdown: .yes(unclean: true))) + XCTAssertEqual(closeAction.request, .none) + } + + func testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let el1 = elg.next() + + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1State = HTTPConnectionPool.HTTP1StateMachine(idGenerator: idGenerator, maximumConcurrentConnections: 8) + + let mockRequest1 = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest1) + let mockRequest2 = MockHTTPRequest(eventLoop: el1) + let request2 = HTTPConnectionPool.Request(mockRequest2) + + let executeAction1 = http1State.executeRequest(request1) + XCTAssertEqual(executeAction1.request, .scheduleRequestTimeout(for: request1, on: el1)) + guard case .createConnection(let conn1ID, _) = executeAction1.connection else { + return XCTFail("unexpected connection action \(executeAction1.connection)") + } + let executeAction2 = http1State.executeRequest(request2) + XCTAssertEqual(executeAction2.request, .scheduleRequestTimeout(for: request2, on: el1)) + guard case .createConnection(let conn2ID, _) = executeAction2.connection else { + return XCTFail("unexpected connection action \(executeAction2.connection)") + } + + // first connection is a HTTP1 connection + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let conn1Action = http1State.newHTTP1ConnectionEstablished(conn1) + XCTAssertEqual(conn1Action.connection, .none) + XCTAssertEqual(conn1Action.request, .executeRequest(request1, conn1, cancelTimeout: true)) + + // second connection is a HTTP2 connection and we need to migrate + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) + var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + + let migrationAction = http2State.migrateConnectionsFromHTTP1( + connections: http1State.connections, + requests: http1State.requests + ) + XCTAssertEqual(migrationAction, .none) + + let http2ConnectAction = http2State.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 100) + XCTAssertEqual(http2ConnectAction.connection, .none) + guard case .executeRequestsAndCancelTimeouts([request2], conn2) = http2ConnectAction.request else { + return XCTFail("Unexpected request action \(http2ConnectAction.request)") + } + + // second request is done first + let closeAction = http2State.http2ConnectionStreamClosed(conn2ID) + XCTAssertEqual(closeAction.request, .none) + XCTAssertEqual(closeAction.connection, .scheduleTimeoutTimer(conn2ID, on: el1)) + + let shutdownAction = http2State.shutdown() + XCTAssertEqual(shutdownAction.request, .none) + XCTAssertEqual(shutdownAction.connection, .cleanupConnections(.init( + close: [conn2], + cancel: [], + connectBackoff: [] + ), isShutdown: .no)) + + let releaseAction = http2State.http1ConnectionReleased(conn1ID) + XCTAssertEqual(releaseAction.request, .none) + XCTAssertEqual(releaseAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) + } + + func testSchedulingAndCancelingOfIdleTimeout() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) + XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + // execute request on idle connection + let mockRequest1 = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest1) + let request1Action = state.executeRequest(request1) + XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) + XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) + + // close stream + let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(closeStream1Action.request, .none) + XCTAssertEqual(closeStream1Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + // execute request on idle connection with required event loop + let mockRequest2 = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) + let request2 = HTTPConnectionPool.Request(mockRequest2) + let request2Action = state.executeRequest(request2) + XCTAssertEqual(request2Action.request, .executeRequest(request2, conn1, cancelTimeout: false)) + XCTAssertEqual(request2Action.connection, .cancelTimeoutTimer(conn1ID)) + + // close stream + let closeStream2Action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(closeStream2Action.request, .none) + XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + } + + func testConnectionTimeout() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) + XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + // let the connection timeout + let timeoutAction = state.connectionIdleTimeout(conn1ID) + XCTAssertEqual(timeoutAction.request, .none) + XCTAssertEqual(timeoutAction.connection, .closeConnection(conn1, isShutdown: .no)) + } + + func testConnectionEstablishmentFailure() { + struct SomeError: Error, Equatable {} + + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + + let action = state.failedToCreateNewConnection(SomeError(), connectionID: conn1ID) + XCTAssertEqual(action.request, .none) + guard case .scheduleBackoffTimer(conn1ID, _, let eventLoop) = action.connection else { + return XCTFail("unexpected connection action \(action.connection)") + } + XCTAssertEqual(eventLoop.id, el1.id) + } +} diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index 513cb42b6..009caa922 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -162,6 +162,14 @@ struct MockConnectionPool { self.state = .http1(.idle(parked: false, idleSince: .now())) } + mutating func http2Started(maxConcurrentStreams: Int) throws { + guard case .starting = self.state else { + throw Errors.connectionIsNotStarting + } + + self.state = .http2(.idle(maxConcurrentStreams: maxConcurrentStreams, parked: false, lastIdle: .now())) + } + mutating func park() throws { switch self.state { case .starting, .closed, .http1(.inUse), .http2(.inUse): @@ -333,6 +341,19 @@ struct MockConnectionPool { return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop) } + mutating func succeedConnectionCreationHTTP2( + _ connectionID: Connection.ID, + maxConcurrentStreams: Int + ) throws -> Connection { + guard var connection = self.connections[connectionID] else { + throw Errors.connectionNotFound + } + + try connection.http2Started(maxConcurrentStreams: maxConcurrentStreams) + self.connections[connection.id] = connection + return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop) + } + mutating func failConnectionCreation(_ connectionID: Connection.ID) throws { guard let connection = self.connections[connectionID] else { throw Errors.connectionNotFound diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 988f6b194..1adb04801 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -43,6 +43,7 @@ import XCTest testCase(HTTPConnectionPool_HTTP1ConnectionsTests.allTests), testCase(HTTPConnectionPool_HTTP1StateMachineTests.allTests), testCase(HTTPConnectionPool_HTTP2ConnectionsTests.allTests), + testCase(HTTPConnectionPool_HTTP2StateMachineTests.allTests), testCase(HTTPConnectionPool_ManagerTests.allTests), testCase(HTTPConnectionPool_RequestQueueTests.allTests), testCase(HTTPRequestStateMachineTests.allTests),