Skip to content

[HTTP2StateMachine] test and fix HTTP2 go away #452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,19 +371,20 @@ extension HTTPConnectionPool {
/// This will put the connection into the idle state.
///
/// - Parameter connection: The new established connection.
/// - Returns: An index and an ``AvailableConnectionContext`` to determine the next action for the now idle connection.
/// - Returns: An index and an ``EstablishedConnectionContext`` to determine the next action for the now idle connection.
/// Call ``leaseStreams(at:count:)`` or ``closeConnection(at:)`` with the supplied index after
/// this.
mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> (Int, AvailableConnectionContext) {
mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> (Int, EstablishedConnectionContext) {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connection.id }) else {
preconditionFailure("There is a new connection that we didn't request!")
}
precondition(connection.eventLoop === self.connections[index].eventLoop, "Expected the new connection to be on EL")
let availableStreams = self.connections[index].connected(connection, maxStreams: maxConcurrentStreams)
let context = AvailableConnectionContext(
let context = EstablishedConnectionContext(
availableStreams: availableStreams,
eventLoop: connection.eventLoop,
isIdle: self.connections[index].isIdle
isIdle: self.connections[index].isIdle,
connectionID: connection.id
)
return (index, context)
}
Expand Down Expand Up @@ -419,20 +420,21 @@ extension HTTPConnectionPool {
/// - Parameters:
/// - connectionID: The connectionID for which we received new settings
/// - newMaxStreams: new maximum concurrent streams
/// - Returns: index of the connection and new number of available streams in the `AvailableConnectionContext`
/// - Returns: index of the connection and new number of available streams in the `EstablishedConnectionContext`
/// - Precondition: Connections must be in the `.active` or `.draining` state.
mutating func newHTTP2MaxConcurrentStreamsReceived(
_ connectionID: Connection.ID,
newMaxStreams: Int
) -> (Int, AvailableConnectionContext) {
) -> (Int, EstablishedConnectionContext) {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to update the maximum number of concurrent streams for a connection that does not exists")
}
let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams)
let context = AvailableConnectionContext(
let context = EstablishedConnectionContext(
availableStreams: availableStreams,
eventLoop: self.connections[index].eventLoop,
isIdle: self.connections[index].isIdle
isIdle: self.connections[index].isIdle,
connectionID: connectionID
)
return (index, context)
}
Expand Down Expand Up @@ -471,25 +473,27 @@ extension HTTPConnectionPool {
/// lease `count` streams after connections establishment
/// - Parameters:
/// - index: index of the connection you got by calling `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)`
/// - count: number of streams you want to lease. You get the current available streams from the `AvailableConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns
/// - count: number of streams you want to lease. You get the current available streams from the `EstablishedConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns
/// - Returns: connection to execute `count` requests on
/// - precondition: `index` needs to be valid. `count` must be greater than or equal to *0* and not execeed the number of available streams.
/// - precondition: `index` needs to be valid. `count` must be greater than or equal to *1* and not exceed the number of available streams.
mutating func leaseStreams(at index: Int, count: Int) -> (Connection, LeasedStreamContext) {
precondition(count >= 1, "stream lease count must be greater than or equal to 1")
let isIdle = self.connections[index].isIdle
let connection = self.connections[index].lease(count)
let context = LeasedStreamContext(wasIdle: isIdle)
return (connection, context)
}

mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, AvailableConnectionContext) {
mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, EstablishedConnectionContext) {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to release a connection we do not know anything about")
}
let availableStreams = self.connections[index].release()
let context = AvailableConnectionContext(
let context = EstablishedConnectionContext(
availableStreams: availableStreams,
eventLoop: self.connections[index].eventLoop,
isIdle: self.connections[index].isIdle
isIdle: self.connections[index].isIdle,
connectionID: connectionID
)
return (index, context)
}
Expand Down Expand Up @@ -567,14 +571,16 @@ extension HTTPConnectionPool {

// MARK: Result structs

/// Information around an available connection
struct AvailableConnectionContext {
/// Information around a connection which is either in the .active or .draining state.
struct EstablishedConnectionContext {
/// number of streams which can be leased
var availableStreams: Int
/// The eventLoop the connection is running on.
var eventLoop: EventLoop
/// true if no stream is leased
var isIdle: Bool
/// id of the connection
var connectionID: Connection.ID
}

struct LeasedStreamContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ extension HTTPConnectionPool {

private mutating func nextActionForAvailableConnection(
at index: Int,
context: HTTP2Connections.AvailableConnectionContext
context: HTTP2Connections.EstablishedConnectionContext
) -> Action {
switch self.state {
case .running:
Expand All @@ -196,19 +196,22 @@ extension HTTPConnectionPool {
let remainingAvailableStreams = context.availableStreams - requestsToExecute.count
// use the remaining available streams for requests without a required event loop
requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil)
let (connection, _) = self.connections.leaseStreams(at: index, count: requestsToExecute.count)

let requestAction = { () -> RequestAction in
if requestsToExecute.isEmpty {
return .none
} else {
// we can only lease streams if the connection has available streams.
// Otherwise we might crash even if we try to lease zero streams,
// because the connection might already be in the draining state.
let (connection, _) = self.connections.leaseStreams(at: index, count: requestsToExecute.count)
return .executeRequestsAndCancelTimeouts(requestsToExecute, connection)
}
}()

let connectionAction = { () -> ConnectionAction in
if context.isIdle, requestsToExecute.isEmpty {
return .scheduleTimeoutTimer(connection.id, on: context.eventLoop)
return .scheduleTimeoutTimer(context.connectionID, on: context.eventLoop)
} else {
return .none
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ extension HTTPConnectionPool_HTTP2StateMachineTests {
("testSchedulingAndCancelingOfIdleTimeout", testSchedulingAndCancelingOfIdleTimeout),
("testConnectionTimeout", testConnectionTimeout),
("testConnectionEstablishmentFailure", testConnectionEstablishmentFailure),
("testGoAwayOnIdleConnection", testGoAwayOnIdleConnection),
("testGoAwayWithLeasedStream", testGoAwayWithLeasedStream),
("testGoAwayWithPendingRequestsStartsNewConnection", testGoAwayWithPendingRequestsStartsNewConnection),
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -435,4 +435,122 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
}
XCTAssertEqual(eventLoop.id, el1.id)
}

func testGoAwayOnIdleConnection() {
let elg = EmbeddedEventLoopGroup(loops: 1)
let el1 = elg.next()

// establish one idle http2 connection
let idGenerator = HTTPConnectionPool.Connection.ID.Generator()
var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator)
let conn1ID = http1Conns.createNewConnection(on: el1)
var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator)
let migrationAction = state.migrateConnectionsFromHTTP1(
connections: http1Conns,
requests: HTTPConnectionPool.RequestQueue()
)
XCTAssertEqual(migrationAction, .none)
let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1)
let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100)
XCTAssertEqual(connectAction.request, .none)
XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1))

let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID)
XCTAssertEqual(goAwayAction.request, .none)
XCTAssertEqual(goAwayAction.connection, .none, "Connection is automatically closed by HTTP2IdleHandler")
}

