Skip to content

[HTTP2] Create new connections during migration if needed #459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -69,6 +69,15 @@ extension HTTPConnectionPool {
}
}

var canOrWillBeAbleToExecuteRequests: Bool {
switch self.state {
case .leased, .backingOff, .idle, .starting:
return true
case .closed:
return false
}
}

var isLeased: Bool {
switch self.state {
case .leased:
@@ -281,6 +290,10 @@ extension HTTPConnectionPool {
return connecting
}

private var maximumAdditionalGeneralPurposeConnections: Int {
self.maximumConcurrentConnections - (self.overflowIndex - 1)
}

/// Is there at least one connection that is able to run requests
var hasActiveConnections: Bool {
self.connections.contains(where: { $0.isIdle || $0.isLeased })
@@ -530,8 +543,8 @@ extension HTTPConnectionPool {
return migrationContext
}

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
@@ -541,17 +554,96 @@ extension HTTPConnectionPool {
) {
for (connectionID, eventLoop) in starting {
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
self.connections.append(newConnection)
self.connections.insert(newConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}

for (connectionID, eventLoop) in backingOff {
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
backingOffConnection.failedToConnect()
self.connections.append(backingOffConnection)
self.connections.insert(backingOffConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}
}

/// We will create new connections for each `requiredEventLoopOfPendingRequests`
/// In addition, we also create more general purpose connections if we do not have enough to execute
/// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
/// until we reach `maximumConcurrentConnections`
/// - Parameters:
/// - requiredEventLoopsForPendingRequests:
/// event loops for which we have requests with a required event loop.
/// Duplicates are not allowed.
/// - generalPurposeRequestCountPerPreferredEventLoop:
/// request count with no required event loop,
/// grouped by preferred event loop and ordered descending by number of requests
/// - Returns: new connections that must be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [(EventLoop, Int)],
generalPurposeRequestCountGroupedByPreferredEventLoop: [(EventLoop, Int)]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop

// we may already start connections for those requests and do not want to start to many
let startingRequiredEventLoopConnectionCount = Dictionary(
self.connections[self.overflowIndex..<self.connections.endIndex].lazy.map {
($0.eventLoop.id, 1)
},
uniquingKeysWith: +
)
var connectionToCreate = requiredEventLoopOfPendingRequests
.flatMap { (eventLoop, requestCount) -> [(Connection.ID, EventLoop)] in
// We need a connection for each queued request with a required event loop.
// Therefore, we look how many request we have queued for a given `eventLoop` and
// how many connections we are already starting on the given `eventLoop`.
// If we have not enough, we will create additional connections to have at least
// on connection per request.
let connectionsToStart = requestCount - startingRequiredEventLoopConnectionCount[eventLoop.id, default: 0]
return stride(from: 0, to: connectionsToStart, by: 1).lazy.map { _ in
(self.createNewOverflowConnection(on: eventLoop), eventLoop)
}
}

// create new connections for requests without a required event loop

// TODO: improve algorithm to create connections uniformly across all preferred event loops
// while paying attention to the number of queued request per event loop
// Currently we start by creating new connections on the event loop with the most queued
// requests. If we have create a enough connections to cover all requests for the given
// event loop we will continue with the event loop with the second most queued requests
// and so on and so forth. We do not need to sort the array because
let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop
// we do not want to allocated intermediate arrays.
.lazy
// we flatten the grouped list of event loops by lazily repeating the event loop
// for each request.
// As a result we get one event loop per request (`[EventLoop]`).
.flatMap { eventLoop, requestCount in
repeatElement(eventLoop, count: requestCount)
}
// we may already start connections and do not want to start too many
.dropLast(self.startingGeneralPurposeConnections)
// we need to respect the used defined `maximumConcurrentConnections`
.prefix(self.maximumAdditionalGeneralPurposeConnections)
// we now create a connection for each remaining event loop
.map { eventLoop in
(self.createNewConnection(on: eventLoop), eventLoop)
}

connectionToCreate.append(contentsOf: newGeneralPurposeConnections)

return connectionToCreate
}

// MARK: Shutdown

mutating func shutdown() -> CleanupContext {
Original file line number Diff line number Diff line change
@@ -84,32 +84,40 @@ extension HTTPConnectionPool {
http2Connections: HTTP2Connections,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.connections.isEmpty)
precondition(self.http2Connections == nil)
precondition(self.requests.isEmpty)
precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty")
precondition(self.http2Connections == nil, "expected an empty state machine but http2Connections are not nil")
precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty")

self.requests = requests

// we may have remaining open http1 connections from a pervious migration to http2
if let http1Connections = http1Connections {
self.connections = http1Connections
}

var http2Connections = http2Connections
let migration = http2Connections.migrateToHTTP1()

self.connections.migrateFromHTTP2(
starting: migration.starting,
backingOff: migration.backingOff
)

let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: requests.requestCountGroupedByRequiredEventLoop(),
generalPurposeRequestCountGroupedByPreferredEventLoop: requests.generalPurposeRequestCountGroupedByPreferredEventLoop()
)

if !http2Connections.isEmpty {
self.http2Connections = http2Connections
}

// TODO: Close all idle connections from context.close
// TODO: Start new http1 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

self.requests = requests

return .init(closeConnections: [], createConnections: [])
return .init(
closeConnections: migration.close,
createConnections: createConnections
)
}

// MARK: - Events
Original file line number Diff line number Diff line change
@@ -346,8 +346,8 @@ extension HTTPConnectionPool {

// MARK: Migration

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
@@ -368,6 +368,31 @@ extension HTTPConnectionPool {
}
}

/// We will create new connections for `requiredEventLoopsOfPendingRequests`
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
/// - Parameters:
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
/// - Returns: new connections that need to be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: [EventLoop]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
self.connections.lazy
.filter {
$0.canOrWillBeAbleToExecuteRequests
}.map {
$0.eventLoop.id
}
)
return requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
else { return nil }
let connectionID = self.createNewConnection(on: eventLoop)
return (connectionID, eventLoop)
}
}

