diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index fffd283c0..446720140 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -69,6 +69,15 @@ extension HTTPConnectionPool { } } + var canOrWillBeAbleToExecuteRequests: Bool { + switch self.state { + case .leased, .backingOff, .idle, .starting: + return true + case .closed: + return false + } + } + var isLeased: Bool { switch self.state { case .leased: @@ -281,6 +290,10 @@ extension HTTPConnectionPool { return connecting } + private var maximumAdditionalGeneralPurposeConnections: Int { + self.maximumConcurrentConnections - (self.overflowIndex - 1) + } + /// Is there at least one connection that is able to run requests var hasActiveConnections: Bool { self.connections.contains(where: { $0.isIdle || $0.isLeased }) @@ -530,8 +543,8 @@ extension HTTPConnectionPool { return migrationContext } - /// we only handle starting and backing off connection here. - /// All running connections must be handled by the enclosing state machine + /// We only handle starting and backing off connection here. + /// All already running connections must be handled by the enclosing state machine. /// - Parameters: /// - starting: starting HTTP connections from previous state machine /// - backingOff: backing off HTTP connections from previous state machine @@ -541,17 +554,96 @@ extension HTTPConnectionPool { ) { for (connectionID, eventLoop) in starting { let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop) - self.connections.append(newConnection) + self.connections.insert(newConnection, at: self.overflowIndex) + /// If we can grow, we mark the connection as a general purpose connection. + /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop + if self.canGrow { + self.overflowIndex = self.connections.index(after: self.overflowIndex) + } } for (connectionID, eventLoop) in backingOff { var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop) // TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState backingOffConnection.failedToConnect() - self.connections.append(backingOffConnection) + self.connections.insert(backingOffConnection, at: self.overflowIndex) + /// If we can grow, we mark the connection as a general purpose connection. + /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop + if self.canGrow { + self.overflowIndex = self.connections.index(after: self.overflowIndex) + } } } + /// We will create new connections for each `requiredEventLoopOfPendingRequests` + /// In addition, we also create more general purpose connections if we do not have enough to execute + /// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests` + /// until we reach `maximumConcurrentConnections` + /// - Parameters: + /// - requiredEventLoopsForPendingRequests: + /// event loops for which we have requests with a required event loop. + /// Duplicates are not allowed. + /// - generalPurposeRequestCountPerPreferredEventLoop: + /// request count with no required event loop, + /// grouped by preferred event loop and ordered descending by number of requests + /// - Returns: new connections that must be created + mutating func createConnectionsAfterMigrationIfNeeded( + requiredEventLoopOfPendingRequests: [(EventLoop, Int)], + generalPurposeRequestCountGroupedByPreferredEventLoop: [(EventLoop, Int)] + ) -> [(Connection.ID, EventLoop)] { + // create new connections for requests with a required event loop + + // we may already start connections for those requests and do not want to start to many + let startingRequiredEventLoopConnectionCount = Dictionary( + self.connections[self.overflowIndex.. [(Connection.ID, EventLoop)] in + // We need a connection for each queued request with a required event loop. + // Therefore, we look how many request we have queued for a given `eventLoop` and + // how many connections we are already starting on the given `eventLoop`. + // If we have not enough, we will create additional connections to have at least + // on connection per request. + let connectionsToStart = requestCount - startingRequiredEventLoopConnectionCount[eventLoop.id, default: 0] + return stride(from: 0, to: connectionsToStart, by: 1).lazy.map { _ in + (self.createNewOverflowConnection(on: eventLoop), eventLoop) + } + } + + // create new connections for requests without a required event loop + + // TODO: improve algorithm to create connections uniformly across all preferred event loops + // while paying attention to the number of queued request per event loop + // Currently we start by creating new connections on the event loop with the most queued + // requests. If we have create a enough connections to cover all requests for the given + // event loop we will continue with the event loop with the second most queued requests + // and so on and so forth. We do not need to sort the array because + let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop + // we do not want to allocated intermediate arrays. + .lazy + // we flatten the grouped list of event loops by lazily repeating the event loop + // for each request. + // As a result we get one event loop per request (`[EventLoop]`). + .flatMap { eventLoop, requestCount in + repeatElement(eventLoop, count: requestCount) + } + // we may already start connections and do not want to start too many + .dropLast(self.startingGeneralPurposeConnections) + // we need to respect the used defined `maximumConcurrentConnections` + .prefix(self.maximumAdditionalGeneralPurposeConnections) + // we now create a connection for each remaining event loop + .map { eventLoop in + (self.createNewConnection(on: eventLoop), eventLoop) + } + + connectionToCreate.append(contentsOf: newGeneralPurposeConnections) + + return connectionToCreate + } + // MARK: Shutdown mutating func shutdown() -> CleanupContext { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index 42bad981b..b7505ed33 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -84,32 +84,40 @@ extension HTTPConnectionPool { http2Connections: HTTP2Connections, requests: RequestQueue ) -> ConnectionMigrationAction { - precondition(self.connections.isEmpty) - precondition(self.http2Connections == nil) - precondition(self.requests.isEmpty) + precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty") + precondition(self.http2Connections == nil, "expected an empty state machine but http2Connections are not nil") + precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty") + self.requests = requests + + // we may have remaining open http1 connections from a pervious migration to http2 if let http1Connections = http1Connections { self.connections = http1Connections } var http2Connections = http2Connections let migration = http2Connections.migrateToHTTP1() + self.connections.migrateFromHTTP2( starting: migration.starting, backingOff: migration.backingOff ) + let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopOfPendingRequests: requests.requestCountGroupedByRequiredEventLoop(), + generalPurposeRequestCountGroupedByPreferredEventLoop: requests.generalPurposeRequestCountGroupedByPreferredEventLoop() + ) + if !http2Connections.isEmpty { self.http2Connections = http2Connections } - // TODO: Close all idle connections from context.close - // TODO: Start new http1 connections for pending requests // TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap) - self.requests = requests - - return .init(closeConnections: [], createConnections: []) + return .init( + closeConnections: migration.close, + createConnections: createConnections + ) } // MARK: - Events diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 33c747eb1..0a6cdbb16 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -346,8 +346,8 @@ extension HTTPConnectionPool { // MARK: Migration - /// we only handle starting and backing off connection here. - /// All running connections must be handled by the enclosing state machine + /// We only handle starting and backing off connection here. + /// All already running connections must be handled by the enclosing state machine. /// - Parameters: /// - starting: starting HTTP connections from previous state machine /// - backingOff: backing off HTTP connections from previous state machine @@ -368,6 +368,31 @@ extension HTTPConnectionPool { } } + /// We will create new connections for `requiredEventLoopsOfPendingRequests` + /// if we do not already have a connection that can or will be able to execute requests on the given event loop. + /// - Parameters: + /// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed. + /// - Returns: new connections that need to be created + mutating func createConnectionsAfterMigrationIfNeeded( + requiredEventLoopsOfPendingRequests: [EventLoop] + ) -> [(Connection.ID, EventLoop)] { + // create new connections for requests with a required event loop + let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set( + self.connections.lazy + .filter { + $0.canOrWillBeAbleToExecuteRequests + }.map { + $0.eventLoop.id + } + ) + return requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in + guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id) + else { return nil } + let connectionID = self.createNewConnection(on: eventLoop) + return (connectionID, eventLoop) + } + } + struct HTTP2ToHTTP1MigrationContext { var backingOff: [(Connection.ID, EventLoop)] = [] var starting: [(Connection.ID, EventLoop)] = [] diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index a314c0861..1d5a4f68d 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -92,10 +92,13 @@ extension HTTPConnectionPool { http2Connections: HTTP2Connections?, requests: RequestQueue ) -> ConnectionMigrationAction { - precondition(self.http1Connections == nil) - precondition(self.connections.isEmpty) - precondition(self.requests.isEmpty) + precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty") + precondition(self.http1Connections == nil, "expected an empty state machine but http1Connections are not nil") + precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty") + self.requests = requests + + // we may have remaining open http2 connections from a pervious migration to http1 if let http2Connections = http2Connections { self.connections = http2Connections } @@ -107,17 +110,19 @@ extension HTTPConnectionPool { backingOff: context.backingOff ) + let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests() + ) + if !http1Connections.isEmpty { self.http1Connections = http1Connections } - self.requests = requests - - // TODO: Close all idle connections from context.close - // TODO: Start new http2 connections for pending requests // TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap) - - return .init(closeConnections: [], createConnections: []) + return .init( + closeConnections: context.close, + createConnections: createConnections + ) } mutating func executeRequest(_ request: Request) -> Action { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift index 74707e3f3..110533f53 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -14,6 +14,21 @@ import NIOCore +private struct HashableEventLoop: Hashable { + static func == (lhs: HashableEventLoop, rhs: HashableEventLoop) -> Bool { + lhs.eventLoop === rhs.eventLoop + } + + init(_ eventLoop: EventLoop) { + self.eventLoop = eventLoop + } + + let eventLoop: EventLoop + func hash(into hasher: inout Hasher) { + self.eventLoop.id.hash(into: &hasher) + } +} + extension HTTPConnectionPool { /// A struct to store all queued requests. struct RequestQueue { @@ -131,6 +146,42 @@ extension HTTPConnectionPool { } return nil } + + /// - Returns: event loops with at least one request with a required event loop + func eventLoopsWithPendingRequests() -> [EventLoop] { + self.eventLoopQueues.compactMap { + /// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop` + /// however, a queue can be empty + $0.value.first?.requiredEventLoop! + } + } + + /// - Returns: request count for requests with required event loop, grouped by required event loop without any particular order + func requestCountGroupedByRequiredEventLoop() -> [(EventLoop, Int)] { + self.eventLoopQueues.values.compactMap { requests -> (EventLoop, Int)? in + /// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`, + /// however, a queue can be empty + guard let requiredEventLoop = requests.first?.requiredEventLoop! else { + return nil + } + return (requiredEventLoop, requests.count) + } + } + + /// - Returns: request count with **no** required event loop, grouped by preferred event loop and ordered descending by number of requests + func generalPurposeRequestCountGroupedByPreferredEventLoop() -> [(EventLoop, Int)] { + let requestCountPerEventLoop = Dictionary( + self.generalPurposeQueue.lazy.map { request in + (HashableEventLoop(request.preferredEventLoop), 1) + }, + uniquingKeysWith: + + ) + return requestCountPerEventLoop.lazy + .map { ($0.key.eventLoop, $0.value) } + .sorted { lhs, rhs in + lhs.1 > rhs.1 + } + } } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift index d6d7d8176..78ccf2d2b 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift @@ -35,6 +35,12 @@ extension HTTPConnectionPool_HTTP1ConnectionsTests { ("testCloseConnectionIfIdleButLeasedRaceCondition", testCloseConnectionIfIdleButLeasedRaceCondition), ("testCloseConnectionIfIdleButClosedRaceCondition", testCloseConnectionIfIdleButClosedRaceCondition), ("testShutdown", testShutdown), + ("testMigrationFromHTTP2", testMigrationFromHTTP2), + ("testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop), + ("testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop), + ("testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection", testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection), + ("testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections", testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections), + ("testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests", testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift index 8123657dd..35b94c239 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift @@ -339,4 +339,228 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { connections.removeConnection(at: failIndex) XCTAssertTrue(connections.isEmpty) } + + func testMigrationFromHTTP2() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator) + + let el1 = elg.next() + let el2 = elg.next() + + let conn1ID = generator.next() + let conn2ID = generator.next() + + connections.migrateFromHTTP2( + starting: [(conn1ID, el1)], + backingOff: [(conn2ID, el2)] + ) + let newConnections = connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopOfPendingRequests: [], + generalPurposeRequestCountGroupedByPreferredEventLoop: [(el1, 1), (el2, 1)] + ) + + XCTAssertTrue(newConnections.isEmpty) + + let stats = connections.stats + XCTAssertEqual(stats.idle, 0) + XCTAssertEqual(stats.leased, 0) + XCTAssertEqual(stats.connecting, 1) + XCTAssertEqual(stats.backingOff, 1) + } + + func testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator) + + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + + let conn1ID = generator.next() + let conn2ID = generator.next() + + connections.migrateFromHTTP2( + starting: [(conn1ID, el1)], + backingOff: [(conn2ID, el2)] + ) + let newConnections = connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopOfPendingRequests: [(el3, 1)], + generalPurposeRequestCountGroupedByPreferredEventLoop: [] + ) + XCTAssertEqual(newConnections.count, 1) + XCTAssertEqual(newConnections.first?.1.id, el3.id) + + guard let conn3ID = newConnections.first?.0 else { + return XCTFail("expected to start a new connection") + } + + let stats = connections.stats + XCTAssertEqual(stats.idle, 0) + XCTAssertEqual(stats.leased, 0) + XCTAssertEqual(stats.connecting, 2) + XCTAssertEqual(stats.backingOff, 1) + + let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3) + let (_, context) = connections.newHTTP1ConnectionEstablished(conn3) + XCTAssertEqual(context.use, .eventLoop(el3)) + XCTAssertTrue(context.eventLoop === el3) + } + + func testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator) + + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + + let conn1ID = generator.next() + let conn2ID = generator.next() + + connections.migrateFromHTTP2( + starting: [(conn1ID, el1)], + backingOff: [(conn2ID, el2)] + ) + let newConnections = connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopOfPendingRequests: [], + generalPurposeRequestCountGroupedByPreferredEventLoop: [(el3, 3)] + ) + XCTAssertEqual(newConnections.count, 1) + XCTAssertEqual(newConnections.first?.1.id, el3.id) + + guard let conn3ID = newConnections.first?.0 else { + return XCTFail("expected to start a new connection") + } + + let stats = connections.stats + XCTAssertEqual(stats.idle, 0) + XCTAssertEqual(stats.leased, 0) + XCTAssertEqual(stats.connecting, 2) + XCTAssertEqual(stats.backingOff, 1) + + let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3) + let (_, context) = connections.newHTTP1ConnectionEstablished(conn3) + XCTAssertEqual(context.use, .generalPurpose) + XCTAssertTrue(context.eventLoop === el3) + } + + func testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (index, _) = connections.newHTTP1ConnectionEstablished(conn1) + _ = connections.leaseConnection(at: index) + + let conn2ID = generator.next() + let conn3ID = generator.next() + + connections.migrateFromHTTP2( + starting: [(conn2ID, el2)], + backingOff: [(conn3ID, el3)] + ) + let newConnections = connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopOfPendingRequests: [], + generalPurposeRequestCountGroupedByPreferredEventLoop: [(el3, 3)] + ) + + XCTAssertEqual(newConnections.count, 1) + XCTAssertEqual(newConnections.first?.1.id, el3.id) + + guard let conn4ID = newConnections.first?.0 else { + return XCTFail("expected to start a new connection") + } + + let stats = connections.stats + XCTAssertEqual(stats.idle, 0) + XCTAssertEqual(stats.leased, 1) + XCTAssertEqual(stats.connecting, 2) + XCTAssertEqual(stats.backingOff, 1) + + let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn4ID, eventLoop: el3) + let (_, context) = connections.newHTTP1ConnectionEstablished(conn3) + XCTAssertEqual(context.use, .generalPurpose) + XCTAssertTrue(context.eventLoop === el3) + } + + func testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 2, generator: generator) + + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + + let conn1ID = generator.next() + let conn2ID = generator.next() + let conn3ID = generator.next() + + connections.migrateFromHTTP2( + starting: [(conn1ID, el1), (conn2ID, el2), (conn3ID, el3)], + backingOff: [] + ) + + // first two connections should be added as general purpose connections + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (_, context1) = connections.newHTTP1ConnectionEstablished(conn1) + XCTAssertEqual(context1.use, .generalPurpose) + XCTAssertTrue(context1.eventLoop === el1) + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el2) + let (_, context2) = connections.newHTTP1ConnectionEstablished(conn2) + XCTAssertEqual(context2.use, .generalPurpose) + XCTAssertTrue(context2.eventLoop === el2) + + // additional connection should be added as overflow connection + let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3) + let (_, context3) = connections.newHTTP1ConnectionEstablished(conn3) + XCTAssertEqual(context3.use, .eventLoop(el3)) + XCTAssertTrue(context3.eventLoop === el3) + } + + func testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 1, generator: generator) + + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + + let conn1ID = generator.next() + let conn2ID = generator.next() + let conn3ID = generator.next() + + connections.migrateFromHTTP2( + starting: [(conn1ID, el1), (conn2ID, el2), (conn3ID, el3)], + backingOff: [] + ) + + let connectionsToCreate = connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopOfPendingRequests: [(el2, 2), (el3, 1), (el4, 2)], + generalPurposeRequestCountGroupedByPreferredEventLoop: [] + ) + + XCTAssertEqual( + connectionsToCreate.map { $0.1.id }, + [el2.id, el4.id, el4.id], + "should create one connection for el2 and two for el4" + ) + + for (connID, el) in connectionsToCreate { + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + let (_, context) = connections.newHTTP1ConnectionEstablished(conn) + XCTAssertEqual(context.use, .eventLoop(el)) + XCTAssertTrue(context.eventLoop === el) + } + } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift index 953da7222..2ad98202a 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift @@ -40,6 +40,8 @@ extension HTTPConnectionPool_HTTP2ConnectionsTests { ("testNewMaxConcurrentStreamsSetting", testNewMaxConcurrentStreamsSetting), ("testLeaseOnPreferredEventLoopWithoutAnyAvailable", testLeaseOnPreferredEventLoopWithoutAnyAvailable), ("testMigrationFromHTTP1", testMigrationFromHTTP1), + ("testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop), + ("testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection", testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift index c361c5c25..0d3f45ad0 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift @@ -517,6 +517,10 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { starting: [(conn1ID, el1)], backingOff: [(conn2ID, el2)] ) + XCTAssertTrue(connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopsOfPendingRequests: [el1, el2] + ).isEmpty) + XCTAssertEqual( connections.stats, .init( @@ -551,4 +555,68 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { ) ) } + + func testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP2Connections(generator: generator) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let conn1ID = generator.next() + let conn2ID = generator.next() + + connections.migrateFromHTTP1( + starting: [(conn1ID, el1)], + backingOff: [(conn2ID, el2)] + ) + let newConnections = connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopsOfPendingRequests: [el1, el2, el3] + ) + + XCTAssertEqual(newConnections.count, 1) + + guard let (conn3ID, eventLoop) = newConnections.first else { + return XCTFail("expected to start a new connection") + } + XCTAssertTrue(eventLoop === el3) + + let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3) + let (_, context) = connections.newHTTP2ConnectionEstablished(conn3, maxConcurrentStreams: 100) + XCTAssertEqual(context.availableStreams, 100) + XCTAssertEqual(context.eventLoop.id, el3.id) + XCTAssertEqual(context.isIdle, true) + XCTAssertEqual(context.connectionID, conn3ID) + } + + func testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP2Connections(generator: generator) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (index, _) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + _ = connections.leaseStreams(at: index, count: 1) + + let conn2ID = generator.next() + let conn3ID = generator.next() + + connections.migrateFromHTTP1( + starting: [(conn2ID, el2)], + backingOff: [(conn3ID, el3)] + ) + + XCTAssertTrue(connections.createConnectionsAfterMigrationIfNeeded( + requiredEventLoopsOfPendingRequests: [el1, el2, el3] + ).isEmpty, "we still have an active connection for el1 and should not create a new one") + + guard let (leasedConn, _) = connections.leaseStream(onRequired: el1) else { + return XCTFail("could not lease stream on el1") + } + XCTAssertEqual(leasedConn, conn1) + } }