Skip to content

Commit b39c948

Browse files
committed
fix review comments
- new connections that are created after the http2 -> http1 migration should now more closely mimic the behaviour of the http1 state machine - API bounderies are now clearer
1 parent e5d84c1 commit b39c948

8 files changed

+259
-86
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift

+59-45
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,6 @@ extension HTTPConnectionPool {
278278
self.overflowIndex < self.maximumConcurrentConnections
279279
}
280280

281-
private var maximumAdditionalGeneralPurposeConnections: Int {
282-
self.maximumConcurrentConnections - (self.overflowIndex - 1)
283-
}
284-
285281
var startingGeneralPurposeConnections: Int {
286282
var connecting = 0
287283
for connectionState in self.connections[0..<self.overflowIndex] {
@@ -545,66 +541,84 @@ extension HTTPConnectionPool {
545541

546542
/// We only handle starting and backing off connection here.
547543
/// All already running connections must be handled by the enclosing state machine.
548-
/// We will also create new connections for `requiredEventLoopsOfPendingRequests`
549-
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
550-
/// In addition, we also create more general purpose connections if we do not have enough to execute
551-
/// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
552-
/// until we reach `maximumConcurrentConnections`.
553544
/// - Parameters:
554545
/// - starting: starting HTTP connections from previous state machine
555546
/// - backingOff: backing off HTTP connections from previous state machine
556-
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
557-
/// - preferredEventLoopsOfPendingGeneralPurposeRequests: the preferred event loop of all pending requests without a required event loop
558-
/// - Returns: new connections that need to be created
559-
mutating func migrateFromHTTP2<PreferredEventLoopSequence>(
547+
mutating func migrateFromHTTP2(
560548
starting: [(Connection.ID, EventLoop)],
561-
backingOff: [(Connection.ID, EventLoop)],
562-
requiredEventLoopsOfPendingRequests: [EventLoop],
563-
preferredEventLoopsOfPendingGeneralPurposeRequests: PreferredEventLoopSequence
564-
) -> [(Connection.ID, EventLoop)] where PreferredEventLoopSequence: Sequence, PreferredEventLoopSequence.Element == EventLoop {
549+
backingOff: [(Connection.ID, EventLoop)]
550+
) {
565551
for (connectionID, eventLoop) in starting {
566552
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
567553
self.connections.insert(newConnection, at: self.overflowIndex)
568-
self.overflowIndex = self.connections.index(after: self.overflowIndex)
554+
/// If we can grow, we mark the connection as a general purpose connection.
555+
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
556+
if self.canGrow {
557+
self.overflowIndex = self.connections.index(after: self.overflowIndex)
558+
}
569559
}
570560

571561
for (connectionID, eventLoop) in backingOff {
572562
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
573563
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
574564
backingOffConnection.failedToConnect()
575565
self.connections.insert(backingOffConnection, at: self.overflowIndex)
576-
self.overflowIndex = self.connections.index(after: self.overflowIndex)
566+
/// If we can grow, we mark the connection as a general purpose connection.
567+
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
568+
if self.canGrow {
569+
self.overflowIndex = self.connections.index(after: self.overflowIndex)
570+
}
577571
}
572+
}
578573

579-
// create new connections for requests with a required event loop
580-
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
581-
self.connections.lazy
582-
.filter {
583-
$0.canOrWillBeAbleToExecuteRequests
584-
}.map {
585-
$0.eventLoop.id
586-
}
574+
/// We will create new connections for each `requiredEventLoopOfPendingRequests`
575+
/// In addition, we also create more general purpose connections if we do not have enough to execute
576+
/// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
577+
/// until we reach `maximumConcurrentConnections
578+
/// - Parameters:
579+
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
580+
/// - generalPurposeRequestCountPerPreferredEventLoop: request count with no required event loop, grouped by preferred event loop and ordered descending by number of requests
581+
/// - Returns: new connections that need to be created
582+
mutating func createConnectionsAfterMigrationIfNeeded(
583+
requiredEventLoopOfPendingRequests: [(EventLoop, Int)],
584+
generalPurposeRequestCountGroupedByPreferredEventLoop: [(EventLoop, Int)]
585+
) -> [(Connection.ID, EventLoop)] {
586+
/// create new connections for requests with a required event loop
587+
588+
/// we may already start connections for those requests and do not want to start to many
589+
let startingRequiredEventLoopConnectionCount = Dictionary(
590+
self.connections[self.overflowIndex..<self.connections.endIndex].lazy.map {
591+
($0.eventLoop.id, 1)
592+
},
593+
uniquingKeysWith: +
587594
)
588-
var createConnections = requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
589-
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
590-
else { return nil }
591-
let connectionID = self.createNewOverflowConnection(on: eventLoop)
592-
return (connectionID, eventLoop)
593-
}
594-
595-
// create new connections for requests without a required event loop
596-
createConnections.append(
597-
contentsOf: preferredEventLoopsOfPendingGeneralPurposeRequests.lazy
598-
/// we do not want to start additional connection for requests for which we already have starting or backing off connections
599-
.dropFirst(self.startingGeneralPurposeConnections)
600-
/// if we have additional requests, we will start more connections while respecting the user defined `maximumConcurrentConnections`
601-
.prefix(self.maximumAdditionalGeneralPurposeConnections)
602-
.map { eventLoop in
603-
(self.createNewConnection(on: eventLoop), eventLoop)
595+
596+
var connectionToCreate = requiredEventLoopOfPendingRequests
597+
.flatMap { (eventLoop, requestCount) -> [(Connection.ID, EventLoop)] in
598+
let connectionsToStart = max(requestCount - startingRequiredEventLoopConnectionCount[eventLoop.id, default: 0], 0)
599+
return (0..<connectionsToStart).lazy.map { _ in
600+
(self.createNewOverflowConnection(on: eventLoop), eventLoop)
604601
}
605-
)
602+
}
603+
604+
/// create new connections for requests without a required event loop
605+
606+
/// we may already start connections for those requests and do not want to start to many
607+
/// this integer keeps track of the number of connections which do not yet have any requests assigned
608+
var unusedGeneralPurposeConnections = self.startingGeneralPurposeConnections
609+
610+
outerLoop: for (eventLoop, requestCount) in generalPurposeRequestCountGroupedByPreferredEventLoop {
611+
let connectionsToStart = max(requestCount - unusedGeneralPurposeConnections, 0)
612+
unusedGeneralPurposeConnections -= min(requestCount, unusedGeneralPurposeConnections)
613+
for _ in 0..<connectionsToStart {
614+
guard self.canGrow else {
615+
break outerLoop
616+
}
617+
connectionToCreate.append((self.createNewConnection(on: eventLoop), eventLoop))
618+
}
619+
}
606620

607-
return createConnections
621+
return connectionToCreate
608622
}
609623

610624
// MARK: Shutdown

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift

+8-6
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,15 @@ extension HTTPConnectionPool {
9696

9797
var http2Connections = http2Connections
9898
let migration = http2Connections.migrateToHTTP1()
99-
let createConnections = self.connections.migrateFromHTTP2(
99+
100+
self.connections.migrateFromHTTP2(
100101
starting: migration.starting,
101-
backingOff: migration.backingOff,
102-
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests(),
103-
preferredEventLoopsOfPendingGeneralPurposeRequests: requests.generalPurposeQueue.lazy.map {
104-
$0.preferredEventLoop
105-
}
102+
backingOff: migration.backingOff
103+
)
104+
105+
let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
106+
requiredEventLoopOfPendingRequests: requests.requestCountGroupedByRequiredEventLoop(),
107+
generalPurposeRequestCountGroupedByPreferredEventLoop: requests.generalPurposeRequestCountGroupedByPreferredEventLoop()
106108
)
107109

108110
if !http2Connections.isEmpty {

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift

+11-7
Original file line numberDiff line numberDiff line change
@@ -348,18 +348,13 @@ extension HTTPConnectionPool {
348348

349349
/// We only handle starting and backing off connection here.
350350
/// All already running connections must be handled by the enclosing state machine.
351-
/// We will also create new connections for `requiredEventLoopsOfPendingRequests`
352-
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
353351
/// - Parameters:
354352
/// - starting: starting HTTP connections from previous state machine
355353
/// - backingOff: backing off HTTP connections from previous state machine
356-
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
357-
/// - Returns: new connections that need to be created
358354
mutating func migrateFromHTTP1(
359355
starting: [(Connection.ID, EventLoop)],
360-
backingOff: [(Connection.ID, EventLoop)],
361-
requiredEventLoopsOfPendingRequests: [EventLoop]
362-
) -> [(Connection.ID, EventLoop)] {
356+
backingOff: [(Connection.ID, EventLoop)]
357+
) {
363358
for (connectionID, eventLoop) in starting {
364359
let newConnection = HTTP2ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
365360
self.connections.append(newConnection)
@@ -371,7 +366,16 @@ extension HTTPConnectionPool {
371366
backingOffConnection.failedToConnect()
372367
self.connections.append(backingOffConnection)
373368
}
369+
}
374370

371+
/// We will create new connections for `requiredEventLoopsOfPendingRequests`
372+
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
373+
/// - Parameters:
374+
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
375+
/// - Returns: new connections that need to be created
376+
mutating func createConnectionsAfterMigrationIfNeeded(
377+
requiredEventLoopsOfPendingRequests: [EventLoop]
378+
) -> [(Connection.ID, EventLoop)] {
375379
// create new connections for requests with a required event loop
376380
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
377381
self.connections.lazy

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift

+5-2
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,12 @@ extension HTTPConnectionPool {
104104

105105
var http1Connections = http1Connections // make http1Connections mutable
106106
let context = http1Connections.migrateToHTTP2()
107-
let createConnections = self.connections.migrateFromHTTP1(
107+
self.connections.migrateFromHTTP1(
108108
starting: context.starting,
109-
backingOff: context.backingOff,
109+
backingOff: context.backingOff
110+
)
111+
112+
let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
110113
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests()
111114
)
112115

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift

+49-2
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,25 @@
1414

1515
import NIOCore
1616

17+
private struct HashableEventLoop: Hashable {
18+
static func == (lhs: HashableEventLoop, rhs: HashableEventLoop) -> Bool {
19+
lhs.eventLoop === rhs.eventLoop
20+
}
21+
22+
init(_ eventLoop: EventLoop) {
23+
self.eventLoop = eventLoop
24+
}
25+
26+
let eventLoop: EventLoop
27+
func hash(into hasher: inout Hasher) {
28+
self.eventLoop.id.hash(into: &hasher)
29+
}
30+
}
31+
1732
extension HTTPConnectionPool {
1833
/// A struct to store all queued requests.
1934
struct RequestQueue {
20-
private(set) var generalPurposeQueue: CircularBuffer<Request>
35+
private var generalPurposeQueue: CircularBuffer<Request>
2136
private var eventLoopQueues: [EventLoopID: CircularBuffer<Request>]
2237

2338
init() {
@@ -132,8 +147,40 @@ extension HTTPConnectionPool {
132147
return nil
133148
}
134149

150+
/// - Returns: event loops with at least one request with a required event loop
135151
func eventLoopsWithPendingRequests() -> [EventLoop] {
136-
self.eventLoopQueues.compactMap { $0.value.first?.requiredEventLoop }
152+
self.eventLoopQueues.compactMap {
153+
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`
154+
/// however, a queue can be empty
155+
$0.value.first?.requiredEventLoop!
156+
}
157+
}
158+
159+
/// - Returns: request count for requests with required event loop, grouped by required event loop without any particular order
160+
func requestCountGroupedByRequiredEventLoop() -> [(EventLoop, Int)] {
161+
self.eventLoopQueues.values.compactMap { requests -> (EventLoop, Int)? in
162+
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`,
163+
/// however, a queue can be empty
164+
guard let requiredEventLoop = requests.first?.requiredEventLoop! else {
165+
return nil
166+
}
167+
return (requiredEventLoop, requests.count)
168+
}
169+
}
170+
171+
/// - Returns: request count with **no** required event loop, grouped by preferred event loop and ordered descending by number of requests
172+
func generalPurposeRequestCountGroupedByPreferredEventLoop() -> [(EventLoop, Int)] {
173+
let requestCountPerEventLoop = Dictionary(
174+
self.generalPurposeQueue.lazy.map { request in
175+
(HashableEventLoop(request.preferredEventLoop), 1)
176+
},
177+
uniquingKeysWith: +
178+
)
179+
return requestCountPerEventLoop.lazy
180+
.map { ($0.key.eventLoop, $0.value) }
181+
.sorted { lhs, rhs in
182+
lhs.1 > rhs.1
183+
}
137184
}
138185
}
139186
}

Diff for: Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest+XCTest.swift

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ extension HTTPConnectionPool_HTTP1ConnectionsTests {
3939
("testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop),
4040
("testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop),
4141
("testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection", testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection),
42+
("testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections", testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections),
43+
("testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests", testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests),
4244
]
4345
}
4446
}

0 commit comments

Comments
 (0)