Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a8679bf

Browse files
committedSep 30, 2021
HTTP2StateMachine
1 parent 2bd8885 commit a8679bf

8 files changed

+614
-6
lines changed
 

‎Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift

+51-6
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,30 @@ extension HTTPConnectionPool {
189189
preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)")
190190
}
191191
}
192+
193+
enum MigrateAction {
194+
case removeConnection
195+
case keepConnection
196+
}
197+
198+
func migrateToHTTP2(_ context: inout HTTP1Connections.HTTP2ToHTTP1MigrationContext) -> MigrateAction {
199+
switch self.state {
200+
case .starting:
201+
context.starting.append((self.connectionID, self.eventLoop))
202+
return .removeConnection
203+
case .backingOff:
204+
context.backingOff.append((self.connectionID, self.eventLoop))
205+
return .removeConnection
206+
case .idle(let connection, since: _):
207+
// Idle connections can be removed right away
208+
context.close.append(connection)
209+
return .removeConnection
210+
case .leased:
211+
return .keepConnection
212+
case .closed:
213+
preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)")
214+
}
215+
}
192216
}
193217

194218
/// A structure to hold the currently active HTTP/1.1 connections.
@@ -298,6 +322,12 @@ extension HTTPConnectionPool {
298322
var connectionsStartingForUseCase: Int
299323
}
300324

325+
struct HTTP2ToHTTP1MigrationContext {
326+
var backingOff: [(Connection.ID, EventLoop)] = []
327+
var starting: [(Connection.ID, EventLoop)] = []
328+
var close: [Connection] = []
329+
}
330+
301331
// MARK: Connection creation
302332

303333
mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID {
@@ -485,6 +515,21 @@ extension HTTPConnectionPool {
485515
return (index, context)
486516
}
487517

518+
// MARK: Migration
519+
520+
mutating func migrateToHTTP2() -> HTTP2ToHTTP1MigrationContext {
521+
var migrationContext = HTTP2ToHTTP1MigrationContext()
522+
self.connections.removeAll { connection in
523+
switch connection.migrateToHTTP2(&migrationContext) {
524+
case .removeConnection:
525+
return true
526+
case .keepConnection:
527+
return false
528+
}
529+
}
530+
return migrationContext
531+
}
532+
488533
// MARK: Shutdown
489534

490535
mutating func shutdown() -> CleanupContext {
@@ -610,12 +655,12 @@ extension HTTPConnectionPool {
610655

611656
return nil
612657
}
613-
}
614658

615-
struct Stats {
616-
var idle: Int = 0
617-
var leased: Int = 0
618-
var connecting: Int = 0
619-
var backingOff: Int = 0
659+
struct Stats {
660+
var idle: Int = 0
661+
var leased: Int = 0
662+
var connecting: Int = 0
663+
var backingOff: Int = 0
664+
}
620665
}
621666
}

‎Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift

+15
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ extension HTTPConnectionPool {
4949
}
5050
}
5151

