Skip to content

Commit 55364b6

Browse files
committed
[HTTP2] Test max streams setting is respected.
1 parent 60fef53 commit 55364b6

File tree

3 files changed

+245
-0
lines changed

3 files changed

+245
-0
lines changed

Diff for: Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests {
3939
("testMigrationFromHTTP1ToHTTP2", testMigrationFromHTTP1ToHTTP2),
4040
("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections),
4141
("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration),
42+
("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected),
4243
]
4344
}
4445
}

Diff for: Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift

+145
Original file line numberDiff line numberDiff line change
@@ -869,4 +869,149 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
869869
XCTAssertEqual(releaseAction.request, .none)
870870
XCTAssertNoThrow(try connections.closeConnection(http2Conn))
871871
}
872+
873+
func testMaxConcurrentStreamsIsRespected() {
874+
let elg = EmbeddedEventLoopGroup(loops: 4)
875+
defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) }
876+
877+
guard var (connections, state) = try? MockConnectionPool.http2(elg: elg, maxConcurrentStreams: 100) else {
878+
return XCTFail("Test setup failed")
879+
}
880+
881+
let generalPurposeConnection = connections.randomParkedConnection()!
882+
var queuer = MockRequestQueuer()
883+
884+
// schedule 1000 requests on the pool. The first 100 will be executed right away. All others
885+
// shall be queued.
886+
for i in 0..<1000 {
887+
let requestEL = elg.next()
888+
let mockRequest = MockHTTPRequest(eventLoop: requestEL)
889+
let request = HTTPConnectionPool.Request(mockRequest)
890+
891+
let executeAction = state.executeRequest(request)
892+
switch i {
893+
case 0:
894+
XCTAssertEqual(executeAction.connection, .cancelTimeoutTimer(generalPurposeConnection.id))
895+
XCTAssertNoThrow(try connections.activateConnection(generalPurposeConnection.id))
896+
XCTAssertEqual(executeAction.request, .executeRequest(request, generalPurposeConnection, cancelTimeout: false))
897+
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
898+
case 1..<100:
899+
XCTAssertEqual(executeAction.request, .executeRequest(request, generalPurposeConnection, cancelTimeout: false))
900+
XCTAssertEqual(executeAction.connection, .none)
901+
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
902+
case 100..<1000:
903+
XCTAssertEqual(executeAction.request, .scheduleRequestTimeout(for: request, on: requestEL))
904+
XCTAssertEqual(executeAction.connection, .none)
905+
XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id))
906+
default:
907+
XCTFail("Unexpected")
908+
}
909+
}
910+
911+
// let's end processing 500 requests. For every finished request, we will execute another one
912+
// right away
913+
while queuer.count > 500 {
914+
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
915+
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
916+
XCTAssertEqual(finishAction.connection, .none)
917+
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
918+
return XCTFail("Unexpected request action: \(finishAction.request)")
919+
}
920+
guard requests.count == 1, let request = requests.first else {
921+
return XCTFail("Expected to get exactly one request!")
922+
}
923+
let mockRequest = request.__testOnly_wrapped_request()
924+
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
925+
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
926+
}
927+
928+
XCTAssertEqual(queuer.count, 500)
929+
930+
// Next the server allows for more concurrent streams
931+
let newMaxStreams = 200
932+
XCTAssertNoThrow(try connections.newHTTP2ConnectionSettingsReceived(generalPurposeConnection.id, maxConcurrentStreams: newMaxStreams))
933+
let newMaxStreamsAction = state.newHTTP2MaxConcurrentStreamsReceived(generalPurposeConnection.id, newMaxStreams: newMaxStreams)
934+
XCTAssertEqual(newMaxStreamsAction.connection, .none)
935+
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = newMaxStreamsAction.request else {
936+
return XCTFail("Unexpected request action after new max concurrent stream setting: \(newMaxStreamsAction.request)")
937+
}
938+
XCTAssertEqual(requests.count, 100, "Expected to execute 100 more requests")
939+
for request in requests {
940+
let mockRequest = request.__testOnly_wrapped_request()
941+
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
942+
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
943+
}
944+
945+
XCTAssertEqual(queuer.count, 400)
946+
947+
// let's end processing 100 requests. For every finished request, we will execute another one
948+
// right away
949+
while queuer.count > 300 {
950+
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
951+
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
952+
XCTAssertEqual(finishAction.connection, .none)
953+
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
954+
return XCTFail("Unexpected request action: \(finishAction.request)")
955+
}
956+
guard requests.count == 1, let request = requests.first else {
957+
return XCTFail("Expected to get exactly one request!")
958+
}
959+
let mockRequest = request.__testOnly_wrapped_request()
960+
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
961+
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
962+
}
963+
964+
// Next the server allows for fewer concurrent streams
965+
let fewerMaxStreams = 50
966+
XCTAssertNoThrow(try connections.newHTTP2ConnectionSettingsReceived(generalPurposeConnection.id, maxConcurrentStreams: fewerMaxStreams))
967+
let fewerMaxStreamsAction = state.newHTTP2MaxConcurrentStreamsReceived(generalPurposeConnection.id, newMaxStreams: fewerMaxStreams)
968+
XCTAssertEqual(fewerMaxStreamsAction.connection, .none)
969+
XCTAssertEqual(fewerMaxStreamsAction.request, .none)
970+
971+
// for the next 150 requests that are finished, no new request must be executed.
972+
for _ in 0..<150 {
973+
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
974+
XCTAssertEqual(state.http2ConnectionStreamClosed(generalPurposeConnection.id), .none)
975+
}
976+
977+
XCTAssertEqual(queuer.count, 300)
978+
979+
// let's end all remaining requests. For every finished request, we will execute another one
980+
// right away
981+
while queuer.count > 0 {
982+
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
983+
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
984+
XCTAssertEqual(finishAction.connection, .none)
985+
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
986+
return XCTFail("Unexpected request action: \(finishAction.request)")
987+
}
988+
guard requests.count == 1, let request = requests.first else {
989+
return XCTFail("Expected to get exactly one request!")
990+
}
991+
let mockRequest = request.__testOnly_wrapped_request()
992+
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
993+
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
994+
}
995+
996+
// Now we only need to drain the remaining 50 requests on the connection
997+
var timeoutTimerScheduled = false
998+
for remaining in stride(from: 50, through: 1, by: -1) {
999+
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
1000+
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
1001+
XCTAssertEqual(finishAction.request, .none)
1002+
switch remaining {
1003+
case 1:
1004+
timeoutTimerScheduled = true
1005+
XCTAssertEqual(finishAction.connection, .scheduleTimeoutTimer(generalPurposeConnection.id, on: generalPurposeConnection.eventLoop))
1006+
XCTAssertNoThrow(try connections.parkConnection(generalPurposeConnection.id))
1007+
case 2...50:
1008+
XCTAssertEqual(finishAction.connection, .none)
1009+
default:
1010+
XCTFail("Unexpected value: \(remaining)")
1011+
}
1012+
}
1013+
XCTAssertTrue(timeoutTimerScheduled)
1014+
XCTAssertNotNil(connections.randomParkedConnection())
1015+
XCTAssertEqual(connections.count, 1)
1016+
}
8721017
}

