Skip to content

[HTTP2] Test max streams setting is respected. #465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 3, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests {
("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections),
("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration),
("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires),
("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected),
]
}
}
Original file line number Diff line number Diff line change
@@ -943,4 +943,149 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
XCTAssertEqual(action3.request, .none)
XCTAssertEqual(action3.connection, .none)
}

func testMaxConcurrentStreamsIsRespected() {
let elg = EmbeddedEventLoopGroup(loops: 4)
defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) }

guard var (connections, state) = try? MockConnectionPool.http2(elg: elg, maxConcurrentStreams: 100) else {
return XCTFail("Test setup failed")
}

let generalPurposeConnection = connections.randomParkedConnection()!
var queuer = MockRequestQueuer()

// schedule 1000 requests on the pool. The first 100 will be executed right away. All others
// shall be queued.
for i in 0..<1000 {
let requestEL = elg.next()
let mockRequest = MockHTTPRequest(eventLoop: requestEL)
let request = HTTPConnectionPool.Request(mockRequest)

let executeAction = state.executeRequest(request)
switch i {
case 0:
XCTAssertEqual(executeAction.connection, .cancelTimeoutTimer(generalPurposeConnection.id))
XCTAssertNoThrow(try connections.activateConnection(generalPurposeConnection.id))
XCTAssertEqual(executeAction.request, .executeRequest(request, generalPurposeConnection, cancelTimeout: false))
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
case 1..<100:
XCTAssertEqual(executeAction.request, .executeRequest(request, generalPurposeConnection, cancelTimeout: false))
XCTAssertEqual(executeAction.connection, .none)
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
case 100..<1000:
XCTAssertEqual(executeAction.request, .scheduleRequestTimeout(for: request, on: requestEL))
XCTAssertEqual(executeAction.connection, .none)
XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id))
default:
XCTFail("Unexpected")
}
}

// let's end processing 500 requests. For every finished request, we will execute another one
// right away
while queuer.count > 500 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a while loop instead of a for _ in 0..<500?

XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
XCTAssertEqual(finishAction.connection, .none)
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
return XCTFail("Unexpected request action: \(finishAction.request)")
}
guard requests.count == 1, let request = requests.first else {
return XCTFail("Expected to get exactly one request!")
}
let mockRequest = request.__testOnly_wrapped_request()
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
}

XCTAssertEqual(queuer.count, 500)

// Next the server allows for more concurrent streams
let newMaxStreams = 200
XCTAssertNoThrow(try connections.newHTTP2ConnectionSettingsReceived(generalPurposeConnection.id, maxConcurrentStreams: newMaxStreams))
let newMaxStreamsAction = state.newHTTP2MaxConcurrentStreamsReceived(generalPurposeConnection.id, newMaxStreams: newMaxStreams)
XCTAssertEqual(newMaxStreamsAction.connection, .none)
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = newMaxStreamsAction.request else {
return XCTFail("Unexpected request action after new max concurrent stream setting: \(newMaxStreamsAction.request)")
}
XCTAssertEqual(requests.count, 100, "Expected to execute 100 more requests")
for request in requests {
let mockRequest = request.__testOnly_wrapped_request()
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
}

XCTAssertEqual(queuer.count, 400)

// let's end processing 100 requests. For every finished request, we will execute another one
// right away
while queuer.count > 300 {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
XCTAssertEqual(finishAction.connection, .none)
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
return XCTFail("Unexpected request action: \(finishAction.request)")
}
guard requests.count == 1, let request = requests.first else {
return XCTFail("Expected to get exactly one request!")
}
let mockRequest = request.__testOnly_wrapped_request()
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
}

// Next the server allows for fewer concurrent streams
let fewerMaxStreams = 50
XCTAssertNoThrow(try connections.newHTTP2ConnectionSettingsReceived(generalPurposeConnection.id, maxConcurrentStreams: fewerMaxStreams))
let fewerMaxStreamsAction = state.newHTTP2MaxConcurrentStreamsReceived(generalPurposeConnection.id, newMaxStreams: fewerMaxStreams)
XCTAssertEqual(fewerMaxStreamsAction.connection, .none)
XCTAssertEqual(fewerMaxStreamsAction.request, .none)

// for the next 150 requests that are finished, no new request must be executed.
for _ in 0..<150 {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
XCTAssertEqual(state.http2ConnectionStreamClosed(generalPurposeConnection.id), .none)
}

XCTAssertEqual(queuer.count, 300)

// let's end all remaining requests. For every finished request, we will execute another one
// right away
while queuer.count > 0 {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
XCTAssertEqual(finishAction.connection, .none)
guard case .executeRequestsAndCancelTimeouts(let requests, generalPurposeConnection) = finishAction.request else {
return XCTFail("Unexpected request action: \(finishAction.request)")
}
guard requests.count == 1, let request = requests.first else {
return XCTFail("Expected to get exactly one request!")
}
let mockRequest = request.__testOnly_wrapped_request()
XCTAssertNoThrow(try queuer.get(request.id, request: mockRequest))
XCTAssertNoThrow(try connections.execute(mockRequest, on: generalPurposeConnection))
}

