Skip to content

[ConnectionPool] HTTP1StateMachine #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -375,6 +375,11 @@ extension HTTPConnectionPool {
self.connections[index].lease()
}

func parkConnection(at index: Int) -> (Connection.ID, EventLoop) {
precondition(self.connections[index].isIdle)
return (self.connections[index].connectionID, self.connections[index].eventLoop)
}

/// A new HTTP/1.1 connection was released.
///
/// This will put the position into the idle state.
@@ -446,12 +451,13 @@ extension HTTPConnectionPool {
/// This will put the position into the closed state.
///
/// - Parameter connectionID: The failed connection's id.
/// - Returns: An index and an IdleConnectionContext to determine the next action for the now closed connection.
/// - Returns: An optional index and an IdleConnectionContext to determine the next action for the closed connection.
/// You must call ``removeConnection(at:)`` or ``replaceConnection(at:)`` with the
/// supplied index after this.
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext) {
/// supplied index after this. If nil is returned the connection was closed by the state machine and was
/// therefore already removed.
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to fail a new connection that we know nothing about?")
return nil
}

let use: ConnectionUse
@@ -607,22 +613,4 @@ extension HTTPConnectionPool {
var connecting: Int = 0
var backingOff: Int = 0
}

/// The pool cleanup todo list.
struct CleanupContext: Equatable {
/// the connections to close right away. These are idle.
var close: [Connection]

/// the connections that currently run a request that needs to be cancelled to close the connections
var cancel: [Connection]

/// the connections that are backing off from connection creation
var connectBackoff: [Connection.ID]

init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) {
self.close = close
self.cancel = cancel
self.connectBackoff = connectBackoff
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -33,11 +33,12 @@ extension HTTPConnectionPool {
self.count == 0
}

func count(for eventLoop: EventLoop?) -> Int {
if let eventLoop = eventLoop {
return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0
}
return self.generalPurposeQueue.count
var generalPurposeCount: Int {
self.generalPurposeQueue.count
}

func count(for eventLoop: EventLoop) -> Int {
self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0
}

func isEmpty(for eventLoop: EventLoop?) -> Bool {
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import NIOHTTP1

extension HTTPConnectionPool {
struct StateMachine {
struct Action {
let request: RequestAction
let connection: ConnectionAction

init(request: RequestAction, connection: ConnectionAction) {
self.request = request
self.connection = connection
}

static let none: Action = Action(request: .none, connection: .none)
}

enum ConnectionAction {
enum IsShutdown: Equatable {
case yes(unclean: Bool)
case no
}

case createConnection(Connection.ID, on: EventLoop)
case scheduleBackoffTimer(Connection.ID, backoff: TimeAmount, on: EventLoop)

case scheduleTimeoutTimer(Connection.ID, on: EventLoop)
case cancelTimeoutTimer(Connection.ID)

case closeConnection(Connection, isShutdown: IsShutdown)
case cleanupConnections(CleanupContext, isShutdown: IsShutdown)

case none
}

enum RequestAction {
case executeRequest(Request, Connection, cancelTimeout: Bool)
case executeRequestsAndCancelTimeouts([Request], Connection)

case failRequest(Request, Error, cancelTimeout: Bool)
case failRequestsAndCancelTimeouts([Request], Error)

case scheduleRequestTimeout(for: Request, on: EventLoop)
case cancelRequestTimeout(Request.ID)

case none
}

enum HTTPVersionState {
case http1(HTTP1StateMachine)
}

var state: HTTPVersionState
var isShuttingDown: Bool = false

let eventLoopGroup: EventLoopGroup
let maximumConcurrentHTTP1Connections: Int

init(eventLoopGroup: EventLoopGroup, idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int) {
self.maximumConcurrentHTTP1Connections = maximumConcurrentHTTP1Connections
let http1State = HTTP1StateMachine(
idGenerator: idGenerator,
maximumConcurrentConnections: maximumConcurrentHTTP1Connections
)
self.state = .http1(http1State)
self.eventLoopGroup = eventLoopGroup
}

mutating func executeRequest(_ request: Request) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.executeRequest(request)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func newHTTP1ConnectionCreated(_ connection: Connection) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.newHTTP1ConnectionEstablished(connection)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.failedToCreateNewConnection(
error,
connectionID: connectionID
)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.connectionCreationBackoffDone(connectionID)
self.state = .http1(http1StateMachine)
return action
}
}

/// A request has timed out.
///
/// This is different to a request being cancelled. If a request times out, we need to fail the
/// request, but don't need to cancel the timer (it already triggered). If a request is cancelled
/// we don't need to fail it but we need to cancel its timeout timer.
mutating func timeoutRequest(_ requestID: Request.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.timeoutRequest(requestID)
self.state = .http1(http1StateMachine)
return action
}
}

/// A request was cancelled.
///
/// This is different to a request timing out. If a request is cancelled we don't need to fail it but we
/// need to cancel its timeout timer. If a request times out, we need to fail the request, but don't
/// need to cancel the timer (it already triggered).
mutating func cancelRequest(_ requestID: Request.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.cancelRequest(requestID)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.connectionIdleTimeout(connectionID)
self.state = .http1(http1StateMachine)
return action
}
}

/// A connection has been closed
mutating func connectionClosed(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.connectionClosed(connectionID)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.http1ConnectionReleased(connectionID)
self.state = .http1(http1StateMachine)
return action
}
}

