Skip to content

[HTTP2] Create new connections during migration if needed #459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ extension HTTPConnectionPool {
}
}

var canOrWillBeAbleToExecuteRequests: Bool {
switch self.state {
case .leased, .backingOff, .idle, .starting:
return true
case .closed:
return false
}
}

var isLeased: Bool {
switch self.state {
case .leased:
Expand Down Expand Up @@ -530,8 +539,8 @@ extension HTTPConnectionPool {
return migrationContext
}

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
Expand All @@ -541,15 +550,82 @@ extension HTTPConnectionPool {
) {
for (connectionID, eventLoop) in starting {
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
self.connections.append(newConnection)
self.connections.insert(newConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}

for (connectionID, eventLoop) in backingOff {
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
backingOffConnection.failedToConnect()
self.connections.append(backingOffConnection)
self.connections.insert(backingOffConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}
}

/// We will create new connections for each `requiredEventLoopOfPendingRequests`
/// In addition, we also create more general purpose connections if we do not have enough to execute
/// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
/// until we reach `maximumConcurrentConnections`
/// - Parameters:
/// - requiredEventLoopsForPendingRequests:
/// event loops for which we have requests with a required event loop.
/// Duplicates are not allowed.
/// - generalPurposeRequestCountPerPreferredEventLoop:
/// request count with no required event loop,
/// grouped by preferred event loop and ordered descending by number of requests
/// - Returns: new connections that need to be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [(EventLoop, Int)],
generalPurposeRequestCountGroupedByPreferredEventLoop: [(EventLoop, Int)]
) -> [(Connection.ID, EventLoop)] {
/// create new connections for requests with a required event loop

/// we may already start connections for those requests and do not want to start to many
let startingRequiredEventLoopConnectionCount = Dictionary(
self.connections[self.overflowIndex..<self.connections.endIndex].lazy.map {
($0.eventLoop.id, 1)
},
uniquingKeysWith: +
)
var connectionToCreate = requiredEventLoopOfPendingRequests
.flatMap { (eventLoop, requestCount) -> [(Connection.ID, EventLoop)] in
/// We need a connection for each queued request with a required event loop.
/// Therefore, we look how many request we have queued for a given `eventLoop` and
/// how many connections we are already starting on the given `eventLoop`.
/// If we have not enough, we will create additional connections to have at least on connection per request.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the /// intended?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added three slashes in a couple of places. Do we have any style guide when to use three and when to use two? I slightly tend to like three slashes more because the non-monospaced font is a bit easier to read for me but I do not have a strong opinion on that topic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we use three in documentation comments (like above a method) and two inline. At least that's what I've seen (and done) so far.

let connectionsToStart = requestCount - startingRequiredEventLoopConnectionCount[eventLoop.id, default: 0]
return stride(from: 0, to: connectionsToStart, by: 1).lazy.map { _ in
(self.createNewOverflowConnection(on: eventLoop), eventLoop)
}
}

/// create new connections for requests without a required event loop

/// we may already start connections for those requests and do not want to start to many
/// this integer keeps track of the number of connections which do not yet have any requests assigned
var unusedGeneralPurposeConnections = self.startingGeneralPurposeConnections

outerLoop: for (eventLoop, requestCount) in generalPurposeRequestCountGroupedByPreferredEventLoop {
let connectionsToStart = max(requestCount - unusedGeneralPurposeConnections, 0)
unusedGeneralPurposeConnections -= min(requestCount, unusedGeneralPurposeConnections)
for _ in 0..<connectionsToStart {
guard self.canGrow else {
break outerLoop
}
connectionToCreate.append((self.createNewConnection(on: eventLoop), eventLoop))
}
}
Copy link
Member

@fabianfett fabianfett Oct 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct. However I think, we could make this code a little easier to read. Wdyt about creating a variable:

var neededGeneralPurposeConnections = max(requestCount - unusedGeneralPurposeConnections, 0)

and then decreasing the variable for each created connection? Break the loop as soon as the variable has reached 0.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if I fully understand what you mean. It seams you want to replace the for loop in line 620 with a while loop and exit it if we hit zero, which is exactly the same behaviour as the for loop. I would argue that the intent of the for loop is clearer (and safer) compared to a while true loop with a break on zero.

Or do you first want to sum up the actually number of neededGeneralPurposeConnections regardless of event loop. Then we could replace the two for loops with only one while/for loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The later. :)


return connectionToCreate
}

// MARK: Shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,28 +88,35 @@ extension HTTPConnectionPool {
precondition(self.http2Connections == nil)
precondition(self.requests.isEmpty)

self.requests = requests

if let http1Connections = http1Connections {
self.connections = http1Connections
}

var http2Connections = http2Connections
let migration = http2Connections.migrateToHTTP1()

self.connections.migrateFromHTTP2(
starting: migration.starting,
backingOff: migration.backingOff
)

let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: requests.requestCountGroupedByRequiredEventLoop(),
generalPurposeRequestCountGroupedByPreferredEventLoop: requests.generalPurposeRequestCountGroupedByPreferredEventLoop()
)

if !http2Connections.isEmpty {
self.http2Connections = http2Connections
}

// TODO: Close all idle connections from context.close
// TODO: Start new http1 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

self.requests = requests

return .init(closeConnections: [], createConnections: [])
return .init(
closeConnections: migration.close,
createConnections: createConnections
)
}

