diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 560d5ff0f..e2ef07065 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -49,6 +49,15 @@ extension HTTPConnectionPool { } } + var isStartingOrActive: Bool { + switch self.state { + case .starting, .active: + return true + case .draining, .backingOff, .closed: + return false + } + } + /// A connection is established and can potentially execute requests if not all streams are leased var isActive: Bool { switch self.state { @@ -439,24 +448,26 @@ extension HTTPConnectionPool { } } - /// used after backoff is done to determine if we need to create a new connection - func hasStartingOrActiveConnection() -> Bool { - self.connections.contains { connection in - connection.canOrWillBeAbleToExecuteRequests - } - } - /// used after backoff is done to determine if we need to create a new connection /// - Parameters: /// - eventLoop: connection `EventLoop` to search for - /// - Returns: true if at least one connection is starting or active for the given `eventLoop` - func hasStartingOrActiveConnection( + /// - Returns: if we have a starting or active general purpose connection and if we have also one for the given `eventLoop` + func backingOffTimerDone( for eventLoop: EventLoop - ) -> Bool { - self.connections.contains { connection in - connection.eventLoop === eventLoop && - connection.canOrWillBeAbleToExecuteRequests + ) -> RetryConnectionCreationContext { + var hasGeneralPurposeConnection: Bool = false + var hasConnectionOnSpecifiedEventLoop: Bool = false + for connection in self.connections { + guard connection.isStartingOrActive else { continue } + hasGeneralPurposeConnection = true + guard connection.eventLoop === eventLoop else { continue } + hasConnectionOnSpecifiedEventLoop = true + break } + return RetryConnectionCreationContext( + hasGeneralPurposeConnection: hasGeneralPurposeConnection, + hasConnectionOnSpecifiedEventLoop: hasConnectionOnSpecifiedEventLoop + ) } mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID { @@ -675,6 +686,14 @@ extension HTTPConnectionPool { // MARK: Result structs + struct RetryConnectionCreationContext { + /// true if at least one connection is starting or active regardless of the event loop. + let hasGeneralPurposeConnection: Bool + + /// true if at least one connection is starting or active for the given `eventLoop` + let hasConnectionOnSpecifiedEventLoop: Bool + } + /// Information around a connection which is either in the .active or .draining state. struct EstablishedConnectionContext { /// number of streams which can be leased diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 94ae3edda..7a9f4dc7c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -321,18 +321,21 @@ extension HTTPConnectionPool { // per event loop for required event loop requests and only need one connection for // general purpose requests. + // precompute if we have starting or active connections to only iterate once over `self.connections` + let context = self.connections.backingOffTimerDone(for: eventLoop) + // we need to start a new on connection in two cases: let needGeneralPurposeConnection = // 1. if we have general purpose requests !self.requests.isEmpty(for: nil) && // and no connection starting or active - !self.connections.hasStartingOrActiveConnection() + !context.hasGeneralPurposeConnection let needRequiredEventLoopConnection = // 2. or if we have requests for a required event loop !self.requests.isEmpty(for: eventLoop) && // and no connection starting or active for the given event loop - !self.connections.hasStartingOrActiveConnection(for: eventLoop) + !context.hasConnectionOnSpecifiedEventLoop guard needGeneralPurposeConnection || needRequiredEventLoopConnection else { // otherwise we can remove the connection diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index dbb489503..2c335779e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -39,6 +39,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testMigrationFromHTTP1ToHTTP2", testMigrationFromHTTP1ToHTTP2), ("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections), ("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration), + ("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 767f2f032..e8a4edbbe 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -869,4 +869,78 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(releaseAction.request, .none) XCTAssertNoThrow(try connections.closeConnection(http2Conn)) } + + func testConnectionIsImmediatelyCreatedAfterBackoffTimerFires() { + let elg = EmbeddedEventLoopGroup(loops: 2) + let el1 = elg.next() + let el2 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + var connectionIDs: [HTTPConnectionPool.Connection.ID] = [] + for el in [el1, el2, el2] { + let mockRequest = MockHTTPRequest(eventLoop: el, requiresEventLoopForChannel: true) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let connID, let eventLoop) = action.connection else { + return XCTFail("Unexpected connection action \(action.connection)") + } + connectionIDs.append(connID) + XCTAssertTrue(eventLoop === el) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(connID, on: el)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + // fail the two connections for el2 + for connectionID in connectionIDs.dropFirst() { + struct SomeError: Error {} + XCTAssertNoThrow(try connections.failConnectionCreation(connectionID)) + let action = state.failedToCreateNewConnection(SomeError(), connectionID: connectionID) + XCTAssertEqual(action.request, .none) + guard case .scheduleBackoffTimer(connectionID, backoff: _, on: _) = action.connection else { + return XCTFail("unexpected connection action \(connectionID)") + } + XCTAssertNoThrow(try connections.startConnectionBackoffTimer(connectionID)) + } + let http2ConnID1 = connectionIDs[0] + let http2ConnID2 = connectionIDs[1] + let http2ConnID3 = connectionIDs[2] + + // let the first connection on el1 succeed as a http2 connection + let http2Conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: http2ConnID1, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(http2ConnID1, maxConcurrentStreams: 10)) + let migrationAction1 = state.newHTTP2ConnectionCreated(http2Conn1, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn1) = migrationAction1.request else { + return XCTFail("unexpected request action \(migrationAction1.request)") + } + XCTAssertEqual(migrationAction1.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn1)) + } + + // we now have 1 active connection on el1 and 2 backing off connections on el2 + // with 2 queued requests with a requirement to be executed on el2 + + // if the backoff timer fires for a connection on el2, we should immediately start a new connection + XCTAssertNoThrow(try connections.connectionBackoffTimerDone(http2ConnID2)) + let action2 = state.connectionCreationBackoffDone(http2ConnID2) + XCTAssertEqual(action2.request, .none) + guard case .createConnection(let newHttp2ConnID2, let eventLoop2) = action2.connection else { + return XCTFail("Unexpected connection action \(action2.connection)") + } + XCTAssertTrue(eventLoop2 === el2) + XCTAssertNoThrow(try connections.createConnection(newHttp2ConnID2, on: el2)) + + // we now have a starting connection for el2 and another one backing off + + // if the backoff timer fires now for a connection on el2, we should *not* start a new connection + XCTAssertNoThrow(try connections.connectionBackoffTimerDone(http2ConnID3)) + let action3 = state.connectionCreationBackoffDone(http2ConnID3) + XCTAssertEqual(action3.request, .none) + XCTAssertEqual(action3.connection, .none) + } }