func testGoAwayWithLeasedStream() {
let elg = EmbeddedEventLoopGroup(loops: 1)
let el1 = elg.next()

// establish one idle http2 connection
let idGenerator = HTTPConnectionPool.Connection.ID.Generator()
var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator)
let conn1ID = http1Conns.createNewConnection(on: el1)
var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator)
let migrationAction = state.migrateConnectionsFromHTTP1(
connections: http1Conns,
requests: HTTPConnectionPool.RequestQueue()
)
XCTAssertEqual(migrationAction, .none)
let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1)
let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100)
XCTAssertEqual(connectAction.request, .none)
XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1))

// execute request on idle connection
let mockRequest1 = MockHTTPRequest(eventLoop: el1)
let request1 = HTTPConnectionPool.Request(mockRequest1)
let request1Action = state.executeRequest(request1)
XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false))
XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID))

let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID)
XCTAssertEqual(goAwayAction.request, .none)
XCTAssertEqual(goAwayAction.connection, .none)

// close stream
let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID)
XCTAssertEqual(closeStream1Action.request, .none)
XCTAssertEqual(closeStream1Action.connection, .none, "Connection is automatically closed by HTTP2IdleHandler")
}

func testGoAwayWithPendingRequestsStartsNewConnection() {
let elg = EmbeddedEventLoopGroup(loops: 1)
let el1 = elg.next()

// establish one idle http2 connection
let idGenerator = HTTPConnectionPool.Connection.ID.Generator()
var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator)
let conn1ID = http1Conns.createNewConnection(on: el1)
var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator)
let migrationAction = state.migrateConnectionsFromHTTP1(
connections: http1Conns,
requests: HTTPConnectionPool.RequestQueue()
)
XCTAssertEqual(migrationAction, .none)
let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1)
let connectAction1 = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1)
XCTAssertEqual(connectAction1.request, .none)
XCTAssertEqual(connectAction1.connection, .scheduleTimeoutTimer(conn1ID, on: el1))

// execute request
let mockRequest1 = MockHTTPRequest(eventLoop: el1)
let request1 = HTTPConnectionPool.Request(mockRequest1)
let request1Action = state.executeRequest(request1)
XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false))
XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID))

// queue request
let mockRequest2 = MockHTTPRequest(eventLoop: el1)
let request2 = HTTPConnectionPool.Request(mockRequest2)
let request2Action = state.executeRequest(request2)
XCTAssertEqual(request2Action.request, .scheduleRequestTimeout(for: request2, on: el1))
XCTAssertEqual(request2Action.connection, .none)

// go away should create a new connection
let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID)
XCTAssertEqual(goAwayAction.request, .none)
guard case .createConnection(let conn2ID, let eventLoop) = goAwayAction.connection else {
return XCTFail("unexpected connection action \(goAwayAction.connection)")
}
XCTAssertEqual(el1.id, eventLoop.id)

// new connection should execute pending request
let conn2 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn2ID, eventLoop: el1)
let connectAction2 = state.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 1)
XCTAssertEqual(connectAction2.request, .executeRequestsAndCancelTimeouts([request2], conn2))
XCTAssertEqual(connectAction2.connection, .none)

// close stream from conn1
let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID)
XCTAssertEqual(closeStream1Action.request, .none)
XCTAssertEqual(closeStream1Action.connection, .none, "Connection is automatically closed by HTTP2IdleHandler")

// close stream from conn2
let closeStream2Action = state.http2ConnectionStreamClosed(conn2ID)
XCTAssertEqual(closeStream2Action.request, .none)
XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn2ID, on: el1))
}
}