@@ -278,10 +278,6 @@ extension HTTPConnectionPool {
278
278
self . overflowIndex < self . maximumConcurrentConnections
279
279
}
280
280
281
- private var maximumAdditionalGeneralPurposeConnections : Int {
282
- self . maximumConcurrentConnections - ( self . overflowIndex - 1 )
283
- }
284
-
285
281
var startingGeneralPurposeConnections : Int {
286
282
var connecting = 0
287
283
for connectionState in self . connections [ 0 ..< self . overflowIndex] {
@@ -545,66 +541,84 @@ extension HTTPConnectionPool {
545
541
546
542
/// We only handle starting and backing off connection here.
547
543
/// All already running connections must be handled by the enclosing state machine.
548
- /// We will also create new connections for `requiredEventLoopsOfPendingRequests`
549
- /// if we do not already have a connection that can or will be able to execute requests on the given event loop.
550
- /// In addition, we also create more general purpose connections if we do not have enough to execute
551
- /// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
552
- /// until we reach `maximumConcurrentConnections`.
553
544
/// - Parameters:
554
545
/// - starting: starting HTTP connections from previous state machine
555
546
/// - backingOff: backing off HTTP connections from previous state machine
556
- /// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
557
- /// - preferredEventLoopsOfPendingGeneralPurposeRequests: the preferred event loop of all pending requests without a required event loop
558
- /// - Returns: new connections that need to be created
559
- mutating func migrateFromHTTP2< PreferredEventLoopSequence> (
547
+ mutating func migrateFromHTTP2(
560
548
starting: [ ( Connection . ID , EventLoop ) ] ,
561
- backingOff: [ ( Connection . ID , EventLoop ) ] ,
562
- requiredEventLoopsOfPendingRequests: [ EventLoop ] ,
563
- preferredEventLoopsOfPendingGeneralPurposeRequests: PreferredEventLoopSequence
564
- ) -> [ ( Connection . ID , EventLoop ) ] where PreferredEventLoopSequence: Sequence , PreferredEventLoopSequence. Element == EventLoop {
549
+ backingOff: [ ( Connection . ID , EventLoop ) ]
550
+ ) {
565
551
for (connectionID, eventLoop) in starting {
566
552
let newConnection = HTTP1ConnectionState ( connectionID: connectionID, eventLoop: eventLoop)
567
553
self . connections. insert ( newConnection, at: self . overflowIndex)
568
- self . overflowIndex = self . connections. index ( after: self . overflowIndex)
554
+ /// If we can grow, we mark the connection as a general purpose connection.
555
+ /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
556
+ if self . canGrow {
557
+ self . overflowIndex = self . connections. index ( after: self . overflowIndex)
558
+ }
569
559
}
570
560
571
561
for (connectionID, eventLoop) in backingOff {
572
562
var backingOffConnection = HTTP1ConnectionState ( connectionID: connectionID, eventLoop: eventLoop)
573
563
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
574
564
backingOffConnection. failedToConnect ( )
575
565
self . connections. insert ( backingOffConnection, at: self . overflowIndex)
576
- self . overflowIndex = self . connections. index ( after: self . overflowIndex)
566
+ /// If we can grow, we mark the connection as a general purpose connection.
567
+ /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
568
+ if self . canGrow {
569
+ self . overflowIndex = self . connections. index ( after: self . overflowIndex)
570
+ }
577
571
}
572
+ }
578
573
579
- // create new connections for requests with a required event loop
580
- let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set (
581
- self . connections. lazy
582
- . filter {
583
- $0. canOrWillBeAbleToExecuteRequests
584
- } . map {
585
- $0. eventLoop. id
586
- }
574
+ /// We will create new connections for each `requiredEventLoopOfPendingRequests`
575
+ /// In addition, we also create more general purpose connections if we do not have enough to execute
576
+ /// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
577
+ /// until we reach `maximumConcurrentConnections
578
+ /// - Parameters:
579
+ /// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
580
+ /// - generalPurposeRequestCountPerPreferredEventLoop: request count with no required event loop, grouped by preferred event loop and ordered descending by number of requests
581
+ /// - Returns: new connections that need to be created
582
+ mutating func createConnectionsAfterMigrationIfNeeded(
583
+ requiredEventLoopOfPendingRequests: [ ( EventLoop , Int ) ] ,
584
+ generalPurposeRequestCountGroupedByPreferredEventLoop: [ ( EventLoop , Int ) ]
585
+ ) -> [ ( Connection . ID , EventLoop ) ] {
586
+ /// create new connections for requests with a required event loop
587
+
588
+ /// we may already start connections for those requests and do not want to start to many
589
+ let startingRequiredEventLoopConnectionCount = Dictionary (
590
+ self . connections [ self . overflowIndex..< self . connections. endIndex] . lazy. map {
591
+ ( $0. eventLoop. id, 1 )
592
+ } ,
593
+ uniquingKeysWith: +
587
594
)
588
- var createConnections = requiredEventLoopsOfPendingRequests. compactMap { eventLoop -> ( Connection . ID , EventLoop ) ? in
589
- guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests. contains ( eventLoop. id)
590
- else { return nil }
591
- let connectionID = self . createNewOverflowConnection ( on: eventLoop)
592
- return ( connectionID, eventLoop)
593
- }
594
-
595
- // create new connections for requests without a required event loop
596
- createConnections. append (
597
- contentsOf: preferredEventLoopsOfPendingGeneralPurposeRequests. lazy
598
- /// we do not want to start additional connection for requests for which we already have starting or backing off connections
599
- . dropFirst ( self . startingGeneralPurposeConnections)
600
- /// if we have additional requests, we will start more connections while respecting the user defined `maximumConcurrentConnections`
601
- . prefix ( self . maximumAdditionalGeneralPurposeConnections)
602
- . map { eventLoop in
603
- ( self . createNewConnection ( on: eventLoop) , eventLoop)
595
+
596
+ var connectionToCreate = requiredEventLoopOfPendingRequests
597
+ . flatMap { ( eventLoop, requestCount) -> [ ( Connection . ID , EventLoop ) ] in
598
+ let connectionsToStart = max ( requestCount - startingRequiredEventLoopConnectionCount[ eventLoop. id, default: 0 ] , 0 )
599
+ return ( 0 ..< connectionsToStart) . lazy. map { _ in
600
+ ( self . createNewOverflowConnection ( on: eventLoop) , eventLoop)
604
601
}
605
- )
602
+ }
603
+
604
+ /// create new connections for requests without a required event loop
605
+
606
+ /// we may already start connections for those requests and do not want to start to many
607
+ /// this integer keeps track of the number of connections which do not yet have any requests assigned
608
+ var unusedGeneralPurposeConnections = self . startingGeneralPurposeConnections
609
+
610
+ outerLoop: for (eventLoop, requestCount) in generalPurposeRequestCountGroupedByPreferredEventLoop {
611
+ let connectionsToStart = max ( requestCount - unusedGeneralPurposeConnections, 0 )
612
+ unusedGeneralPurposeConnections -= min ( requestCount, unusedGeneralPurposeConnections)
613
+ for _ in 0 ..< connectionsToStart {
614
+ guard self . canGrow else {
615
+ break outerLoop
616
+ }
617
+ connectionToCreate. append ( ( self . createNewConnection ( on: eventLoop) , eventLoop) )
618
+ }
619
+ }
606
620
607
- return createConnections
621
+ return connectionToCreate
608
622
}
609
623
610
624
// MARK: Shutdown
0 commit comments