// Now we only need to drain the remaining 50 requests on the connection
var timeoutTimerScheduled = false
for remaining in stride(from: 50, through: 1, by: -1) {
XCTAssertNoThrow(try connections.finishExecution(generalPurposeConnection.id))
let finishAction = state.http2ConnectionStreamClosed(generalPurposeConnection.id)
XCTAssertEqual(finishAction.request, .none)
switch remaining {
case 1:
timeoutTimerScheduled = true
XCTAssertEqual(finishAction.connection, .scheduleTimeoutTimer(generalPurposeConnection.id, on: generalPurposeConnection.eventLoop))
XCTAssertNoThrow(try connections.parkConnection(generalPurposeConnection.id))
case 2...50:
XCTAssertEqual(finishAction.connection, .none)
default:
XCTFail("Unexpected value: \(remaining)")
}
}
XCTAssertTrue(timeoutTimerScheduled)
XCTAssertNotNil(connections.randomParkedConnection())
XCTAssertEqual(connections.count, 1)
}
}
99 changes: 99 additions & 0 deletions Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift
Original file line number Diff line number Diff line change
@@ -34,6 +34,8 @@ struct MockConnectionPool {
case connectionIsNotStarting
case connectionIsNotExecuting
case connectionDoesNotFulfillEventLoopRequirement
case connectionIsNotActive
case connectionIsNotHTTP2Connection
case connectionDoesNotHaveHTTP2StreamAvailable
case connectionBackoffTimerExists
case connectionBackoffTimerNotFound
@@ -256,6 +258,25 @@ struct MockConnectionPool {
}
}

mutating func newHTTP2SettingsReceived(maxConcurrentStreams newMaxStream: Int) throws {
switch self.state {
case .starting:
throw Errors.connectionIsNotActive

case .http1:
throw Errors.connectionIsNotHTTP2Connection

case .http2(.inUse(_, let used)):
self.state = .http2(.inUse(maxConcurrentStreams: newMaxStream, used: used))

case .http2(.idle(_, let parked, let lastIdle)):
self.state = .http2(.idle(maxConcurrentStreams: newMaxStream, parked: parked, lastIdle: lastIdle))

case .closed:
throw Errors.connectionIsClosed
}
}

mutating func close() throws {
switch self.state {
case .starting:
@@ -378,6 +399,19 @@ struct MockConnectionPool {
self.backoff.insert(connectionID)
}

mutating func newHTTP2ConnectionSettingsReceived(
_ connectionID: Connection.ID,
maxConcurrentStreams: Int
) throws -> Connection {
guard var connection = self.connections[connectionID] else {
throw Errors.connectionNotFound
}

try connection.newHTTP2SettingsReceived(maxConcurrentStreams: maxConcurrentStreams)
self.connections[connection.id] = connection
return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop)
}

mutating func connectionBackoffTimerDone(_ connectionID: Connection.ID) throws {
guard self.backoff.remove(connectionID) != nil else {
throw Errors.connectionBackoffTimerNotFound
@@ -561,6 +595,71 @@ extension MockConnectionPool {

return (connections, state)
}

/// Sets up a MockConnectionPool with one established http2 connection
static func http2(
elg: EventLoopGroup,
on eventLoop: EventLoop? = nil,
maxConcurrentStreams: Int = 100
) throws -> (Self, HTTPConnectionPool.StateMachine) {
var state = HTTPConnectionPool.StateMachine(
idGenerator: .init(),
maximumConcurrentHTTP1Connections: 8
)
var connections = MockConnectionPool()
var queuer = MockRequestQueuer()

// 1. Schedule one request to create a connection

let mockRequest = MockHTTPRequest(eventLoop: eventLoop ?? elg.next())
let request = HTTPConnectionPool.Request(mockRequest)
let executeAction = state.executeRequest(request)

guard case .scheduleRequestTimeout(request, on: let waitEL) = executeAction.request, mockRequest.eventLoop === waitEL else {
throw SetupError.expectedRequestToBeAddedToQueue
}

guard case .createConnection(let connectionID, on: let eventLoop) = executeAction.connection else {
throw SetupError.expectedConnectionToBeCreated
}

try connections.createConnection(connectionID, on: eventLoop)
try queuer.queue(mockRequest, id: request.id)

// 2. the connection becomes available

let newConnection = try connections.succeedConnectionCreationHTTP2(connectionID, maxConcurrentStreams: maxConcurrentStreams)
let action = state.newHTTP2ConnectionCreated(newConnection, maxConcurrentStreams: maxConcurrentStreams)

guard case .executeRequestsAndCancelTimeouts([request], newConnection) = action.request else {
throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow
}

guard case .migration(createConnections: let create, closeConnections: [], scheduleTimeout: nil) = action.connection, create.isEmpty else {
throw SetupError.expectedNoConnectionAction
}

guard try queuer.get(request.id, request: request.__testOnly_wrapped_request()) === mockRequest else {
throw SetupError.expectedPreviouslyQueuedRequestToBeRunNow
}
try connections.execute(mockRequest, on: newConnection)

// 3. park connection

try connections.finishExecution(newConnection.id)

let expected: HTTPConnectionPool.StateMachine.ConnectionAction = .scheduleTimeoutTimer(
newConnection.id,
on: newConnection.eventLoop
)
guard state.http2ConnectionStreamClosed(newConnection.id) == .init(request: .none, connection: expected) else {
throw SetupError.expectedConnectionToBeParked
}

try connections.parkConnection(newConnection.id)

return (connections, state)
}
}

/// A request that can be used when testing the `HTTPConnectionPool.StateMachine`