Skip to content

Commit 866f570

Browse files
committed
integrate http2 state machine
1 parent 4147fd6 commit 866f570

8 files changed

+543
-73
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift

+3-2
Original file line numberDiff line numberDiff line change
@@ -618,9 +618,10 @@ extension HTTPConnectionPool {
618618
// TODO: improve algorithm to create connections uniformly across all preferred event loops
619619
// while paying attention to the number of queued request per event loop
620620
// Currently we start by creating new connections on the event loop with the most queued
621-
// requests. If we have create a enough connections to cover all requests for the given
621+
// requests. If we have created enough connections to cover all requests for the first
622622
// event loop we will continue with the event loop with the second most queued requests
623-
// and so on and so forth. We do not need to sort the array because
623+
// and so on and so forth. The `generalPurposeRequestCountGroupedByPreferredEventLoop`
624+
// array is already ordered so we can just iterate over it without sorting by request count.
624625
let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop
625626
// we do not want to allocated intermediate arrays.
626627
.lazy

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift

+29-3
Original file line numberDiff line numberDiff line change
@@ -404,9 +404,9 @@ extension HTTPConnectionPool {
404404
self.connections.removeAll { connection in
405405
switch connection.migrateToHTTP1(context: &context) {
406406
case .removeConnection:
407-
return false
408-
case .keepConnection:
409407
return true
408+
case .keepConnection:
409+
return false
410410
}
411411
}
412412
return context
@@ -419,7 +419,7 @@ extension HTTPConnectionPool {
419419
self.connections.contains { $0.isActive }
420420
}
421421

422-
/// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one
422+
/// used in general purpose connection scenarios to check if at least one connection is starting or active for the given `eventLoop`
423423
var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool {
424424
self.connections.contains { $0.canOrWillBeAbleToExecuteRequests }
425425
}
@@ -433,6 +433,32 @@ extension HTTPConnectionPool {
433433
}
434434
}
435435

436+
func hasActiveConnection(for eventLoop: EventLoop) -> Bool {
437+
self.connections.contains {
438+
$0.eventLoop === eventLoop && $0.isActive
439+
}
440+
}
441+
442+
/// used after backoff is done to determine if we need to create a new connection
443+
func hasStartingOrActiveConnection() -> Bool {
444+
self.connections.contains { connection in
445+
connection.canOrWillBeAbleToExecuteRequests
446+
}
447+
}
448+
449+
/// used after backoff is done to determine if we need to create a new connection
450+
/// - Parameters:
451+
/// - eventLoop: connection `EventLoop` to search for
452+
/// - Returns: true if at least one connection is starting or active for the given `eventLoop`
453+
func hasStartingOrActiveConnection(
454+
for eventLoop: EventLoop
455+
) -> Bool {
456+
self.connections.contains { connection in
457+
connection.eventLoop === eventLoop &&
458+
connection.canOrWillBeAbleToExecuteRequests
459+
}
460+
}
461+
436462
mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID {
437463
assert(
438464
!self.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop),

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift

+42-14
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,22 @@ extension HTTPConnectionPool {
228228
private mutating func _newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> EstablishedAction {
229229
self.failedConsecutiveConnectionAttempts = 0
230230
self.lastConnectFailure = nil
231-
let (index, context) = self.connections.newHTTP2ConnectionEstablished(
232-
connection,
233-
maxConcurrentStreams: maxConcurrentStreams
234-
)
235-
return self.nextActionForAvailableConnection(at: index, context: context)
231+
if self.connections.hasActiveConnection(for: connection.eventLoop) {
232+
guard let (index, _) = self.connections.failConnection(connection.id) else {
233+
preconditionFailure("we connection to a connection which we no nothing about")
234+
}
235+
self.connections.removeConnection(at: index)
236+
return .init(
237+
request: .none,
238+
connection: .closeConnection(connection, isShutdown: .no)
239+
)
240+
} else {
241+
let (index, context) = self.connections.newHTTP2ConnectionEstablished(
242+
connection,
243+
maxConcurrentStreams: maxConcurrentStreams
244+
)
245+
return self.nextActionForAvailableConnection(at: index, context: context)
246+
}
236247
}
237248

238249
private mutating func nextActionForAvailableConnection(
@@ -318,18 +329,35 @@ extension HTTPConnectionPool {
318329
private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action {
319330
switch self.state {
320331
case .running:
321-
let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil)
322-
guard hasPendingRequest else {
332+
// we do not know if we have created this connection for a request with a required
333+
// event loop or not. However, we do not need this information and can infer
334+
// if we need to create a new connection because we will only ever create one connection
335+
// per event loop for required event loop requests and only need one connection for
336+
// general purpose requests.
337+
338+
// we need to start a new on connection in two cases:
339+
if
340+
// 1. if we have general purpose requests
341+
self.requests.isEmpty(for: nil) &&
342+
// and no connection starting or active
343+
self.connections.hasStartingOrActiveConnection() ||
344+
// 2. or if we have requests for a required event loop
345+
self.requests.isEmpty(for: eventLoop) &&
346+
// and no connection starting or active for the given event loop
347+
self.connections.hasStartingOrActiveConnection(for: eventLoop) {
348+
let (newConnectionID, previousEventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index)
349+
precondition(previousEventLoop === eventLoop)
350+
351+
return .init(
352+
request: .none,
353+
connection: .createConnection(newConnectionID, on: eventLoop)
354+
)
355+
} else {
356+
// otherwise we can remove the connection
357+
self.connections.removeConnection(at: index)
323358
return .none
324359
}
325360

326-
let (newConnectionID, previousEventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index)
327-
precondition(previousEventLoop === eventLoop)
328-
329-
return .init(
330-
request: .none,
331-
connection: .createConnection(newConnectionID, on: eventLoop)
332-
)
333361
case .shuttingDown(let unclean):
334362
assert(self.requests.isEmpty)
335363
self.connections.removeConnection(at: index)

0 commit comments

Comments
 (0)