mutating func shutdown() -> Action {
precondition(!self.isShuttingDown, "Shutdown must only be called once")

self.isShuttingDown = true

switch self.state {
case .http1(var http1StateMachine):
let action = http1StateMachine.shutdown()
self.state = .http1(http1StateMachine)
return action
}
}
}
}

extension HTTPConnectionPool {
/// The pool cleanup todo list.
struct CleanupContext: Equatable {
/// the connections to close right away. These are idle.
var close: [Connection]

/// the connections that currently run a request that needs to be cancelled to close the connections
var cancel: [Connection]

/// the connections that are backing off from connection creation
var connectBackoff: [Connection.ID]

init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) {
self.close = close
self.cancel = cancel
self.connectBackoff = connectBackoff
}
}
}

extension HTTPConnectionPool.StateMachine: CustomStringConvertible {
var description: String {
switch self.state {
case .http1(let http1):
return ".http1(\(http1))"
}
}
}
8 changes: 8 additions & 0 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
@@ -925,6 +925,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
case tlsHandshakeTimeout
case serverOfferedUnsupportedApplicationProtocol(String)
case requestStreamCancelled
case getConnectionFromPoolTimeout
}

private var code: Code
@@ -997,4 +998,11 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
/// The remote server responded with a status code >= 300, before the full request was sent. The request stream
/// was therefore cancelled
public static let requestStreamCancelled = HTTPClientError(code: .requestStreamCancelled)

/// Aquiring a HTTP connection from the connection pool timed out.
///
/// This can have multiple reasons:
/// - A connection could not be created within the timout period.
/// - Tasks are not processed fast enough on the existing connections, to process all waiters in time
public static let getConnectionFromPoolTimeout = HTTPClientError(code: .getConnectionFromPoolTimeout)
}
Original file line number Diff line number Diff line change
@@ -68,7 +68,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
let backoff1EL = connections.backoffNextConnectionAttempt(conn1ID)
XCTAssert(backoff1EL === el1)
// backoff done. 2. decide what's next
let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID)
guard let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID) else {
return XCTFail("Expected that the connection is remembered")
}
XCTAssert(conn1FailContext.eventLoop === el1)
XCTAssertEqual(conn1FailContext.use, .generalPurpose)
XCTAssertEqual(conn1FailContext.connectionsStartingForUseCase, 0)
@@ -83,7 +85,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
XCTAssertEqual(connections.startingEventLoopConnections(on: el2), 1)
let backoff2EL = connections.backoffNextConnectionAttempt(conn2ID)
XCTAssert(backoff2EL === el2)
let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID)
guard let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID) else {
return XCTFail("Expected that the connection is remembered")
}
XCTAssert(conn2FailContext.eventLoop === el2)
XCTAssertEqual(conn2FailContext.use, .eventLoop(el2))
XCTAssertEqual(conn2FailContext.connectionsStartingForUseCase, 0)
@@ -329,7 +333,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
XCTAssertEqual(connections.closeConnection(at: releaseIndex), lease)
XCTAssertFalse(connections.isEmpty)

let (failIndex, _) = connections.failConnection(startingID)
guard let (failIndex, _) = connections.failConnection(startingID) else {
return XCTFail("Expected that the connection is remembered")
}
connections.removeConnection(at: failIndex)
XCTAssertTrue(connections.isEmpty)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// HTTPConnectionPool+HTTP1StateTests+XCTest.swift
//
import XCTest

