Skip to content

Commit 32c4e7f

Browse files
committed
integrate http2 state machine
1 parent 4147fd6 commit 32c4e7f

8 files changed

+540
-66
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift

+3-2
Original file line numberDiff line numberDiff line change
@@ -618,9 +618,10 @@ extension HTTPConnectionPool {
618618
// TODO: improve algorithm to create connections uniformly across all preferred event loops
619619
// while paying attention to the number of queued request per event loop
620620
// Currently we start by creating new connections on the event loop with the most queued
621-
// requests. If we have create a enough connections to cover all requests for the given
621+
// requests. If we have created enough connections to cover all requests for the first
622622
// event loop we will continue with the event loop with the second most queued requests
623-
// and so on and so forth. We do not need to sort the array because
623+
// and so on and so forth. The `generalPurposeRequestCountGroupedByPreferredEventLoop`
624+
// array is already ordered so we can just iterate over it without sorting by request count.
624625
let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop
625626
// we do not want to allocated intermediate arrays.
626627
.lazy

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift

+29-3
Original file line numberDiff line numberDiff line change
@@ -404,9 +404,9 @@ extension HTTPConnectionPool {
404404
self.connections.removeAll { connection in
405405
switch connection.migrateToHTTP1(context: &context) {
406406
case .removeConnection:
407-
return false
408-
case .keepConnection:
409407
return true
408+
case .keepConnection:
409+
return false
410410
}
411411
}
412412
return context
@@ -419,7 +419,7 @@ extension HTTPConnectionPool {
419419
self.connections.contains { $0.isActive }
420420
}
421421

422-
/// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one
422+
/// used in general purpose connection scenarios to check if at least one connection is starting or active for the given `eventLoop`
423423
var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool {
424424
self.connections.contains { $0.canOrWillBeAbleToExecuteRequests }
425425
}
@@ -433,6 +433,32 @@ extension HTTPConnectionPool {
433433
}
434434
}
435435

436+
func hasActiveConnection(for eventLoop: EventLoop) -> Bool {
437+
self.connections.contains {
438+
$0.eventLoop === eventLoop && $0.isActive
439+
}
440+
}
441+
442+
/// used after backoff is done to determine if we need to create a new connection
443+
func hasStartingOrActiveConnection() -> Bool {
444+
self.connections.contains { connection in
445+
connection.canOrWillBeAbleToExecuteRequests
446+
}
447+
}
448+
449+
/// used after backoff is done to determine if we need to create a new connection
450+
/// - Parameters:
451+
/// - eventLoop: connection `EventLoop` to search for
452+
/// - Returns: true if at least one connection is starting or active for the given `eventLoop`
453+
func hasStartingOrActiveConnection(
454+
for eventLoop: EventLoop
455+
) -> Bool {
456+
self.connections.contains { connection in
457+
connection.eventLoop === eventLoop &&
458+
connection.canOrWillBeAbleToExecuteRequests
459+
}
460+
}
461+
436462
mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID {
437463
assert(
438464
!self.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop),

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift

+39-7
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,22 @@ extension HTTPConnectionPool {
228228
private mutating func _newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> EstablishedAction {
229229
self.failedConsecutiveConnectionAttempts = 0
230230
self.lastConnectFailure = nil
231-
let (index, context) = self.connections.newHTTP2ConnectionEstablished(
232-
connection,
233-
maxConcurrentStreams: maxConcurrentStreams
234-
)
235-
return self.nextActionForAvailableConnection(at: index, context: context)
231+
if self.connections.hasActiveConnection(for: connection.eventLoop) {
232+
guard let (index, _) = self.connections.failConnection(connection.id) else {
233+
preconditionFailure("we connection to a connection which we no nothing about")
234+
}
235+
self.connections.removeConnection(at: index)
236+
return .init(
237+
request: .none,
238+
connection: .closeConnection(connection, isShutdown: .no)
239+
)
240+
} else {
241+
let (index, context) = self.connections.newHTTP2ConnectionEstablished(
242+
connection,
243+
maxConcurrentStreams: maxConcurrentStreams
244+
)
245+
return self.nextActionForAvailableConnection(at: index, context: context)
246+
}
236247
}
237248

238249
private mutating func nextActionForAvailableConnection(
@@ -318,8 +329,28 @@ extension HTTPConnectionPool {
318329
private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action {
319330
switch self.state {
320331
case .running:
321-
let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil)
322-
guard hasPendingRequest else {
332+
// we do not know if we have created this connection for a request with a required
333+
// event loop or not. However, we do not need this information and can infer
334+
// if we need to create a new connection because we will only ever create one connection
335+
// per event loop for required event loop requests and only need one connection for
336+
// general purpose requests.
337+
338+
// we need to start a new on connection in two cases:
339+
let needGeneralPurposeConnection =
340+
// 1. if we have general purpose requests
341+
!self.requests.isEmpty(for: nil) &&
342+
// and no connection starting or active
343+
!self.connections.hasStartingOrActiveConnection()
344+
345+
let needRequiredEventLoopConnection =
346+
// 2. or if we have requests for a required event loop
347+
!self.requests.isEmpty(for: eventLoop) &&
348+
// and no connection starting or active for the given event loop
349+
!self.connections.hasStartingOrActiveConnection(for: eventLoop)
350+
351+
guard needGeneralPurposeConnection || needRequiredEventLoopConnection else {
352+
// otherwise we can remove the connection
353+
self.connections.removeConnection(at: index)
323354
return .none
324355
}
325356

@@ -330,6 +361,7 @@ extension HTTPConnectionPool {
330361
request: .none,
331362
connection: .createConnection(newConnectionID, on: eventLoop)
332363
)
364+
333365
case .shuttingDown(let unclean):
334366
assert(self.requests.isEmpty)
335367
self.connections.removeConnection(at: index)

Diff for: Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift

+138-54
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,23 @@ extension HTTPConnectionPool {
6868

6969
enum HTTPVersionState {
7070
case http1(HTTP1StateMachine)
71+
case http2(HTTP2StateMachine)
72+
73+
mutating func modify<ReturnValue>(
74+
http1: (inout HTTP1StateMachine) -> ReturnValue,
75+
http2: (inout HTTP2StateMachine) -> ReturnValue
76+
) -> ReturnValue {
77+
let returnValue: ReturnValue
78+
switch self {
79+
case .http1(var http1State):
80+
returnValue = http1(&http1State)
81+
self = .http1(http1State)
82+
case .http2(var http2State):
83+
returnValue = http2(&http2State)
84+
self = .http2(http2State)
85+
}
86+
return returnValue
87+
}
7188
}
7289

7390
var state: HTTPVersionState
@@ -87,12 +104,11 @@ extension HTTPConnectionPool {
87104
}
88105

89106
mutating func executeRequest(_ request: Request) -> Action {
90-
switch self.state {
91-
case .http1(var http1StateMachine):
92-
let action = http1StateMachine.executeRequest(request)
93-
self.state = .http1(http1StateMachine)
94-
return action
95-
}
107+
self.state.modify(http1: { http1 in
108+
http1.executeRequest(request)
109+
}, http2: { http2 in
110+
http2.executeRequest(request)
111+
})
96112
}
97113

98114
mutating func newHTTP1ConnectionCreated(_ connection: Connection) -> Action {
@@ -101,28 +117,100 @@ extension HTTPConnectionPool {
101117
let action = http1StateMachine.newHTTP1ConnectionEstablished(connection)
102118
self.state = .http1(http1StateMachine)
103119
return action
120+
121+
case .http2(let http2StateMachine):
122+
var http1StateMachine = HTTP1StateMachine(
123+
idGenerator: self.idGenerator,
124+
maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections
125+
)
126+
127+
let newConnectionAction = http1StateMachine.migrateFromHTTP2(
128+
http2State: http2StateMachine,
129+
newHTTP1Connection: connection
130+
)
131+
self.state = .http1(http1StateMachine)
132+
return newConnectionAction
104133
}
105134
}
106135

107-
mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action {
136+
mutating func newHTTP2ConnectionCreated(_ connection: Connection, maxConcurrentStreams: Int) -> Action {
108137
switch self.state {
109-
case .http1(var http1StateMachine):
110-
let action = http1StateMachine.failedToCreateNewConnection(
138+
case .http1(let http1StateMachine):
139+
140+
var http2StateMachine = HTTP2StateMachine(
141+
idGenerator: self.idGenerator
142+
)
143+
let migrationAction = http2StateMachine.migrateFromHTTP1(
144+
http1State: http1StateMachine,
145+
newHTTP2Connection: connection,
146+
maxConcurrentStreams: maxConcurrentStreams
147+
)
148+
149+
self.state = .http2(http2StateMachine)
150+
return migrationAction
151+
152+
case .http2(var http2StateMachine):
153+
let newConnectionAction = http2StateMachine.newHTTP2ConnectionEstablished(
154+
connection,
155+
maxConcurrentStreams: maxConcurrentStreams
156+
)
157+
self.state = .http2(http2StateMachine)
158+
return newConnectionAction
159+
}
160+
}
161+
162+
mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
163+
self.state.modify(http1: { http1 in
164+
http1.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
165+
}, http2: { http2 in
166+
http2.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
167+
})
168+
}
169+
170+
mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
171+
self.state.modify(http1: { http1 in
172+
http1.http2ConnectionGoAwayReceived(connectionID)
173+
}, http2: { http2 in
174+
http2.http2ConnectionGoAwayReceived(connectionID)
175+
})
176+
}
177+
178+
mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action {
179+
self.state.modify(http1: { http1 in
180+
http1.http2ConnectionClosed(connectionID)
181+
}, http2: { http2 in
182+
http2.http2ConnectionClosed(connectionID)
183+
})
184+
}
185+
186+
mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action {
187+
self.state.modify(http1: { http1 in
188+
http1.http2ConnectionStreamClosed(connectionID)
189+
}, http2: { http2 in
190+
http2.http2ConnectionStreamClosed(connectionID)
191+
})
192+
}
193+
194+
mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action {
195+
self.state.modify(http1: { http1 in
196+
http1.failedToCreateNewConnection(
111197
error,
112198
connectionID: connectionID
113199
)
114-
self.state = .http1(http1StateMachine)
115-
return action
116-
}
200+
}, http2: { http2 in
201+
http2.failedToCreateNewConnection(
202+
error,
203+
connectionID: connectionID
204+
)
205+
})
117206
}
118207

119208
mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
120-
switch self.state {
121-
case .http1(var http1StateMachine):
122-
let action = http1StateMachine.connectionCreationBackoffDone(connectionID)
123-
self.state = .http1(http1StateMachine)
124-
return action
125-
}
209+
self.state.modify(http1: { http1 in
210+
http1.connectionCreationBackoffDone(connectionID)
211+
}, http2: { http2 in
212+
http2.connectionCreationBackoffDone(connectionID)
213+
})
126214
}
127215

128216
/// A request has timed out.
@@ -131,12 +219,11 @@ extension HTTPConnectionPool {
131219
/// request, but don't need to cancel the timer (it already triggered). If a request is cancelled
132220
/// we don't need to fail it but we need to cancel its timeout timer.
133221
mutating func timeoutRequest(_ requestID: Request.ID) -> Action {
134-
switch self.state {
135-
case .http1(var http1StateMachine):
136-
let action = http1StateMachine.timeoutRequest(requestID)
137-
self.state = .http1(http1StateMachine)
138-
return action
139-
}
222+
self.state.modify(http1: { http1 in
223+
http1.timeoutRequest(requestID)
224+
}, http2: { http2 in
225+
http2.timeoutRequest(requestID)
226+
})
140227
}
141228

142229
/// A request was cancelled.
@@ -145,53 +232,48 @@ extension HTTPConnectionPool {
145232
/// need to cancel its timeout timer. If a request times out, we need to fail the request, but don't
146233
/// need to cancel the timer (it already triggered).
147234
mutating func cancelRequest(_ requestID: Request.ID) -> Action {
148-
switch self.state {
149-
case .http1(var http1StateMachine):
150-
let action = http1StateMachine.cancelRequest(requestID)
151-
self.state = .http1(http1StateMachine)
152-
return action
153-
}
235+
self.state.modify(http1: { http1 in
236+
http1.cancelRequest(requestID)
237+
}, http2: { http2 in
238+
http2.cancelRequest(requestID)
239+
})
154240
}
155241

156242
mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action {
157-
switch self.state {
158-
case .http1(var http1StateMachine):
159-
let action = http1StateMachine.connectionIdleTimeout(connectionID)
160-
self.state = .http1(http1StateMachine)
161-
return action
162-
}
243+
self.state.modify(http1: { http1 in
244+
http1.connectionIdleTimeout(connectionID)
245+
}, http2: { http2 in
246+
http2.connectionIdleTimeout(connectionID)
247+
})
163248
}
164249

165250
/// A connection has been closed
166251
mutating func http1ConnectionClosed(_ connectionID: Connection.ID) -> Action {
167-
switch self.state {
168-
case .http1(var http1StateMachine):
169-
let action = http1StateMachine.http1ConnectionClosed(connectionID)
170-
self.state = .http1(http1StateMachine)
171-
return action
172-
}
252+
self.state.modify(http1: { http1 in
253+
http1.http1ConnectionClosed(connectionID)
254+
}, http2: { http2 in
255+
http2.http1ConnectionClosed(connectionID)
256+
})
173257
}
174258

175259
mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action {
176-
switch self.state {
177-
case .http1(var http1StateMachine):
178-
let action = http1StateMachine.http1ConnectionReleased(connectionID)
179-
self.state = .http1(http1StateMachine)
180-
return action
181-
}
260+
self.state.modify(http1: { http1 in
261+
http1.http1ConnectionReleased(connectionID)
262+
}, http2: { http2 in
263+
http2.http1ConnectionReleased(connectionID)
264+
})
182265
}
183266

184267
mutating func shutdown() -> Action {
185268
precondition(!self.isShuttingDown, "Shutdown must only be called once")
186269

187270
self.isShuttingDown = true
188271

189-
switch self.state {
190-
case .http1(var http1StateMachine):
191-
let action = http1StateMachine.shutdown()
192-
self.state = .http1(http1StateMachine)
193-
return action
194-
}
272+
return self.state.modify(http1: { http1 in
273+
http1.shutdown()
274+
}, http2: { http2 in
275+
http2.shutdown()
276+
})
195277
}
196278
}
197279
}
@@ -221,6 +303,8 @@ extension HTTPConnectionPool.StateMachine: CustomStringConvertible {
221303
switch self.state {
222304
case .http1(let http1):
223305
return ".http1(\(http1))"
306+
case .http2(let http2):
307+
return ".http2(\(http2))"
224308
}
225309
}
226310
}

0 commit comments

Comments
 (0)