Skip to content

Commit 195f852

Browse files
committed
start new connections if needed during migration
1 parent 1361ecc commit 195f852

9 files changed

+313
-30
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift

+62-7
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ extension HTTPConnectionPool {
6969
}
7070
}
7171

72+
var canOrWillBeAbleToExecuteRequests: Bool {
73+
switch self.state {
74+
case .leased, .backingOff, .idle, .starting:
75+
return true
76+
case .closed:
77+
return false
78+
}
79+
}
80+
7281
var isLeased: Bool {
7382
switch self.state {
7483
case .leased:
@@ -269,6 +278,10 @@ extension HTTPConnectionPool {
269278
self.overflowIndex < self.maximumConcurrentConnections
270279
}
271280

281+
private var maximumAdditionalGeneralPurposeConnections: Int {
282+
self.maximumConcurrentConnections - (self.overflowIndex - 1)
283+
}
284+
272285
var startingGeneralPurposeConnections: Int {
273286
var connecting = 0
274287
for connectionState in self.connections[0..<self.overflowIndex] {
@@ -530,26 +543,68 @@ extension HTTPConnectionPool {
530543
return migrationContext
531544
}
532545

533-
/// we only handle starting and backing off connection here.
534-
/// All running connections must be handled by the enclosing state machine
546+
/// We only handle starting and backing off connection here.
547+
/// All already running connections must be handled by the enclosing state machine.
548+
/// We will also create new connections for `requiredEventLoopsOfPendingRequests`
549+
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
550+
/// In addition, we also create more general purpose connections if we do not have enough to execute
551+
/// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
552+
/// until we reach `maximumConcurrentConnections`.
535553
/// - Parameters:
536554
/// - starting: starting HTTP connections from previous state machine
537555
/// - backingOff: backing off HTTP connections from previous state machine
538-
mutating func migrateFromHTTP2(
556+
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
557+
/// - preferredEventLoopsOfPendingGeneralPurposeRequests: the preferred event loop of all pending requests without a required event loop
558+
/// - Returns: new connections that need to be created
559+
mutating func migrateFromHTTP2<PreferredEventLoopSequence>(
539560
starting: [(Connection.ID, EventLoop)],
540-
backingOff: [(Connection.ID, EventLoop)]
541-
) {
561+
backingOff: [(Connection.ID, EventLoop)],
562+
requiredEventLoopsOfPendingRequests: [EventLoop],
563+
preferredEventLoopsOfPendingGeneralPurposeRequests: PreferredEventLoopSequence
564+
) -> [(Connection.ID, EventLoop)] where PreferredEventLoopSequence: Sequence, PreferredEventLoopSequence.Element == EventLoop {
542565
for (connectionID, eventLoop) in starting {
543566
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
544-
self.connections.append(newConnection)
567+
self.connections.insert(newConnection, at: self.overflowIndex)
568+
self.overflowIndex = self.connections.index(after: self.overflowIndex)
545569
}
546570

547571
for (connectionID, eventLoop) in backingOff {
548572
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
549573
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
550574
backingOffConnection.failedToConnect()
551-
self.connections.append(backingOffConnection)
575+
self.connections.insert(backingOffConnection, at: self.overflowIndex)
576+
self.overflowIndex = self.connections.index(after: self.overflowIndex)
552577
}
578+
579+
// create new connections for requests with a required event loop
580+
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
581+
self.connections.lazy
582+
.filter {
583+
$0.canOrWillBeAbleToExecuteRequests
584+
}.map {
585+
$0.eventLoop.id
586+
}
587+
)
588+
var createConnections = requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
589+
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
590+
else { return nil }
591+
let connectionID = self.createNewOverflowConnection(on: eventLoop)
592+
return (connectionID, eventLoop)
593+
}
594+
595+
// create new connections for requests without a required event loop
596+
createConnections.append(
597+
contentsOf: preferredEventLoopsOfPendingGeneralPurposeRequests.lazy
598+
/// we do not want to start additional connection for requests for which we already have starting or backing off connections
599+
.dropFirst(self.startingGeneralPurposeConnections)
600+
/// if we have additional requests, we will start more connections while respecting the user defined `maximumConcurrentConnections`
601+
.prefix(self.maximumAdditionalGeneralPurposeConnections)
602+
.map { eventLoop in
603+
(self.createNewConnection(on: eventLoop), eventLoop)
604+
}
605+
)
606+
607+
return createConnections
553608
}
554609

555610
// MARK: Shutdown

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift

+12-7
Original file line numberDiff line numberDiff line change
@@ -88,28 +88,33 @@ extension HTTPConnectionPool {
8888
precondition(self.http2Connections == nil)
8989
precondition(self.requests.isEmpty)
9090

91+
self.requests = requests
92+
9193
if let http1Connections = http1Connections {
9294
self.connections = http1Connections
9395
}
9496

9597
var http2Connections = http2Connections
9698
let migration = http2Connections.migrateToHTTP1()
97-
self.connections.migrateFromHTTP2(
99+
let createConnections = self.connections.migrateFromHTTP2(
98100
starting: migration.starting,
99-
backingOff: migration.backingOff
101+
backingOff: migration.backingOff,
102+
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests(),
103+
preferredEventLoopsOfPendingGeneralPurposeRequests: requests.generalPurposeQueue.lazy.map {
104+
$0.preferredEventLoop
105+
}
100106
)
101107

102108
if !http2Connections.isEmpty {
103109
self.http2Connections = http2Connections
104110
}
105111

106-
// TODO: Close all idle connections from context.close
107-
// TODO: Start new http1 connections for pending requests
108112
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)
109113

110-
self.requests = requests
111-
112-
return .init(closeConnections: [], createConnections: [])
114+
return .init(
115+
closeConnections: migration.close,
116+
createConnections: createConnections
117+
)
113118
}
114119

115120
// MARK: - Events

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift

+25-4
Original file line numberDiff line numberDiff line change
@@ -346,15 +346,20 @@ extension HTTPConnectionPool {
346346

347347
// MARK: Migration
348348

349-
/// we only handle starting and backing off connection here.
350-
/// All running connections must be handled by the enclosing state machine
349+
/// We only handle starting and backing off connection here.
350+
/// All already running connections must be handled by the enclosing state machine.
351+
/// We will also create new connections for `requiredEventLoopsOfPendingRequests`
352+
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
351353
/// - Parameters:
352354
/// - starting: starting HTTP connections from previous state machine
353355
/// - backingOff: backing off HTTP connections from previous state machine
356+
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
357+
/// - Returns: new connections that need to be created
354358
mutating func migrateFromHTTP1(
355359
starting: [(Connection.ID, EventLoop)],
356-
backingOff: [(Connection.ID, EventLoop)]
357-
) {
360+
backingOff: [(Connection.ID, EventLoop)],
361+
requiredEventLoopsOfPendingRequests: [EventLoop]
362+
) -> [(Connection.ID, EventLoop)] {
358363
for (connectionID, eventLoop) in starting {
359364
let newConnection = HTTP2ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
360365
self.connections.append(newConnection)
@@ -366,6 +371,22 @@ extension HTTPConnectionPool {
366371
backingOffConnection.failedToConnect()
367372
self.connections.append(backingOffConnection)
368373
}
374+
375+
// create new connections for requests with a required event loop
376+
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
377+
self.connections.lazy
378+
.filter {
379+
$0.canOrWillBeAbleToExecuteRequests
380+
}.map {
381+
$0.eventLoop.id
382+
}
383+
)
384+
return requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
385+
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
386+
else { return nil }
387+
let connectionID = self.createNewConnection(on: eventLoop)
388+
return (connectionID, eventLoop)
389+
}
369390
}
370391

371392
struct HTTP2ToHTTP1MigrationContext {

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift

+9-8
Original file line numberDiff line numberDiff line change
@@ -96,28 +96,29 @@ extension HTTPConnectionPool {
9696
precondition(self.connections.isEmpty)
9797
precondition(self.requests.isEmpty)
9898

99+
self.requests = requests
100+
99101
if let http2Connections = http2Connections {
100102
self.connections = http2Connections
101103
}
102104

103105
var http1Connections = http1Connections // make http1Connections mutable
104106
let context = http1Connections.migrateToHTTP2()
105-
self.connections.migrateFromHTTP1(
107+
let createConnections = self.connections.migrateFromHTTP1(
106108
starting: context.starting,
107-
backingOff: context.backingOff
109+
backingOff: context.backingOff,
110+
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests()
108111
)
109112

110113
if !http1Connections.isEmpty {
111114
self.http1Connections = http1Connections
112115
}
113116

114-
self.requests = requests
115-
116-
// TODO: Close all idle connections from context.close
117-
// TODO: Start new http2 connections for pending requests
118117
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)
119-
120-
return .init(closeConnections: [], createConnections: [])
118+
return .init(
119+
closeConnections: context.close,
120+
createConnections: createConnections
121+
)
121122
}
122123

123124
mutating func executeRequest(_ request: Request) -> Action {

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift

+5-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import NIOCore
1717
extension HTTPConnectionPool {
1818
/// A struct to store all queued requests.
1919
struct RequestQueue {
20-
private var generalPurposeQueue: CircularBuffer<Request>
20+
private(set) var generalPurposeQueue: CircularBuffer<Request>
2121
private var eventLoopQueues: [EventLoopID: CircularBuffer<Request>]
2222

2323
init() {
@@ -131,6 +131,10 @@ extension HTTPConnectionPool {
131131
}
132132
return nil
133133
}
134+
135+
func eventLoopsWithPendingRequests() -> [EventLoop] {
136+
self.eventLoopQueues.compactMap { $0.value.first?.requiredEventLoop }
137+
}
134138
}
135139
}
136140

Diff for: Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ extension HTTPConnectionPool_HTTP1ConnectionsTests {
3535
("testCloseConnectionIfIdleButLeasedRaceCondition", testCloseConnectionIfIdleButLeasedRaceCondition),
3636
("testCloseConnectionIfIdleButClosedRaceCondition", testCloseConnectionIfIdleButClosedRaceCondition),
3737
("testShutdown", testShutdown),
38+
("testMigrationFromHTTP2", testMigrationFromHTTP2),
39+
("testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop),
40+
("testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop),
41+
("testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection", testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection),
3842
]
3943
}
4044
}

Diff for: Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift

+131
Original file line numberDiff line numberDiff line change
@@ -339,4 +339,135 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
339339
connections.removeConnection(at: failIndex)
340340
XCTAssertTrue(connections.isEmpty)
341341
}
342+
343+
func testMigrationFromHTTP2() {
344+
let elg = EmbeddedEventLoopGroup(loops: 4)
345+
let generator = HTTPConnectionPool.Connection.ID.Generator()
346+
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator)
347+
348+
let el1 = elg.next()
349+
let el2 = elg.next()
350+
351+
let conn1ID = generator.next()
352+
let conn2ID = generator.next()
353+
354+
let newConnections = connections.migrateFromHTTP2(
355+
starting: [(conn1ID, el1)],
356+
backingOff: [(conn2ID, el2)],
357+
requiredEventLoopsOfPendingRequests: [el1, el2],
358+
preferredEventLoopsOfPendingGeneralPurposeRequests: []
359+
)
360+
XCTAssertTrue(newConnections.isEmpty)
361+
362+
let stats = connections.stats
363+
XCTAssertEqual(stats.idle, 0)
364+
XCTAssertEqual(stats.leased, 0)
365+
XCTAssertEqual(stats.connecting, 1)
366+
XCTAssertEqual(stats.backingOff, 1)
367+
}
368+
369+
func testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop() {
370+
let elg = EmbeddedEventLoopGroup(loops: 4)
371+
let generator = HTTPConnectionPool.Connection.ID.Generator()
372+
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator)
373+
374+
let el1 = elg.next()
375+
let el2 = elg.next()
376+
let el3 = elg.next()
377+
378+
let conn1ID = generator.next()
379+
let conn2ID = generator.next()
380+
381+
let newConnections = connections.migrateFromHTTP2(
382+
starting: [(conn1ID, el1)],
383+
backingOff: [(conn2ID, el2)],
384+
requiredEventLoopsOfPendingRequests: [el1, el2, el3],
385+
preferredEventLoopsOfPendingGeneralPurposeRequests: []
386+
)
387+
XCTAssertEqual(newConnections.count, 1)
388+
XCTAssertEqual(newConnections.first?.1.id, el3.id)
389+
390+
guard let conn3ID = newConnections.first?.0 else {
391+
return XCTFail("expected to start a new connection")
392+
}
393+
394+
let stats = connections.stats
395+
XCTAssertEqual(stats.idle, 0)
396+
XCTAssertEqual(stats.leased, 0)
397+
XCTAssertEqual(stats.connecting, 2)
398+
XCTAssertEqual(stats.backingOff, 1)
399+
400+
let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3)
401+
let (_, context) = connections.newHTTP1ConnectionEstablished(conn3)
402+
XCTAssertEqual(context.use, .eventLoop(el3))
403+
XCTAssertTrue(context.eventLoop === el3)
404+
}
405+
406+
func testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop() {
407+
let elg = EmbeddedEventLoopGroup(loops: 4)
408+
let generator = HTTPConnectionPool.Connection.ID.Generator()
409+
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator)
410+
411+
let el1 = elg.next()
412+
let el2 = elg.next()
413+
let el3 = elg.next()
414+
415+
let conn1ID = generator.next()
416+
let conn2ID = generator.next()
417+
418+
let newConnections = connections.migrateFromHTTP2(
419+
starting: [(conn1ID, el1)],
420+
backingOff: [(conn2ID, el2)],
421+
requiredEventLoopsOfPendingRequests: [el1, el2],
422+
preferredEventLoopsOfPendingGeneralPurposeRequests: [el3, el3, el3]
423+
)
424+
XCTAssertEqual(newConnections.count, 1)
425+
XCTAssertEqual(newConnections.first?.1.id, el3.id)
426+
427+
guard let conn3ID = newConnections.first?.0 else {
428+
return XCTFail("expected to start a new connection")
429+
}
430+
431+
let stats = connections.stats
432+
XCTAssertEqual(stats.idle, 0)
433+
XCTAssertEqual(stats.leased, 0)
434+
XCTAssertEqual(stats.connecting, 2)
435+
XCTAssertEqual(stats.backingOff, 1)
436+
437+
let conn3: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn3ID, eventLoop: el3)
438+
let (_, context) = connections.newHTTP1ConnectionEstablished(conn3)
439+
XCTAssertEqual(context.use, .generalPurpose)
440+
XCTAssertTrue(context.eventLoop === el3)
441+
}
442+
443+
func testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection() {
444+
let elg = EmbeddedEventLoopGroup(loops: 4)
445+
let generator = HTTPConnectionPool.Connection.ID.Generator()
446+
var connections = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: generator)
447+
let el1 = elg.next()
448+
let el2 = elg.next()
449+
let el3 = elg.next()
450+
451+
let conn1ID = connections.createNewConnection(on: el1)
452+
let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1)
453+
let (index, _) = connections.newHTTP1ConnectionEstablished(conn1)
454+
_ = connections.leaseConnection(at: index)
455+
456+
let conn2ID = generator.next()
457+
let conn3ID = generator.next()
458+
459+
let newConnections = connections.migrateFromHTTP2(
460+
starting: [(conn2ID, el2)],
461+
backingOff: [(conn3ID, el3)],
462+
requiredEventLoopsOfPendingRequests: [el1, el2, el3],
463+
preferredEventLoopsOfPendingGeneralPurposeRequests: [el1]
464+
)
465+
XCTAssertTrue(newConnections.isEmpty, "we already have a leased connection on el1")
466+
467+
let stats = connections.stats
468+
XCTAssertEqual(stats.idle, 0)
469+
XCTAssertEqual(stats.leased, 1)
470+
XCTAssertEqual(stats.connecting, 1)
471+
XCTAssertEqual(stats.backingOff, 1)
472+
}
342473
}

0 commit comments

Comments
 (0)