///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///

extension HTTPConnectionPool_HTTP1StateMachineTests {
static var allTests: [(String, (HTTPConnectionPool_HTTP1StateMachineTests) -> () throws -> Void)] {
return [
("testCreatingAndFailingConnections", testCreatingAndFailingConnections),
("testConnectionFailureBackoff", testConnectionFailureBackoff),
("testCancelRequestWorks", testCancelRequestWorks),
("testExecuteOnShuttingDownPool", testExecuteOnShuttingDownPool),
("testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder", testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder),
("testBestConnectionIsPicked", testBestConnectionIsPicked),
("testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests", testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests),
("testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests", testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests),
("testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests", testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests),
("testParkedConnectionTimesOut", testParkedConnectionTimesOut),
("testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately", testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately),
("testParkedConnectionTimesOutButIsAlsoClosedByRemote", testParkedConnectionTimesOutButIsAlsoClosedByRemote),
]
}
}
590 changes: 590 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ class HTTPConnectionPool_RequestQueueTests: XCTestCase {
XCTAssertFalse(queue.isEmpty)
XCTAssertFalse(queue.isEmpty(for: nil))
XCTAssertEqual(queue.count, 1)
XCTAssertEqual(queue.count(for: nil), 1)
XCTAssertEqual(queue.generalPurposeCount, 1)

let req2 = MockScheduledRequest(requiredEventLoop: nil)
let req2ID = queue.push(.init(req2))
58 changes: 58 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift
Original file line number Diff line number Diff line change
@@ -53,6 +53,12 @@ final class EmbeddedEventLoopGroup: EventLoopGroup {
}
}

extension HTTPConnectionPool.Request: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
return lhs.id == rhs.id
}
}

extension HTTPConnectionPool.HTTP1Connections.ConnectionUse: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
switch (lhs, rhs) {
@@ -65,3 +71,55 @@ extension HTTPConnectionPool.HTTP1Connections.ConnectionUse: Equatable {
}
}
}

extension HTTPConnectionPool.StateMachine.ConnectionAction: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
switch (lhs, rhs) {
case (.createConnection(let lhsConnID, on: let lhsEL), .createConnection(let rhsConnID, on: let rhsEL)):
return lhsConnID == rhsConnID && lhsEL === rhsEL
case (.scheduleBackoffTimer(let lhsConnID, let lhsBackoff, on: let lhsEL), .scheduleBackoffTimer(let rhsConnID, let rhsBackoff, on: let rhsEL)):
return lhsConnID == rhsConnID && lhsBackoff == rhsBackoff && lhsEL === rhsEL
case (.scheduleTimeoutTimer(let lhsConnID, on: let lhsEL), .scheduleTimeoutTimer(let rhsConnID, on: let rhsEL)):
return lhsConnID == rhsConnID && lhsEL === rhsEL
case (.cancelTimeoutTimer(let lhsConnID), .cancelTimeoutTimer(let rhsConnID)):
return lhsConnID == rhsConnID
case (.closeConnection(let lhsConn, isShutdown: let lhsShut), .closeConnection(let rhsConn, isShutdown: let rhsShut)):
return lhsConn == rhsConn && lhsShut == rhsShut
case (.cleanupConnections(let lhsContext, isShutdown: let lhsShut), .cleanupConnections(let rhsContext, isShutdown: let rhsShut)):
return lhsContext == rhsContext && lhsShut == rhsShut
case (.none, .none):
return true
default:
return false
}
}
}

