diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index 446720140..ce638ac29 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -618,9 +618,10 @@ extension HTTPConnectionPool { // TODO: improve algorithm to create connections uniformly across all preferred event loops // while paying attention to the number of queued request per event loop // Currently we start by creating new connections on the event loop with the most queued - // requests. If we have create a enough connections to cover all requests for the given + // requests. If we have created enough connections to cover all requests for the first // event loop we will continue with the event loop with the second most queued requests - // and so on and so forth. We do not need to sort the array because + // and so on and so forth. The `generalPurposeRequestCountGroupedByPreferredEventLoop` + // array is already ordered so we can just iterate over it without sorting by request count. let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop // we do not want to allocated intermediate arrays. .lazy diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index b7505ed33..2fefd0420 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -45,18 +45,6 @@ extension HTTPConnectionPool { self.requests = RequestQueue() } - mutating func migrateFromHTTP2( - http2State: HTTP2StateMachine, - newHTTP1Connection: Connection - ) -> Action { - self.migrateFromHTTP2( - http1Connections: http2State.http1Connections, - http2Connections: http2State.connections, - requests: http2State.requests, - newHTTP1Connection: newHTTP1Connection - ) - } - mutating func migrateFromHTTP2( http1Connections: HTTP1Connections? = nil, http2Connections: HTTP2Connections, diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 0a6cdbb16..560d5ff0f 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -404,9 +404,9 @@ extension HTTPConnectionPool { self.connections.removeAll { connection in switch connection.migrateToHTTP1(context: &context) { case .removeConnection: - return false - case .keepConnection: return true + case .keepConnection: + return false } } return context @@ -419,20 +419,46 @@ extension HTTPConnectionPool { self.connections.contains { $0.isActive } } - /// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one + /// used in general purpose connection scenarios to check if at least one connection is starting, backing off or active var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool { self.connections.contains { $0.canOrWillBeAbleToExecuteRequests } } /// used in eventLoop scenarios. does at least one connection exist for this eventLoop, or should we create a new one? /// - Parameter eventLoop: connection `EventLoop` to search for - /// - Returns: true if at least one connection is starting or active for the given `eventLoop` + /// - Returns: true if at least one connection is starting, backing off or active for the given `eventLoop` func hasConnectionThatCanOrWillBeAbleToExecuteRequests(for eventLoop: EventLoop) -> Bool { self.connections.contains { $0.eventLoop === eventLoop && $0.canOrWillBeAbleToExecuteRequests } } + func hasActiveConnection(for eventLoop: EventLoop) -> Bool { + self.connections.contains { + $0.eventLoop === eventLoop && $0.isActive + } + } + + /// used after backoff is done to determine if we need to create a new connection + func hasStartingOrActiveConnection() -> Bool { + self.connections.contains { connection in + connection.canOrWillBeAbleToExecuteRequests + } + } + + /// used after backoff is done to determine if we need to create a new connection + /// - Parameters: + /// - eventLoop: connection `EventLoop` to search for + /// - Returns: true if at least one connection is starting or active for the given `eventLoop` + func hasStartingOrActiveConnection( + for eventLoop: EventLoop + ) -> Bool { + self.connections.contains { connection in + connection.eventLoop === eventLoop && + connection.canOrWillBeAbleToExecuteRequests + } + } + mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID { assert( !self.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop), diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 1d5a4f68d..94ae3edda 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -49,20 +49,6 @@ extension HTTPConnectionPool { self.connections = HTTP2Connections(generator: idGenerator) } - mutating func migrateFromHTTP1( - http1State: HTTP1StateMachine, - newHTTP2Connection: Connection, - maxConcurrentStreams: Int - ) -> Action { - self.migrateFromHTTP1( - http1Connections: http1State.connections, - http2Connections: http1State.http2Connections, - requests: http1State.requests, - newHTTP2Connection: newHTTP2Connection, - maxConcurrentStreams: maxConcurrentStreams - ) - } - mutating func migrateFromHTTP1( http1Connections: HTTP1Connections, http2Connections: HTTP2Connections? = nil, @@ -228,11 +214,22 @@ extension HTTPConnectionPool { private mutating func _newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> EstablishedAction { self.failedConsecutiveConnectionAttempts = 0 self.lastConnectFailure = nil - let (index, context) = self.connections.newHTTP2ConnectionEstablished( - connection, - maxConcurrentStreams: maxConcurrentStreams - ) - return self.nextActionForAvailableConnection(at: index, context: context) + if self.connections.hasActiveConnection(for: connection.eventLoop) { + guard let (index, _) = self.connections.failConnection(connection.id) else { + preconditionFailure("we have established a new connection that we know nothing about?") + } + self.connections.removeConnection(at: index) + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + } else { + let (index, context) = self.connections.newHTTP2ConnectionEstablished( + connection, + maxConcurrentStreams: maxConcurrentStreams + ) + return self.nextActionForAvailableConnection(at: index, context: context) + } } private mutating func nextActionForAvailableConnection( @@ -318,8 +315,28 @@ extension HTTPConnectionPool { private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action { switch self.state { case .running: - let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) - guard hasPendingRequest else { + // we do not know if we have created this connection for a request with a required + // event loop or not. However, we do not need this information and can infer + // if we need to create a new connection because we will only ever create one connection + // per event loop for required event loop requests and only need one connection for + // general purpose requests. + + // we need to start a new on connection in two cases: + let needGeneralPurposeConnection = + // 1. if we have general purpose requests + !self.requests.isEmpty(for: nil) && + // and no connection starting or active + !self.connections.hasStartingOrActiveConnection() + + let needRequiredEventLoopConnection = + // 2. or if we have requests for a required event loop + !self.requests.isEmpty(for: eventLoop) && + // and no connection starting or active for the given event loop + !self.connections.hasStartingOrActiveConnection(for: eventLoop) + + guard needGeneralPurposeConnection || needRequiredEventLoopConnection else { + // otherwise we can remove the connection + self.connections.removeConnection(at: index) return .none } @@ -330,6 +347,7 @@ extension HTTPConnectionPool { request: .none, connection: .createConnection(newConnectionID, on: eventLoop) ) + case .shuttingDown(let unclean): assert(self.requests.isEmpty) self.connections.removeConnection(at: index) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift index dab1354c9..c2b621841 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -68,6 +68,23 @@ extension HTTPConnectionPool { enum HTTPVersionState { case http1(HTTP1StateMachine) + case http2(HTTP2StateMachine) + + mutating func modify( + http1: (inout HTTP1StateMachine) -> ReturnValue, + http2: (inout HTTP2StateMachine) -> ReturnValue + ) -> ReturnValue { + let returnValue: ReturnValue + switch self { + case .http1(var http1State): + returnValue = http1(&http1State) + self = .http1(http1State) + case .http2(var http2State): + returnValue = http2(&http2State) + self = .http2(http2State) + } + return returnValue + } } var state: HTTPVersionState @@ -87,12 +104,11 @@ extension HTTPConnectionPool { } mutating func executeRequest(_ request: Request) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.executeRequest(request) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.executeRequest(request) + }, http2: { http2 in + http2.executeRequest(request) + }) } mutating func newHTTP1ConnectionCreated(_ connection: Connection) -> Action { @@ -101,28 +117,98 @@ extension HTTPConnectionPool { let action = http1StateMachine.newHTTP1ConnectionEstablished(connection) self.state = .http1(http1StateMachine) return action + + case .http2(let http2StateMachine): + var http1StateMachine = HTTP1StateMachine( + idGenerator: self.idGenerator, + maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections + ) + + let newConnectionAction = http1StateMachine.migrateFromHTTP2( + http1Connections: http2StateMachine.http1Connections, + http2Connections: http2StateMachine.connections, + requests: http2StateMachine.requests, + newHTTP1Connection: connection + ) + self.state = .http1(http1StateMachine) + return newConnectionAction } } - mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { + mutating func newHTTP2ConnectionCreated(_ connection: Connection, maxConcurrentStreams: Int) -> Action { switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.failedToCreateNewConnection( - error, - connectionID: connectionID + case .http1(let http1StateMachine): + + var http2StateMachine = HTTP2StateMachine( + idGenerator: self.idGenerator ) - self.state = .http1(http1StateMachine) - return action + let migrationAction = http2StateMachine.migrateFromHTTP1( + http1Connections: http1StateMachine.connections, + http2Connections: http1StateMachine.http2Connections, + requests: http1StateMachine.requests, + newHTTP2Connection: connection, + maxConcurrentStreams: maxConcurrentStreams + ) + + self.state = .http2(http2StateMachine) + return migrationAction + + case .http2(var http2StateMachine): + let newConnectionAction = http2StateMachine.newHTTP2ConnectionEstablished( + connection, + maxConcurrentStreams: maxConcurrentStreams + ) + self.state = .http2(http2StateMachine) + return newConnectionAction } } + mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { + self.state.modify(http1: { http1 in + http1.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + }, http2: { http2 in + http2.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + }) + } + + mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action { + self.state.modify(http1: { http1 in + http1.http2ConnectionGoAwayReceived(connectionID) + }, http2: { http2 in + http2.http2ConnectionGoAwayReceived(connectionID) + }) + } + + mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action { + self.state.modify(http1: { http1 in + http1.http2ConnectionClosed(connectionID) + }, http2: { http2 in + http2.http2ConnectionClosed(connectionID) + }) + } + + mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { + self.state.modify(http1: { http1 in + http1.http2ConnectionStreamClosed(connectionID) + }, http2: { http2 in + http2.http2ConnectionStreamClosed(connectionID) + }) + } + + mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { + self.state.modify(http1: { http1 in + http1.failedToCreateNewConnection(error, connectionID: connectionID) + }, http2: { http2 in + http2.failedToCreateNewConnection(error, connectionID: connectionID) + }) + } + mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.connectionCreationBackoffDone(connectionID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.connectionCreationBackoffDone(connectionID) + }, http2: { http2 in + http2.connectionCreationBackoffDone(connectionID) + }) } /// A request has timed out. @@ -131,12 +217,11 @@ extension HTTPConnectionPool { /// request, but don't need to cancel the timer (it already triggered). If a request is cancelled /// we don't need to fail it but we need to cancel its timeout timer. mutating func timeoutRequest(_ requestID: Request.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.timeoutRequest(requestID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.timeoutRequest(requestID) + }, http2: { http2 in + http2.timeoutRequest(requestID) + }) } /// A request was cancelled. @@ -145,40 +230,36 @@ extension HTTPConnectionPool { /// need to cancel its timeout timer. If a request times out, we need to fail the request, but don't /// need to cancel the timer (it already triggered). mutating func cancelRequest(_ requestID: Request.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.cancelRequest(requestID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.cancelRequest(requestID) + }, http2: { http2 in + http2.cancelRequest(requestID) + }) } mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.connectionIdleTimeout(connectionID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.connectionIdleTimeout(connectionID) + }, http2: { http2 in + http2.connectionIdleTimeout(connectionID) + }) } /// A connection has been closed mutating func http1ConnectionClosed(_ connectionID: Connection.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.http1ConnectionClosed(connectionID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.http1ConnectionClosed(connectionID) + }, http2: { http2 in + http2.http1ConnectionClosed(connectionID) + }) } mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.http1ConnectionReleased(connectionID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.http1ConnectionReleased(connectionID) + }, http2: { http2 in + http2.http1ConnectionReleased(connectionID) + }) } mutating func shutdown() -> Action { @@ -186,12 +267,11 @@ extension HTTPConnectionPool { self.isShuttingDown = true - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.shutdown() - self.state = .http1(http1StateMachine) - return action - } + return self.state.modify(http1: { http1 in + http1.shutdown() + }, http2: { http2 in + http2.shutdown() + }) } } } @@ -221,6 +301,8 @@ extension HTTPConnectionPool.StateMachine: CustomStringConvertible { switch self.state { case .http1(let http1): return ".http1(\(http1))" + case .http2(let http2): + return ".http2(\(http2))" } } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift index 2ad98202a..f9afa713e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift @@ -40,6 +40,7 @@ extension HTTPConnectionPool_HTTP2ConnectionsTests { ("testNewMaxConcurrentStreamsSetting", testNewMaxConcurrentStreamsSetting), ("testLeaseOnPreferredEventLoopWithoutAnyAvailable", testLeaseOnPreferredEventLoopWithoutAnyAvailable), ("testMigrationFromHTTP1", testMigrationFromHTTP1), + ("testMigrationToHTTP1", testMigrationToHTTP1), ("testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop), ("testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection", testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection), ] diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift index 0d3f45ad0..804707959 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift @@ -556,6 +556,71 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { ) } + func testMigrationToHTTP1() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP2Connections(generator: generator) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + + let conn1ID = generator.next() + let conn2ID = generator.next() + let conn3ID = generator.next() + let conn4ID = generator.next() + + connections.migrateFromHTTP1( + starting: [(conn1ID, el1), (conn2ID, el2), (conn3ID, el3)], + backingOff: [(conn4ID, el4)] + ) + + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + + let (leasedConn1, leasdConnContext1) = connections.leaseStreams(at: conn1Index, count: 2) + XCTAssertEqual(leasedConn1, conn1) + XCTAssertEqual(leasdConnContext1.wasIdle, true) + + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el2) + let (_, conn2CreatedContext) = connections.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 100) + XCTAssertEqual(conn2CreatedContext.availableStreams, 100) + + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 1, + backingOffConnections: 1, + idleConnections: 1, + availableConnections: 2, + drainingConnections: 0, + leasedStreams: 2, + availableStreams: 198 + ) + ) + + let migrationContext = connections.migrateToHTTP1() + XCTAssertEqual(migrationContext.close, [conn2]) + XCTAssertEqual(migrationContext.starting.map { $0.0 }, [conn3ID]) + XCTAssertEqual(migrationContext.starting.map { $0.1.id }, [el3.id]) + XCTAssertEqual(migrationContext.backingOff.map { $0.0 }, [conn4ID]) + XCTAssertEqual(migrationContext.backingOff.map { $0.1.id }, [el4.id]) + + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 0, + backingOffConnections: 0, + idleConnections: 0, + availableConnections: 1, + drainingConnections: 0, + leasedStreams: 2, + availableStreams: 98 + ) + ) + } + func testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop() { let elg = EmbeddedEventLoopGroup(loops: 4) let generator = HTTPConnectionPool.Connection.ID.Generator() diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index a54d8c578..dbb489503 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -36,6 +36,9 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testGoAwayOnIdleConnection", testGoAwayOnIdleConnection), ("testGoAwayWithLeasedStream", testGoAwayWithLeasedStream), ("testGoAwayWithPendingRequestsStartsNewConnection", testGoAwayWithPendingRequestsStartsNewConnection), + ("testMigrationFromHTTP1ToHTTP2", testMigrationFromHTTP1ToHTTP2), + ("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections), + ("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index ce728147f..767f2f032 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -312,7 +312,13 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let http2ConnectAction = http2State.migrateFromHTTP1(http1State: http1State, newHTTP2Connection: conn2, maxConcurrentStreams: 100) + let http2ConnectAction = http2State.migrateFromHTTP1( + http1Connections: http1State.connections, + http2Connections: http1State.http2Connections, + requests: http1State.requests, + newHTTP2Connection: conn2, + maxConcurrentStreams: 100 + ) XCTAssertEqual(http2ConnectAction.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) guard case .executeRequestsAndCancelTimeouts([request2], conn2) = http2ConnectAction.request else { return XCTFail("Unexpected request action \(http2ConnectAction.request)") @@ -577,4 +583,290 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(closeStream2Action.request, .none) XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn2ID, on: el1)) } + + func testMigrationFromHTTP1ToHTTP2() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + /// first 8 request should create a new connection + var connectionIDs: [HTTPConnectionPool.Connection.ID] = [] + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let connID, let eventLoop) = action.connection else { + return XCTFail("Unexpected connection action \(action.connection)") + } + connectionIDs.append(connID) + XCTAssertTrue(eventLoop === el1) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(connID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + guard let conn1ID = connectionIDs.first else { + return XCTFail("could not create connection") + } + + /// after we reached the `maximumConcurrentHTTP1Connections`, we will not create new connections + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + /// first new HTTP2 connection should migrate from HTTP1 to HTTP2 and execute requests + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(conn1ID, maxConcurrentStreams: 10)) + let migrationAction = state.newHTTP2ConnectionCreated(conn1, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, let conn) = migrationAction.request else { + return XCTFail("unexpected request action \(migrationAction.request)") + } + XCTAssertEqual(conn, conn1) + XCTAssertEqual(requests.count, 10) + + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: conn1)) + } + + XCTAssertEqual(migrationAction.connection, .migration( + createConnections: [], + closeConnections: [], + scheduleTimeout: nil + )) + + /// remaining connections should be closed immediately without executing any request + for connID in connectionIDs.dropFirst() { + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 10)) + let action = state.newHTTP2ConnectionCreated(conn, maxConcurrentStreams: 10) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .closeConnection(conn, isShutdown: .no)) + XCTAssertNoThrow(try connections.closeConnection(conn)) + } + + /// closing a stream while we have requests queued should result in one request execution action + for _ in 0..<6 { + XCTAssertNoThrow(try connections.finishExecution(conn1ID)) + let action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(action.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else { + return XCTFail("Unexpected request action \(action.request)") + } + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.cancel(request.id)) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: conn1)) + } + } + XCTAssertTrue(queuer.isEmpty) + } + + func testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + /// first 8 request should create a new connection + var connectionIDs: [HTTPConnectionPool.Connection.ID] = [] + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let connID, let eventLoop) = action.connection else { + return XCTFail("Unexpected connection action \(action.connection)") + } + connectionIDs.append(connID) + XCTAssertTrue(eventLoop === el1) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(connID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + /// after we reached the `maximumConcurrentHTTP1Connections`, we will not create new connections + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + let http1ConnIDs = connectionIDs.prefix(4) + let succesfullHTTP1ConnIDs = http1ConnIDs.prefix(2) + let failedHTTP1ConnIDs = http1ConnIDs.dropFirst(2) + + /// new http1 connection should execute 1 request + for connID in succesfullHTTP1ConnIDs { + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP1(connID)) + let action = state.newHTTP1ConnectionCreated(conn) + guard case .executeRequest(let request, conn, cancelTimeout: true) = action.request else { + return XCTFail("unexpected request action \(action.request)") + } + XCTAssertEqual(action.connection, .none) + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: conn)) + } + + /// failing connection should backoff connection + for connID in failedHTTP1ConnIDs { + XCTAssertNoThrow(try connections.failConnectionCreation(connID)) + struct SomeError: Error {} + let action = state.failedToCreateNewConnection(SomeError(), connectionID: connID) + guard case .scheduleBackoffTimer(connID, backoff: _, let el) = action.connection else { + return XCTFail("unexpected connection action \(action.connection)") + } + XCTAssertEqual(action.request, .none) + XCTAssertTrue(el === el1) + XCTAssertNoThrow(try connections.startConnectionBackoffTimer(connID)) + } + + let http2ConnectionIDs = Array(connectionIDs.dropFirst(4)) + + guard let firstHTTP2ConnID = http2ConnectionIDs.first else { + return XCTFail("could not create connection") + } + + /// first new HTTP2 connection should migrate from HTTP1 to HTTP2 and execute requests + let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: firstHTTP2ConnID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(firstHTTP2ConnID, maxConcurrentStreams: 10)) + let migrationAction = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, let conn) = migrationAction.request else { + return XCTFail("unexpected request action \(migrationAction.request)") + } + XCTAssertEqual(migrationAction.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) + + XCTAssertEqual(conn, http2Conn) + XCTAssertEqual(requests.count, 10) + + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn)) + } + + /// remaining connections should be closed immediately without executing any request + for connID in http2ConnectionIDs.dropFirst() { + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 10)) + let action = state.newHTTP2ConnectionCreated(conn, maxConcurrentStreams: 10) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .closeConnection(conn, isShutdown: .no)) + XCTAssertNoThrow(try connections.closeConnection(conn)) + } + + /// after a request has finished on a http1 connection, the connection should be closed + /// because we are now in http/2 mode + for http1ConnectionID in succesfullHTTP1ConnIDs { + XCTAssertNoThrow(try connections.finishExecution(http1ConnectionID)) + let action = state.http1ConnectionReleased(http1ConnectionID) + XCTAssertEqual(action.request, .none) + guard case .closeConnection(let conn, isShutdown: .no) = action.connection else { + return XCTFail("unexpected connection action \(migrationAction.connection)") + } + XCTAssertEqual(conn.id, http1ConnectionID) + } + + /// if a backoff timer fires for an old http1 connection we should not start a new connection + /// because we are already in http2 mode + for http1ConnectionID in failedHTTP1ConnIDs { + XCTAssertNoThrow(try connections.connectionBackoffTimerDone(http1ConnectionID)) + let action = state.connectionCreationBackoffDone(http1ConnectionID) + XCTAssertEqual(action, .none) + } + + /// closing a stream while we have requests queued should result in one request execution action + for _ in 0..<4 { + XCTAssertNoThrow(try connections.finishExecution(http2Conn.id)) + let action = state.http2ConnectionStreamClosed(http2Conn.id) + XCTAssertEqual(action.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = action.request else { + return XCTFail("Unexpected request action \(action.request)") + } + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn)) + } + } + + XCTAssertTrue(queuer.isEmpty) + } + + func testHTTP2toHTTP1Migration() { + let elg = EmbeddedEventLoopGroup(loops: 2) + let el1 = elg.next() + let el2 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + // create http2 connection + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest) + let action1 = state.executeRequest(request1) + guard case .createConnection(let http2ConnID, let http2EventLoop) = action1.connection else { + return XCTFail("Unexpected connection action \(action1.connection)") + } + XCTAssertTrue(http2EventLoop === el1) + XCTAssertEqual(action1.request, .scheduleRequestTimeout(for: request1, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(http2ConnID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request1.id)) + let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http2ConnID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(http2ConnID, maxConcurrentStreams: 10)) + let migrationAction1 = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = migrationAction1.request else { + return XCTFail("unexpected request action \(migrationAction1.request)") + } + XCTAssertEqual(migrationAction1.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn)) + } + + // a request with new required event loop should create a new connection + let mockRequestWithRequiredEventLoop = MockHTTPRequest(eventLoop: el2, requiresEventLoopForChannel: true) + let requestWithRequiredEventLoop = HTTPConnectionPool.Request(mockRequestWithRequiredEventLoop) + let action2 = state.executeRequest(requestWithRequiredEventLoop) + guard case .createConnection(let http1ConnId, let http1EventLoop) = action2.connection else { + return XCTFail("Unexpected connection action \(action2.connection)") + } + XCTAssertTrue(http1EventLoop === el2) + XCTAssertEqual(action2.request, .scheduleRequestTimeout(for: requestWithRequiredEventLoop, on: mockRequestWithRequiredEventLoop.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(http1ConnId, on: el2)) + XCTAssertNoThrow(try queuer.queue(mockRequestWithRequiredEventLoop, id: requestWithRequiredEventLoop.id)) + + // if we established a new http/1 connection we should migrate back to http/1 + let http1Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http1ConnId, eventLoop: el2) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP1(http1ConnId)) + let migrationAction2 = state.newHTTP1ConnectionCreated(http1Conn) + guard case .executeRequest(let request2, http1Conn, cancelTimeout: true) = migrationAction2.request else { + return XCTFail("unexpected request action \(migrationAction2.request)") + } + guard case .migration(let createConnections, closeConnections: [], scheduleTimeout: nil) = migrationAction2.connection else { + return XCTFail("unexpected connection action \(migrationAction2.connection)") + } + XCTAssertEqual(createConnections.map { $0.1.id }, [el2.id]) + XCTAssertNoThrow(try queuer.get(request2.id, request: request2.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request2.__testOnly_wrapped_request(), on: http1Conn)) + + // in http/1 state, we should close idle http2 connections + XCTAssertNoThrow(try connections.finishExecution(http2Conn.id)) + let releaseAction = state.http2ConnectionStreamClosed(http2Conn.id) + XCTAssertEqual(releaseAction.connection, .closeConnection(http2Conn, isShutdown: .no)) + XCTAssertEqual(releaseAction.request, .none) + XCTAssertNoThrow(try connections.closeConnection(http2Conn)) + } }