struct HTTP2ToHTTP1MigrationContext {
var backingOff: [(Connection.ID, EventLoop)] = []
var starting: [(Connection.ID, EventLoop)] = []
Original file line number Diff line number Diff line change
@@ -92,10 +92,13 @@ extension HTTPConnectionPool {
http2Connections: HTTP2Connections?,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.http1Connections == nil)
precondition(self.connections.isEmpty)
precondition(self.requests.isEmpty)
precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty")
precondition(self.http1Connections == nil, "expected an empty state machine but http1Connections are not nil")
precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty")

self.requests = requests

// we may have remaining open http2 connections from a pervious migration to http1
if let http2Connections = http2Connections {
self.connections = http2Connections
}
@@ -107,17 +110,19 @@ extension HTTPConnectionPool {
backingOff: context.backingOff
)

let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests()
)

if !http1Connections.isEmpty {
self.http1Connections = http1Connections
}

self.requests = requests

// TODO: Close all idle connections from context.close
// TODO: Start new http2 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

return .init(closeConnections: [], createConnections: [])
return .init(
closeConnections: context.close,
createConnections: createConnections
)
}

mutating func executeRequest(_ request: Request) -> Action {
Original file line number Diff line number Diff line change
@@ -14,6 +14,21 @@

import NIOCore

private struct HashableEventLoop: Hashable {
static func == (lhs: HashableEventLoop, rhs: HashableEventLoop) -> Bool {
lhs.eventLoop === rhs.eventLoop
}

init(_ eventLoop: EventLoop) {
self.eventLoop = eventLoop
}

let eventLoop: EventLoop
func hash(into hasher: inout Hasher) {
self.eventLoop.id.hash(into: &hasher)
}
}

extension HTTPConnectionPool {
/// A struct to store all queued requests.
struct RequestQueue {
@@ -131,6 +146,42 @@ extension HTTPConnectionPool {
}
return nil
}

/// - Returns: event loops with at least one request with a required event loop
func eventLoopsWithPendingRequests() -> [EventLoop] {
self.eventLoopQueues.compactMap {
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`
/// however, a queue can be empty
$0.value.first?.requiredEventLoop!
}
}

/// - Returns: request count for requests with required event loop, grouped by required event loop without any particular order
func requestCountGroupedByRequiredEventLoop() -> [(EventLoop, Int)] {
self.eventLoopQueues.values.compactMap { requests -> (EventLoop, Int)? in
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`,
/// however, a queue can be empty
guard let requiredEventLoop = requests.first?.requiredEventLoop! else {
return nil
}
return (requiredEventLoop, requests.count)
}
}

/// - Returns: request count with **no** required event loop, grouped by preferred event loop and ordered descending by number of requests
func generalPurposeRequestCountGroupedByPreferredEventLoop() -> [(EventLoop, Int)] {
let requestCountPerEventLoop = Dictionary(
self.generalPurposeQueue.lazy.map { request in
(HashableEventLoop(request.preferredEventLoop), 1)
},
uniquingKeysWith: +
)
return requestCountPerEventLoop.lazy
.map { ($0.key.eventLoop, $0.value) }
.sorted { lhs, rhs in
lhs.1 > rhs.1
}
}
}
}