52+
/// A connection is established and can potentially execute requests if not all streams are leased
53+
var isActive: Bool {
54+
switch self.state {
55+
case .active:
56+
return true
57+
case .starting, .backingOff, .draining, .closed:
58+
return false
59+
}
60+
}
61+
5262
/// A request can be scheduled on the connection
5363
var isAvailable: Bool {
5464
switch self.state {
@@ -326,6 +336,11 @@ extension HTTPConnectionPool {
326336

327337
// MARK: Connection creation
328338

339+
/// true if one ore more connections are active
340+
var hasActiveConnections: Bool {
341+
self.connections.contains { $0.isActive }
342+
}
343+
329344
/// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one
330345
var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool {
331346
self.connections.contains { $0.canOrWillBeAbleToExecuteRequests }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
import NIOHTTP2
17+
18+
extension HTTPConnectionPool {
19+
struct HTTP2StateMaschine {
20+
typealias Action = HTTPConnectionPool.StateMachine.Action
21+
typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction
22+
typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction
23+
24+
private var lastConnectFailure: Error?
25+
private var failedConsecutiveConnectionAttempts = 0
26+
27+
private var connections: HTTP2Connections
28+
private var http1Connections: HTTP1Connections?
29+
30+
private var requests: RequestQueue
31+
32+
private let idGenerator: Connection.ID.Generator
33+
34+
init(
35+
idGenerator: Connection.ID.Generator
36+
) {
37+
self.idGenerator = idGenerator
38+
self.requests = RequestQueue()
39+
40+
self.connections = HTTP2Connections(generator: idGenerator)
41+
}
42+
43+
mutating func migrateConnectionsFromHTTP1(
44+
connections http1Connections: HTTP1Connections,
45+
requests: RequestQueue
46+
) -> Action {
47+
precondition(self.http1Connections == nil)
48+
precondition(self.connections.isEmpty)
49+
precondition(self.requests.isEmpty)
50+
51+
var http1Connections = http1Connections // make http1Connections mutable
52+
let context = http1Connections.migrateToHTTP2()
53+
self.connections.migrateConnections(
54+
starting: context.starting,
55+
backingOff: context.backingOff
56+
)
57+
58+
if !http1Connections.isEmpty {
59+
self.http1Connections = http1Connections
60+
}
61+
62+
self.requests = requests
63+
64+
// TODO: Close all idle connections from context.close
65+
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)
66+
67+
return .none
68+
}
69+
70+
mutating func executeRequest(_ request: Request) -> Action {
71+
if let eventLoop = request.requiredEventLoop {
72+
return self.executeRequest(request, onRequired: eventLoop)
73+
} else {
74+
return self.executeRequest(request, onPreferred: request.preferredEventLoop)
75+
}
76+
}
77+
78+
private mutating func executeRequest(
79+
_ request: Request,
80+
onRequired eventLoop: EventLoop
81+
) -> Action {
82+
if let connection = self.connections.leaseStream(onRequired: eventLoop) {
83+
/// 1. we have a stream available and can execute the request immediately
84+
return .init(
85+
request: .executeRequest(request, connection, cancelTimeout: false),
86+
connection: .cancelTimeoutTimer(connection.id)
87+
)
88+
}
89+
/// 2. No available stream so we definitely need to wait until we have one
90+
self.requests.push(request)
91+
92+
if self.connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop) {
93+
/// 3. we already have a connection, we just need to wait until until it becomes available
94+
return .init(
95+
request: .scheduleRequestTimeout(for: request, on: eventLoop),
96+
connection: .none
97+
)
98+
} else {
99+
/// 4. we do *not* have a connection, need to create a new one and wait until it is connected.
100+
let connectionId = self.connections.createNewConnection(on: eventLoop)
101+
return .init(
102+
request: .scheduleRequestTimeout(for: request, on: eventLoop),
103+
connection: .createConnection(connectionId, on: eventLoop)
104+
)
105+
}
106+
}
107+
108+
private mutating func executeRequest(
109+
_ request: Request,
110+
onPreferred eventLoop: EventLoop
111+
) -> Action {
112+
if let connection = self.connections.leaseStream(onPreferred: eventLoop) {
113+
/// 1. we have a stream available and can execute the request immediately
114+
return .init(
115+
request: .executeRequest(request, connection, cancelTimeout: false),
116+
connection: .cancelTimeoutTimer(connection.id)
117+
)
118+
}
119+
/// 2. No available stream so we definitely need to wait until we have one
120+
self.requests.push(request)
121+
122+
if self.connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests {
123+
/// 3. we already have a connection, we just need to wait until until it becomes available
124+
return .init(
125+
request: .scheduleRequestTimeout(for: request, on: eventLoop),
126+
connection: .none
127+
)
128+
} else {
129+
/// 4. we do *not* have a connection, need to create a new one and wait until it is connected.
130+
let connectionId = self.connections.createNewConnection(on: eventLoop)
131+
return .init(
132+
request: .scheduleRequestTimeout(for: request, on: eventLoop),
133+
connection: .createConnection(connectionId, on: eventLoop)
134+
)
135+
}
136+
}
137+
138+
mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> Action {
139+
self.failedConsecutiveConnectionAttempts = 0
140+
self.lastConnectFailure = nil
141+
let (index, context) = self.connections.newHTTP2ConnectionEstablished(
142+
connection,
143+
maxConcurrentStreams: maxConcurrentStreams
144+
)
145+
return self.nextActionForAvailableConnection(at: index, context: context)
146+
}
147+
148+
private mutating func nextActionForAvailableConnection(
149+
at index: Int,
150+
context: HTTP2Connections.AvailableConnectionContext
151+
) -> Action {
152+
// We prioritise requests with a required event loop over those without a requirement.
153+
// This can cause starvation for request without a required event loop.
154+
// We should come up with a better algorithm in the future.
155+
156+
var requestsToExecute = self.requests.popFirst(max: context.availableStreams, for: context.eventLoop)
157+
let remainingAvailableStreams = context.availableStreams - requestsToExecute.count
158+
// use the remaining available streams for requests without a required event loop
159+
requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil)
160+
let connection = self.connections.leaseStreams(at: index, count: requestsToExecute.count)
161+
162+
let requestAction = { () -> RequestAction in
163+
if requestsToExecute.isEmpty {
164+
return .none
165+
} else {
166+
return .executeRequestsAndCancelTimeouts(requestsToExecute, connection)
167+
}
168+
}()
169+
170+
let connectionAction = { () -> ConnectionAction in
171+
if context.isIdle, requestsToExecute.isEmpty {
172+
return .scheduleTimeoutTimer(connection.id, on: context.eventLoop)
173+
} else {
174+
return .none
175+
}
176+
}()
177+
178+
return .init(
179+
request: requestAction,
180+
connection: connectionAction
181+
)
182+
}
183+
184+
mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
185+
let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
186+
return self.nextActionForAvailableConnection(at: index, context: context)
187+
}
188+
189+
mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
190+
let context = self.connections.goAwayReceived(connectionID)
191+
return self.nextActionForClosingConnection(on: context.eventLoop)
192+
}
193+
194+
mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action {
195+
guard let (index, context) = self.connections.failConnection(connectionID) else {
196+
return .none
197+
}
198+
return self.nextActionForFailedConnection(at: index, on: context.eventLoop)
199+
}
200+
201+
private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action {
202+
let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil)
203+
guard hasPendingRequest else {
204+
return .none
205+
}
206+
207+
let (newConnectionID, previousEventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index)
208+
precondition(previousEventLoop === eventLoop)
209+
210+
return .init(
211+
request: .none,
212+
connection: .createConnection(newConnectionID, on: eventLoop)
213+
)
214+
}
215+
216+
private mutating func nextActionForClosingConnection(on eventLoop: EventLoop) -> Action {
217+
let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil)
218+
guard hasPendingRequest else {
219+
return .none
220+
}
221+
222+
let newConnectionID = self.connections.createNewConnection(on: eventLoop)
223+
224+
return .init(
225+
request: .none,
226+
connection: .createConnection(newConnectionID, on: eventLoop)
227+
)
228+
}
229+
230+
mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action {
231+
let (index, context) = self.connections.releaseStream(connectionID)
232+
return self.nextActionForAvailableConnection(at: index, context: context)
233+
}
234+
235+
mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action {
236+
self.failedConsecutiveConnectionAttempts += 1
237+
self.lastConnectFailure = error
238+
239+
let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID)
240+
let backoff = calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts)
241+
return .init(request: .none, connection: .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop))
242+
}
243+
244+
mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
245+
// The naming of `failConnection` is a little confusing here. All it does is moving the
246+
// connection state from `.backingOff` to `.closed` here. It also returns the
247+
// connection's index.
248+
guard let (index, _) = self.connections.failConnection(connectionID) else {
249+
preconditionFailure("Backing off a connection that is unknown to us?")
250+
}
251+
let (newConnectionID, eventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index)
252+
return .init(request: .none, connection: .createConnection(newConnectionID, on: eventLoop))
253+
}
254+
255+
mutating func timeoutRequest(_ requestID: Request.ID) -> Action {
256+
// 1. check requests in queue
257+
if let request = self.requests.remove(requestID) {
258+
var error: Error = HTTPClientError.getConnectionFromPoolTimeout
259+
if let lastError = self.lastConnectFailure {
260+
error = lastError
261+
} else if !self.connections.hasActiveConnections {
262+
error = HTTPClientError.connectTimeout
263+
}
264+
return .init(
265+
request: .failRequest(request, error, cancelTimeout: false),
266+
connection: .none
267+
)
268+
}
269+
270+
// 2. This point is reached, because the request may have already been scheduled. A
271+
// connection might have become available shortly before the request timeout timer
272+
// fired.
273+
return .none
274+
}
275+
276+
mutating func cancelRequest(_ requestID: Request.ID) -> Action {
277+
// 1. check requests in queue
278+
if self.requests.remove(requestID) != nil {
279+
return .init(
280+
request: .cancelRequestTimeout(requestID),
281+
connection: .none
282+
)
283+
}
284+
285+
// 2. This is point is reached, because the request may already have been forwarded to
286+
// an idle connection. In this case the connection will need to handle the
287+
// cancellation.
288+
return .none
289+
}
290+
291+
mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action {
292+
guard let connection = connections.closeConnectionIfIdle(connectionID) else {
293+
return .none
294+
}
295+
return .init(
296+
request: .none,
297+
connection: .closeConnection(connection, isShutdown: .no)
298+
)
299+
}
300+
301+
mutating func connectionClosed(_ connectionID: Connection.ID) -> Action {
302+
guard let (index, context) = self.connections.failConnection(connectionID) else {
303+
// When a connection close is initiated by the connection pool, the connection will
304+
// still report its close to the state machine. In those cases we must ignore the
305+
// event.
306+
return .none
307+
}
308+
return self.nextActionForFailedConnection(at: index, on: context.eventLoop)
309+
}
310+
311+
mutating func http1ConnectionReleased(_: Connection.ID) -> Action {
312+
fatalError("TODO: implement \(#function)")
313+
}
314+
315+
mutating func shutdown() -> Action {
316+
// If we have remaining request queued, we should fail all of them with a cancelled
317+
// error.
318+
let waitingRequests = self.requests.removeAll()
319+
320+
var requestAction: StateMachine.RequestAction = .none
321+
if !waitingRequests.isEmpty {
322+
requestAction = .failRequestsAndCancelTimeouts(waitingRequests, HTTPClientError.cancelled)
323+
}
324+
325+
// clean up the connections, we can cleanup now!
326+
let cleanupContext = self.connections.shutdown()
327+
328+
// If there aren't any more connections, everything is shutdown
329+
let isShutdown: StateMachine.ConnectionAction.IsShutdown
330+
let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty)
331+
if self.connections.isEmpty {
332+
isShutdown = .yes(unclean: unclean)
333+
} else {
334+
isShutdown = .no
335+
}
336+
return .init(
337+
request: requestAction,
338+
connection: .cleanupConnections(cleanupContext, isShutdown: isShutdown)
339+
)
340+
}
341+
}
342+
}