Diff for: Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift

+99
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ struct MockConnectionPool {
3434
case connectionIsNotStarting
3535
case connectionIsNotExecuting
3636
case connectionDoesNotFulfillEventLoopRequirement
37+
case connectionIsNotActive
38+
case connectionIsNotHTTP2Connection
3739
case connectionDoesNotHaveHTTP2StreamAvailable
3840
case connectionBackoffTimerExists
3941
case connectionBackoffTimerNotFound
@@ -256,6 +258,25 @@ struct MockConnectionPool {
256258
}
257259
}
258260

261+
mutating func newHTTP2SettingsReceived(maxConcurrentStreams newMaxStream: Int) throws {
262+
switch self.state {
263+
case .starting:
264+
throw Errors.connectionIsNotActive
265+
266+
case .http1:
267+
throw Errors.connectionIsNotHTTP2Connection
268+
269+
case .http2(.inUse(_, let used)):
270+
self.state = .http2(.inUse(maxConcurrentStreams: newMaxStream, used: used))
271+
272+
case .http2(.idle(_, let parked, let lastIdle)):
273+
self.state = .http2(.idle(maxConcurrentStreams: newMaxStream, parked: parked, lastIdle: lastIdle))
274+
275+
case .closed:
276+
throw Errors.connectionIsClosed
277+
}
278+
}
279+
259280
mutating func close() throws {
260281
switch self.state {
261282
case .starting:
@@ -378,6 +399,19 @@ struct MockConnectionPool {
378399
self.backoff.insert(connectionID)
379400
}
380401

402+
mutating func newHTTP2ConnectionSettingsReceived(
403+
_ connectionID: Connection.ID,
404+
maxConcurrentStreams: Int
405+
) throws -> Connection {
406+
guard var connection = self.connections[connectionID] else {
407+
throw Errors.connectionNotFound
408+
}
409+
410+
try connection.newHTTP2SettingsReceived(maxConcurrentStreams: maxConcurrentStreams)
411+
self.connections[connection.id] = connection
412+
return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop)
413+
}
414+
381415
mutating func connectionBackoffTimerDone(_ connectionID: Connection.ID) throws {
382416
guard self.backoff.remove(connectionID) != nil else {
383417
throw Errors.connectionBackoffTimerNotFound
@@ -561,6 +595,71 @@ extension MockConnectionPool {
561595

562596
return (connections, state)
563597
}
598+
599+
/// Sets up a MockConnectionPool with one established http2 connection
600+
static func http2(
601+
elg: EventLoopGroup,
602+
on eventLoop: EventLoop? = nil,
603+
maxConcurrentStreams: Int = 100
604+
) throws -> (Self, HTTPConnectionPool.StateMachine) {
605+
var state = HTTPConnectionPool.StateMachine(
606+
idGenerator: .init(),
607+
maximumConcurrentHTTP1Connections: 8
608+
)
609+
var connections = MockConnectionPool()
610+
var queuer = MockRequestQueuer()
611+
612+
// 1. Schedule one request to create a connection
613+
614+
let mockRequest = MockHTTPRequest(eventLoop: eventLoop ?? elg.next())
615+
let request = HTTPConnectionPool.Request(mockRequest)
616+
let executeAction = state.executeRequest(request)
617+
618+
guard case .scheduleRequestTimeout(request, on: let waitEL) = executeAction.request, mockRequest.eventLoop === waitEL else {
619+
throw SetupError.expectedRequestToBeAddedToQueue
620+
}
621+
622+
guard case .createConnection(let connectionID, on: let eventLoop) = executeAction.connection else {
623+
throw SetupError.expectedConnectionToBeCreated
624+
}
625+
626+
try connections.createConnection(connectionID, on: eventLoop)
627+
try queuer.queue(mockRequest, id: request.id)
628+
629+
// 2. the connection becomes available
630+
631+
let newConnection = try connections.succeedConnectionCreationHTTP2(connectionID, maxConcurrentStreams: maxConcurrentStreams)
632+
let action = state.newHTTP2ConnectionCreated(newConnection, maxConcurrentStreams: maxConcurrentStreams)
633+
634+
guard case .executeRequestsAndCancelTimeouts([request], newConnection) = action.request else {
635+
throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow
636+
}
637+
638+
guard case .migration(createConnections: let create, closeConnections: [], scheduleTimeout: nil) = action.connection, create.isEmpty else {
639+
throw SetupError.expectedNoConnectionAction
640+
}
641+
642+
guard try queuer.get(request.id, request: request.__testOnly_wrapped_request()) === mockRequest else {
643+
throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow
644+
}
645+
try connections.execute(mockRequest, on: newConnection)
646+
647+
// 3. park connection
648+
649+
try connections.finishExecution(newConnection.id)
650+
651+
let expected: HTTPConnectionPool.StateMachine.ConnectionAction = .scheduleTimeoutTimer(
652+
newConnection.id,
653+
on: newConnection.eventLoop
654+
)
655+
guard state.http2ConnectionStreamClosed(newConnection.id) == .init(request: .none, connection: expected) else {
656+
throw SetupError.expectedConnectionToBeParked
657+
}
658+
659+
try connections.parkConnection(newConnection.id)
660+
661+
return (connections, state)
662+
}
564663
}
565664

566665
/// A request that can be used when testing the `HTTPConnectionPool.StateMachine`

0 commit comments

Comments
 (0)