Original file line number Diff line number Diff line change
@@ -35,6 +35,12 @@ extension HTTPConnectionPool_HTTP1ConnectionsTests {
("testCloseConnectionIfIdleButLeasedRaceCondition", testCloseConnectionIfIdleButLeasedRaceCondition),
("testCloseConnectionIfIdleButClosedRaceCondition", testCloseConnectionIfIdleButClosedRaceCondition),
("testShutdown", testShutdown),
("testMigrationFromHTTP2", testMigrationFromHTTP2),
("testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop),
("testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop),
("testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection", testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection),
("testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections", testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections),
("testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests", testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests),
]
}
}
Original file line number Diff line number Diff line change
@@ -339,4 +339,228 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
connections.removeConnection(at: failIndex)
XCTAssertTrue(connections.isEmpty)
}

func testMigrationFromHTTP2() {
let elg = EmbeddedEventLoopGroup(loops: 4)
let generator = HTTPConnectionPool.Connection.ID.Generator()
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator)

let el1 = elg.next()
let el2 = elg.next()

let conn1ID = generator.next()
let conn2ID = generator.next()

connections.migrateFromHTTP2(
starting: [(conn1ID, el1)],
backingOff: [(conn2ID, el2)]
)
let newConnections = connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [],
generalPurposeRequestCountGroupedByPreferredEventLoop: [(el1, 1), (el2, 1)]
)

XCTAssertTrue(newConnections.isEmpty)

let stats = connections.stats
XCTAssertEqual(stats.idle, 0)
XCTAssertEqual(stats.leased, 0)
XCTAssertEqual(stats.connecting, 1)
XCTAssertEqual(stats.backingOff, 1)
}

func testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop() {
let elg = EmbeddedEventLoopGroup(loops: 4)
let generator = HTTPConnectionPool.Connection.ID.Generator()
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator)

let el1 = elg.next()
let el2 = elg.next()
let el3 = elg.next()

let conn1ID = generator.next()
let conn2ID = generator.next()

connections.migrateFromHTTP2(
starting: [(conn1ID, el1)],
backingOff: [(conn2ID, el2)]
)
let newConnections = connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [(el3, 1)],
generalPurposeRequestCountGroupedByPreferredEventLoop: []
)
XCTAssertEqual(newConnections.count, 1)
XCTAssertEqual(newConnections.first?.1.id, el3.id)

guard let conn3ID = newConnections.first?.0 else {
return XCTFail("expected to start a new connection")
}

let stats = connections.stats
XCTAssertEqual(stats.idle, 0)
XCTAssertEqual(stats.leased, 0)
XCTAssertEqual(stats.connecting, 2)
XCTAssertEqual(stats.backingOff, 1)

let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3)
let (_, context) = connections.newHTTP1ConnectionEstablished(conn3)
XCTAssertEqual(context.use, .eventLoop(el3))
XCTAssertTrue(context.eventLoop === el3)
}

func testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop() {
let elg = EmbeddedEventLoopGroup(loops: 4)
let generator = HTTPConnectionPool.Connection.ID.Generator()
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator)

let el1 = elg.next()
let el2 = elg.next()
let el3 = elg.next()

let conn1ID = generator.next()
let conn2ID = generator.next()

connections.migrateFromHTTP2(
starting: [(conn1ID, el1)],
backingOff: [(conn2ID, el2)]
)
let newConnections = connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [],
generalPurposeRequestCountGroupedByPreferredEventLoop: [(el3, 3)]
)
XCTAssertEqual(newConnections.count, 1)
XCTAssertEqual(newConnections.first?.1.id, el3.id)

guard let conn3ID = newConnections.first?.0 else {
return XCTFail("expected to start a new connection")
}

let stats = connections.stats
XCTAssertEqual(stats.idle, 0)
XCTAssertEqual(stats.leased, 0)
XCTAssertEqual(stats.connecting, 2)
XCTAssertEqual(stats.backingOff, 1)

let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3)
let (_, context) = connections.newHTTP1ConnectionEstablished(conn3)
XCTAssertEqual(context.use, .generalPurpose)
XCTAssertTrue(context.eventLoop === el3)
}

func testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection() {
let elg = EmbeddedEventLoopGroup(loops: 4)
let generator = HTTPConnectionPool.Connection.ID.Generator()
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator)
let el1 = elg.next()
let el2 = elg.next()
let el3 = elg.next()

let conn1ID = connections.createNewConnection(on: el1)
let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1)
let (index, _) = connections.newHTTP1ConnectionEstablished(conn1)
_ = connections.leaseConnection(at: index)

let conn2ID = generator.next()
let conn3ID = generator.next()

connections.migrateFromHTTP2(
starting: [(conn2ID, el2)],
backingOff: [(conn3ID, el3)]
)
let newConnections = connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [],
generalPurposeRequestCountGroupedByPreferredEventLoop: [(el3, 3)]
)

XCTAssertEqual(newConnections.count, 1)
XCTAssertEqual(newConnections.first?.1.id, el3.id)

guard let conn4ID = newConnections.first?.0 else {
return XCTFail("expected to start a new connection")
}

let stats = connections.stats
XCTAssertEqual(stats.idle, 0)
XCTAssertEqual(stats.leased, 1)
XCTAssertEqual(stats.connecting, 2)
XCTAssertEqual(stats.backingOff, 1)

let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn4ID, eventLoop: el3)
let (_, context) = connections.newHTTP1ConnectionEstablished(conn3)
XCTAssertEqual(context.use, .generalPurpose)
XCTAssertTrue(context.eventLoop === el3)
}

func testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections() {
let elg = EmbeddedEventLoopGroup(loops: 4)
let generator = HTTPConnectionPool.Connection.ID.Generator()
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 2, generator: generator)

let el1 = elg.next()
let el2 = elg.next()
let el3 = elg.next()

let conn1ID = generator.next()
let conn2ID = generator.next()
let conn3ID = generator.next()

connections.migrateFromHTTP2(
starting: [(conn1ID, el1), (conn2ID, el2), (conn3ID, el3)],
backingOff: []
)

// first two connections should be added as general purpose connections
let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1)
let (_, context1) = connections.newHTTP1ConnectionEstablished(conn1)
XCTAssertEqual(context1.use, .generalPurpose)
XCTAssertTrue(context1.eventLoop === el1)
let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el2)
let (_, context2) = connections.newHTTP1ConnectionEstablished(conn2)
XCTAssertEqual(context2.use, .generalPurpose)
XCTAssertTrue(context2.eventLoop === el2)

// additional connection should be added as overflow connection
let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3)
let (_, context3) = connections.newHTTP1ConnectionEstablished(conn3)
XCTAssertEqual(context3.use, .eventLoop(el3))
XCTAssertTrue(context3.eventLoop === el3)
}

func testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests() {
let elg = EmbeddedEventLoopGroup(loops: 4)
let generator = HTTPConnectionPool.Connection.ID.Generator()
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 1, generator: generator)

let el1 = elg.next()
let el2 = elg.next()
let el3 = elg.next()
let el4 = elg.next()

let conn1ID = generator.next()
let conn2ID = generator.next()
let conn3ID = generator.next()

connections.migrateFromHTTP2(
starting: [(conn1ID, el1), (conn2ID, el2), (conn3ID, el3)],
backingOff: []
)

let connectionsToCreate = connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [(el2, 2), (el3, 1), (el4, 2)],
generalPurposeRequestCountGroupedByPreferredEventLoop: []
)

XCTAssertEqual(
connectionsToCreate.map { $0.1.id },
[el2.id, el4.id, el4.id],
"should create one connection for el2 and two for el4"
)

for (connID, el) in connectionsToCreate {
let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el)
let (_, context) = connections.newHTTP1ConnectionEstablished(conn)
XCTAssertEqual(context.use, .eventLoop(el))
XCTAssertTrue(context.eventLoop === el)
}
}
}
Original file line number Diff line number Diff line change
@@ -40,6 +40,8 @@ extension HTTPConnectionPool_HTTP2ConnectionsTests {
("testNewMaxConcurrentStreamsSetting", testNewMaxConcurrentStreamsSetting),
("testLeaseOnPreferredEventLoopWithoutAnyAvailable", testLeaseOnPreferredEventLoopWithoutAnyAvailable),
("testMigrationFromHTTP1", testMigrationFromHTTP1),
("testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop),
("testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection", testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection),
]
}
}
Original file line number Diff line number Diff line change
@@ -517,6 +517,10 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
starting: [(conn1ID, el1)],
backingOff: [(conn2ID, el2)]
)
XCTAssertTrue(connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: [el1, el2]
).isEmpty)

XCTAssertEqual(
connections.stats,
.init(
@@ -551,4 +555,68 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
)
)
}

func testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop() {
let elg = EmbeddedEventLoopGroup(loops: 4)
let generator = HTTPConnectionPool.Connection.ID.Generator()
var connections = HTTPConnectionPool.HTTP2Connections(generator: generator)
let el1 = elg.next()
let el2 = elg.next()
let el3 = elg.next()
let conn1ID = generator.next()
let conn2ID = generator.next()

connections.migrateFromHTTP1(
starting: [(conn1ID, el1)],
backingOff: [(conn2ID, el2)]
)
let newConnections = connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: [el1, el2, el3]
)

XCTAssertEqual(newConnections.count, 1)

guard let (conn3ID, eventLoop) = newConnections.first else {
return XCTFail("expected to start a new connection")
}
XCTAssertTrue(eventLoop === el3)

let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3)
let (_, context) = connections.newHTTP2ConnectionEstablished(conn3, maxConcurrentStreams: 100)
XCTAssertEqual(context.availableStreams, 100)
XCTAssertEqual(context.eventLoop.id, el3.id)
XCTAssertEqual(context.isIdle, true)
XCTAssertEqual(context.connectionID, conn3ID)
}

func testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection() {
let elg = EmbeddedEventLoopGroup(loops: 4)
let generator = HTTPConnectionPool.Connection.ID.Generator()
var connections = HTTPConnectionPool.HTTP2Connections(generator: generator)
let el1 = elg.next()
let el2 = elg.next()
let el3 = elg.next()

let conn1ID = connections.createNewConnection(on: el1)
let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1)
let (index, _) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100)
_ = connections.leaseStreams(at: index, count: 1)

let conn2ID = generator.next()
let conn3ID = generator.next()

connections.migrateFromHTTP1(
starting: [(conn2ID, el2)],
backingOff: [(conn3ID, el3)]
)

XCTAssertTrue(connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: [el1, el2, el3]
).isEmpty, "we still have an active connection for el1 and should not create a new one")

guard let (leasedConn, _) = connections.leaseStream(onRequired: el1) else {
return XCTFail("could not lease stream on el1")
}
XCTAssertEqual(leasedConn, conn1)
}
}