‎Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift

+36
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,21 @@ extension HTTPConnectionPool {
7070
}
7171
}
7272

73+
/// removes up to `max` requests from the queue for the given `eventLoop` and returns them.
74+
/// - Parameters:
75+
/// - max: maximum number of requests to pop
76+
/// - eventLoop: required event loop of the request
77+
/// - Returns: requests for the given `eventLoop`
78+
mutating func popFirst(max: Int, for eventLoop: EventLoop? = nil) -> [Request] {
79+
if let eventLoop = eventLoop {
80+
return self.withEventLoopQueue(for: eventLoop.id) { queue in
81+
queue.popFirst(max: max)
82+
}
83+
} else {
84+
return self.generalPurposeQueue.popFirst(max: max)
85+
}
86+
}
87+
7388
mutating func remove(_ requestID: Request.ID) -> Request? {
7489
if let eventLoopID = requestID.eventLoopID {
7590
return self.withEventLoopQueue(for: eventLoopID) { queue in
@@ -118,3 +133,24 @@ extension HTTPConnectionPool {
118133
}
119134
}
120135
}
136+
137+
extension CircularBuffer {
138+
/// Removes up to `max` elements from the beginning of the
139+
/// `CircularBuffer` and returns them.
140+
///
141+
/// Calling this method may invalidate any existing indices for use with this
142+
/// `CircularBuffer`.
143+
///
144+
/// - Parameter max: The number of elements to remove.
145+
/// `max` must be greater than or equal to zero.
146+
/// - Returns: removed elements
147+
///
148+
/// - Complexity: O(*k*), where *k* is the number of elements removed.
149+
fileprivate mutating func popFirst(max: Int) -> [Element] {
150+
precondition(max >= 0, "")
151+
let elementCountToRemove = Swift.min(max, self.count)
152+
let array = Array(self[self.startIndex..<self.index(self.startIndex, offsetBy: elementCountToRemove)])
153+
self.removeFirst(elementCountToRemove)
154+
return array
155+
}
156+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
//
15+
// HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift
16+
//
17+
import XCTest
18+
19+
///
20+
/// NOTE: This file was generated by generate_linux_tests.rb
21+
///
22+
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
23+
///
24+
25+
extension HTTPConnectionPool_HTTP2StateMachineTests {
26+
static var allTests: [(String, (HTTPConnectionPool_HTTP2StateMachineTests) -> () throws -> Void)] {
27+
return [
28+
("testCreatingOfConnection", testCreatingOfConnection),
29+
]
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AsyncHTTPClient
16+
import NIOCore
17+
import NIOEmbedded
18+
import NIOHTTP1
19+
import NIOPosix
20+
import XCTest
21+
22+
private typealias Action = HTTPConnectionPool.StateMachine.Action
23+
private typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction
24+
private typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction
25+
26+
class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
27+
func testCreatingOfConnection() {
28+
let elg = EmbeddedEventLoopGroup(loops: 1)
29+
let el1 = elg.next()
30+
var connections = MockConnectionPool()
31+
var queuer = MockRequestQueuer()
32+
var state = HTTPConnectionPool.HTTP2StateMaschine(idGenerator: .init())
33+
34+
/// first request should create a new connection
35+
let mockRequest = MockHTTPRequest(eventLoop: el1)
36+
let request = HTTPConnectionPool.Request(mockRequest)
37+
let executeAction = state.executeRequest(request)
38+
39+
guard case .createConnection(let connID, let eventLoop) = executeAction.connection else {
40+
return XCTFail("Unexpected connection action \(executeAction.connection)")
41+
}
42+
XCTAssertTrue(eventLoop === el1)
43+
44+
XCTAssertEqual(executeAction.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop))
45+
46+
XCTAssertNoThrow(try connections.createConnection(connID, on: el1))
47+
XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id))
48+
49+
/// subsequent requests should not create a connection
50+
for _ in 0..<9 {
51+
let mockRequest = MockHTTPRequest(eventLoop: el1)
52+
let request = HTTPConnectionPool.Request(mockRequest)
53+
let action = state.executeRequest(request)
54+
55+
XCTAssertEqual(action.connection, .none)
56+
XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop))
57+
58+
XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id))
59+
}
60+
61+
/// connection establishment should result in 5 request executions because we set max concurrent streams to 5
62+
var maybeConn: HTTPConnectionPool.Connection?
63+
XCTAssertNoThrow(maybeConn = try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 5))
64+
guard let conn = maybeConn else {
65+
return XCTFail("unexpected throw")
66+
}
67+
let action = state.newHTTP2ConnectionEstablished(conn, maxConcurrentStreams: 5)
68+
69+
XCTAssertEqual(action.connection, .none)
70+
guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else {
71+
return XCTFail("Unexpected request action \(action.request)")
72+
}
73+
XCTAssertEqual(requests.count, 5)
74+
75+
for request in requests {
76+
XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request()))
77+
}
78+
79+
/// closing a stream while we have requests queued should result in one request execution action
80+
for _ in 0..<5 {
81+
let action = state.http2ConnectionStreamClosed(connID)
82+
XCTAssertEqual(action.connection, .none)
83+
guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else {
84+
return XCTFail("Unexpected request action \(action.request)")
85+
}
86+
XCTAssertEqual(requests.count, 1)
87+
for request in requests {
88+
XCTAssertNoThrow(try queuer.cancel(request.id))
89+
}
90+
}
91+
XCTAssertTrue(queuer.isEmpty)
92+
93+
/// closing streams without any queued requests shouldn't do anything if it's *not* the last stream
94+
for _ in 0..<4 {
95+
let action = state.http2ConnectionStreamClosed(connID)
96+
XCTAssertEqual(action.request, .none)
97+
XCTAssertEqual(action.connection, .none)
98+
}
99+
100+
/// closing the last stream should schedule a idle timeout
101+
let streamCloseAction = state.http2ConnectionStreamClosed(connID)
102+
XCTAssertEqual(streamCloseAction.request, .none)
103+
XCTAssertEqual(streamCloseAction.connection, .scheduleTimeoutTimer(connID, on: el1))
104+
105+
/// shutdown should only close one connection
106+
let shutdownAction = state.shutdown()
107+
XCTAssertEqual(shutdownAction.request, .none)
108+
XCTAssertEqual(shutdownAction.connection, .cleanupConnections(
109+
.init(
110+
close: [conn],
111+
cancel: [],
112+
connectBackoff: []
113+
),
114+
isShutdown: .yes(unclean: false)
115+
))
116+
}
117+
}