// MARK: - Events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ extension HTTPConnectionPool {

// MARK: Migration

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
Expand All @@ -368,6 +368,31 @@ extension HTTPConnectionPool {
}
}

/// We will create new connections for `requiredEventLoopsOfPendingRequests`
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
/// - Parameters:
/// - requiredEventLoopsForPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
/// - Returns: new connections that need to be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: [EventLoop]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop
let eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests = Set(
self.connections.lazy
.filter {
$0.canOrWillBeAbleToExecuteRequests
}.map {
$0.eventLoop.id
}
)
return requiredEventLoopsOfPendingRequests.compactMap { eventLoop -> (Connection.ID, EventLoop)? in
guard !eventLoopsWithConnectionThatCanOrWillBeAbleToExecuteRequests.contains(eventLoop.id)
else { return nil }
let connectionID = self.createNewConnection(on: eventLoop)
return (connectionID, eventLoop)
}
}

struct HTTP2ToHTTP1MigrationContext {
var backingOff: [(Connection.ID, EventLoop)] = []
var starting: [(Connection.ID, EventLoop)] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ extension HTTPConnectionPool {
precondition(self.connections.isEmpty)
precondition(self.requests.isEmpty)

self.requests = requests

if let http2Connections = http2Connections {
self.connections = http2Connections
}
Expand All @@ -107,17 +109,19 @@ extension HTTPConnectionPool {
backingOff: context.backingOff
)

let createConnections = self.connections.createConnectionsAfterMigrationIfNeeded(
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests()
)

if !http1Connections.isEmpty {
self.http1Connections = http1Connections
}

self.requests = requests

// TODO: Close all idle connections from context.close
// TODO: Start new http2 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

return .init(closeConnections: [], createConnections: [])
return .init(
closeConnections: context.close,
createConnections: createConnections
)
}

mutating func executeRequest(_ request: Request) -> Action {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@

import NIOCore

private struct HashableEventLoop: Hashable {
static func == (lhs: HashableEventLoop, rhs: HashableEventLoop) -> Bool {
lhs.eventLoop === rhs.eventLoop
}

init(_ eventLoop: EventLoop) {
self.eventLoop = eventLoop
}

let eventLoop: EventLoop
func hash(into hasher: inout Hasher) {
self.eventLoop.id.hash(into: &hasher)
}
}

extension HTTPConnectionPool {
/// A struct to store all queued requests.
struct RequestQueue {
Expand Down Expand Up @@ -131,6 +146,42 @@ extension HTTPConnectionPool {
}
return nil
}

/// - Returns: event loops with at least one request with a required event loop
func eventLoopsWithPendingRequests() -> [EventLoop] {
self.eventLoopQueues.compactMap {
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`
/// however, a queue can be empty
$0.value.first?.requiredEventLoop!
}
}

/// - Returns: request count for requests with required event loop, grouped by required event loop without any particular order
func requestCountGroupedByRequiredEventLoop() -> [(EventLoop, Int)] {
self.eventLoopQueues.values.compactMap { requests -> (EventLoop, Int)? in
/// all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`,
/// however, a queue can be empty
guard let requiredEventLoop = requests.first?.requiredEventLoop! else {
return nil
}
return (requiredEventLoop, requests.count)
}
}

/// - Returns: request count with **no** required event loop, grouped by preferred event loop and ordered descending by number of requests
func generalPurposeRequestCountGroupedByPreferredEventLoop() -> [(EventLoop, Int)] {
let requestCountPerEventLoop = Dictionary(
self.generalPurposeQueue.lazy.map { request in
(HashableEventLoop(request.preferredEventLoop), 1)
},
uniquingKeysWith: +
)
return requestCountPerEventLoop.lazy
.map { ($0.key.eventLoop, $0.value) }
.sorted { lhs, rhs in
lhs.1 > rhs.1
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ extension HTTPConnectionPool_HTTP1ConnectionsTests {
("testCloseConnectionIfIdleButLeasedRaceCondition", testCloseConnectionIfIdleButLeasedRaceCondition),
("testCloseConnectionIfIdleButClosedRaceCondition", testCloseConnectionIfIdleButClosedRaceCondition),
("testShutdown", testShutdown),
("testMigrationFromHTTP2", testMigrationFromHTTP2),
("testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithRequiredEventLoop),
("testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop", testMigrationFromHTTP2WithPendingRequestsWithPreferredEventLoop),
("testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection", testMigrationFromHTTP2WithAlreadyLeasedHTTP1Connection),
("testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections", testMigrationFromHTTP2WithMoreStartingConnectionsThanMaximumAllowedConccurentConnections),
("testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests", testMigrationFromHTTP2StartsEnoghOverflowConnectionsForRequiredEventLoopRequests),
]
}
}
Loading