extension HTTPConnectionPool.StateMachine.RequestAction: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
switch (lhs, rhs) {
case (.executeRequest(let lhsReq, let lhsConn, let lhsReqID), .executeRequest(let rhsReq, let rhsConn, let rhsReqID)):
return lhsReq == rhsReq && lhsConn == rhsConn && lhsReqID == rhsReqID
case (.executeRequestsAndCancelTimeouts(let lhsReqs, let lhsConn), .executeRequestsAndCancelTimeouts(let rhsReqs, let rhsConn)):
return lhsReqs.elementsEqual(rhsReqs, by: { $0 == $1 }) && lhsConn == rhsConn
case (.failRequest(let lhsReq, _, cancelTimeout: let lhsReqID), .failRequest(let rhsReq, _, cancelTimeout: let rhsReqID)):
return lhsReq == rhsReq && lhsReqID == rhsReqID
case (.failRequestsAndCancelTimeouts(let lhsReqs, _), .failRequestsAndCancelTimeouts(let rhsReqs, _)):
return lhsReqs.elementsEqual(rhsReqs, by: { $0 == $1 })
case (.scheduleRequestTimeout(for: let lhsReq, on: let lhsEL), .scheduleRequestTimeout(for: let rhsReq, on: let rhsEL)):
return lhsReq == rhsReq && lhsEL === rhsEL
case (.cancelRequestTimeout(let lhsReqID), .cancelRequestTimeout(let rhsReqID)):
return lhsReqID == rhsReqID
case (.none, .none):
return true
default:
return false
}
}
}

extension HTTPConnectionPool.StateMachine.Action: Equatable {
public static func == (lhs: Self, rhs: Self) -> Bool {
lhs.connection == rhs.connection && lhs.request == rhs.request
}
}
73 changes: 73 additions & 0 deletions Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift
Original file line number Diff line number Diff line change
@@ -467,6 +467,79 @@ extension MockConnectionPool {
self.connections.removeValue(forKey: connectionID)
return connectionID
}

enum SetupError: Error {
case totalNumberOfConnectionsMustBeLowerThanIdle
case expectedConnectionToBeCreated
case expectedRequestToBeAddedToQueue
case expectedPreviouslyQueuedRequestToBeRunNow
case expectedNoConnectionAction
case expectedConnectionToBeParked
}

static func http1(
elg: EventLoopGroup,
on eventLoop: EventLoop? = nil,
numberOfConnections: Int,
maxNumberOfConnections: Int = 8
) throws -> (Self, HTTPConnectionPool.StateMachine) {
var state = HTTPConnectionPool.StateMachine(
eventLoopGroup: elg,
idGenerator: .init(),
maximumConcurrentHTTP1Connections: maxNumberOfConnections
)
var connections = MockConnectionPool()
var queuer = MockRequestQueuer()

for _ in 0..<numberOfConnections {
let mockRequest = MockHTTPRequest(eventLoop: eventLoop ?? elg.next())
let request = HTTPConnectionPool.Request(mockRequest)
let action = state.executeRequest(request)

guard case .scheduleRequestTimeout(request, on: let waitEL) = action.request, mockRequest.eventLoop === waitEL else {
throw SetupError.expectedRequestToBeAddedToQueue
}

guard case .createConnection(let connectionID, on: let eventLoop) = action.connection else {
throw SetupError.expectedConnectionToBeCreated
}

try connections.createConnection(connectionID, on: eventLoop)
try queuer.queue(mockRequest, id: request.id)
}

while let connectionID = connections.randomStartingConnection() {
let newConnection = try connections.succeedConnectionCreationHTTP1(connectionID)
let action = state.newHTTP1ConnectionCreated(newConnection)

guard case .executeRequest(let request, newConnection, cancelTimeout: true) = action.request else {
throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow
}

guard case .none = action.connection else {
throw SetupError.expectedNoConnectionAction
}

let mockRequest = try queuer.get(request.id, request: request.__testOnly_wrapped_request())
try connections.execute(mockRequest, on: newConnection)
}

while let connection = connections.randomLeasedConnection() {
try connections.finishExecution(connection.id)

let expected: HTTPConnectionPool.StateMachine.ConnectionAction = .scheduleTimeoutTimer(
connection.id,
on: connection.eventLoop
)
guard state.http1ConnectionReleased(connection.id) == .init(request: .none, connection: expected) else {
throw SetupError.expectedConnectionToBeParked
}

try connections.parkConnection(connection.id)
}

return (connections, state)
}
}

/// A request that can be used when testing the `HTTPConnectionPool.StateMachine`
1 change: 1 addition & 0 deletions Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ import XCTest
testCase(HTTPClientTests.allTests),
testCase(HTTPConnectionPool_FactoryTests.allTests),
testCase(HTTPConnectionPool_HTTP1ConnectionsTests.allTests),
testCase(HTTPConnectionPool_HTTP1StateMachineTests.allTests),
testCase(HTTPConnectionPool_RequestQueueTests.allTests),
testCase(HTTPRequestStateMachineTests.allTests),
testCase(LRUCacheTests.allTests),