‎Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift

+21
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ struct MockConnectionPool {
162162
self.state = .http1(.idle(parked: false, idleSince: .now()))
163163
}
164164

165+
mutating func http2Started(maxConcurrentStreams: Int) throws {
166+
guard case .starting = self.state else {
167+
throw Errors.connectionIsNotStarting
168+
}
169+
170+
self.state = .http2(.idle(maxConcurrentStreams: maxConcurrentStreams, parked: false, lastIdle: .now()))
171+
}
172+
165173
mutating func park() throws {
166174
switch self.state {
167175
case .starting, .closed, .http1(.inUse), .http2(.inUse):
@@ -333,6 +341,19 @@ struct MockConnectionPool {
333341
return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop)
334342
}
335343

344+
mutating func succeedConnectionCreationHTTP2(
345+
_ connectionID: Connection.ID,
346+
maxConcurrentStreams: Int
347+
) throws -> Connection {
348+
guard var connection = self.connections[connectionID] else {
349+
throw Errors.connectionNotFound
350+
}
351+
352+
try connection.http2Started(maxConcurrentStreams: maxConcurrentStreams)
353+
self.connections[connection.id] = connection
354+
return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop)
355+
}
356+
336357
mutating func failConnectionCreation(_ connectionID: Connection.ID) throws {
337358
guard let connection = self.connections[connectionID] else {
338359
throw Errors.connectionNotFound

‎Tests/LinuxMain.swift

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import XCTest
4343
testCase(HTTPConnectionPool_HTTP1ConnectionsTests.allTests),
4444
testCase(HTTPConnectionPool_HTTP1StateMachineTests.allTests),
4545
testCase(HTTPConnectionPool_HTTP2ConnectionsTests.allTests),
46+
testCase(HTTPConnectionPool_HTTP2StateMachineTests.allTests),
4647
testCase(HTTPConnectionPool_ManagerTests.allTests),
4748
testCase(HTTPConnectionPool_RequestQueueTests.allTests),
4849
testCase(HTTPRequestStateMachineTests.allTests),

0 commit comments

Comments
 (0)
Please sign in to comment.