From aaac06f972ddc2258493d9590d3cb83c101eec55 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 5 Oct 2021 11:27:27 +0200 Subject: [PATCH] test and fix go away in HTTP2StateMachine --- .../HTTPConnectionPool+HTTP2Connections.swift | 36 +++--- ...HTTPConnectionPool+HTTP2StateMachine.swift | 9 +- ...onPool+HTTP2StateMachineTests+XCTest.swift | 3 + ...onnectionPool+HTTP2StateMachineTests.swift | 118 ++++++++++++++++++ 4 files changed, 148 insertions(+), 18 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 6e82773ab..28b9517b6 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -371,19 +371,20 @@ extension HTTPConnectionPool { /// This will put the connection into the idle state. /// /// - Parameter connection: The new established connection. - /// - Returns: An index and an ``AvailableConnectionContext`` to determine the next action for the now idle connection. + /// - Returns: An index and an ``EstablishedConnectionContext`` to determine the next action for the now idle connection. /// Call ``leaseStreams(at:count:)`` or ``closeConnection(at:)`` with the supplied index after /// this. - mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> (Int, AvailableConnectionContext) { + mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> (Int, EstablishedConnectionContext) { guard let index = self.connections.firstIndex(where: { $0.connectionID == connection.id }) else { preconditionFailure("There is a new connection that we didn't request!") } precondition(connection.eventLoop === self.connections[index].eventLoop, "Expected the new connection to be on EL") let availableStreams = self.connections[index].connected(connection, maxStreams: maxConcurrentStreams) - let context = AvailableConnectionContext( + let context = EstablishedConnectionContext( availableStreams: availableStreams, eventLoop: connection.eventLoop, - isIdle: self.connections[index].isIdle + isIdle: self.connections[index].isIdle, + connectionID: connection.id ) return (index, context) } @@ -419,20 +420,21 @@ extension HTTPConnectionPool { /// - Parameters: /// - connectionID: The connectionID for which we received new settings /// - newMaxStreams: new maximum concurrent streams - /// - Returns: index of the connection and new number of available streams in the `AvailableConnectionContext` + /// - Returns: index of the connection and new number of available streams in the `EstablishedConnectionContext` /// - Precondition: Connections must be in the `.active` or `.draining` state. mutating func newHTTP2MaxConcurrentStreamsReceived( _ connectionID: Connection.ID, newMaxStreams: Int - ) -> (Int, AvailableConnectionContext) { + ) -> (Int, EstablishedConnectionContext) { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { preconditionFailure("We tried to update the maximum number of concurrent streams for a connection that does not exists") } let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams) - let context = AvailableConnectionContext( + let context = EstablishedConnectionContext( availableStreams: availableStreams, eventLoop: self.connections[index].eventLoop, - isIdle: self.connections[index].isIdle + isIdle: self.connections[index].isIdle, + connectionID: connectionID ) return (index, context) } @@ -471,25 +473,27 @@ extension HTTPConnectionPool { /// lease `count` streams after connections establishment /// - Parameters: /// - index: index of the connection you got by calling `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` - /// - count: number of streams you want to lease. You get the current available streams from the `AvailableConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns + /// - count: number of streams you want to lease. You get the current available streams from the `EstablishedConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns /// - Returns: connection to execute `count` requests on - /// - precondition: `index` needs to be valid. `count` must be greater than or equal to *0* and not execeed the number of available streams. + /// - precondition: `index` needs to be valid. `count` must be greater than or equal to *1* and not exceed the number of available streams. mutating func leaseStreams(at index: Int, count: Int) -> (Connection, LeasedStreamContext) { + precondition(count >= 1, "stream lease count must be greater than or equal to 1") let isIdle = self.connections[index].isIdle let connection = self.connections[index].lease(count) let context = LeasedStreamContext(wasIdle: isIdle) return (connection, context) } - mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, AvailableConnectionContext) { + mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, EstablishedConnectionContext) { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { preconditionFailure("We tried to release a connection we do not know anything about") } let availableStreams = self.connections[index].release() - let context = AvailableConnectionContext( + let context = EstablishedConnectionContext( availableStreams: availableStreams, eventLoop: self.connections[index].eventLoop, - isIdle: self.connections[index].isIdle + isIdle: self.connections[index].isIdle, + connectionID: connectionID ) return (index, context) } @@ -567,14 +571,16 @@ extension HTTPConnectionPool { // MARK: Result structs - /// Information around an available connection - struct AvailableConnectionContext { + /// Information around a connection which is either in the .active or .draining state. + struct EstablishedConnectionContext { /// number of streams which can be leased var availableStreams: Int /// The eventLoop the connection is running on. var eventLoop: EventLoop /// true if no stream is leased var isIdle: Bool + /// id of the connection + var connectionID: Connection.ID } struct LeasedStreamContext { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 5a79f93c5..355c1e7be 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -184,7 +184,7 @@ extension HTTPConnectionPool { private mutating func nextActionForAvailableConnection( at index: Int, - context: HTTP2Connections.AvailableConnectionContext + context: HTTP2Connections.EstablishedConnectionContext ) -> Action { switch self.state { case .running: @@ -196,19 +196,22 @@ extension HTTPConnectionPool { let remainingAvailableStreams = context.availableStreams - requestsToExecute.count // use the remaining available streams for requests without a required event loop requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil) - let (connection, _) = self.connections.leaseStreams(at: index, count: requestsToExecute.count) let requestAction = { () -> RequestAction in if requestsToExecute.isEmpty { return .none } else { + // we can only lease streams if the connection has available streams. + // Otherwise we might crash even if we try to lease zero streams, + // because the connection might already be in the draining state. + let (connection, _) = self.connections.leaseStreams(at: index, count: requestsToExecute.count) return .executeRequestsAndCancelTimeouts(requestsToExecute, connection) } }() let connectionAction = { () -> ConnectionAction in if context.isIdle, requestsToExecute.isEmpty { - return .scheduleTimeoutTimer(connection.id, on: context.eventLoop) + return .scheduleTimeoutTimer(context.connectionID, on: context.eventLoop) } else { return .none } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index de1ca4667..a54d8c578 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -33,6 +33,9 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testSchedulingAndCancelingOfIdleTimeout", testSchedulingAndCancelingOfIdleTimeout), ("testConnectionTimeout", testConnectionTimeout), ("testConnectionEstablishmentFailure", testConnectionEstablishmentFailure), + ("testGoAwayOnIdleConnection", testGoAwayOnIdleConnection), + ("testGoAwayWithLeasedStream", testGoAwayWithLeasedStream), + ("testGoAwayWithPendingRequestsStartsNewConnection", testGoAwayWithPendingRequestsStartsNewConnection), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index bbef727e0..5bcc5fb02 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -435,4 +435,122 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } XCTAssertEqual(eventLoop.id, el1.id) } + + func testGoAwayOnIdleConnection() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) + XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID) + XCTAssertEqual(goAwayAction.request, .none) + XCTAssertEqual(goAwayAction.connection, .none, "Connection is automatically closed by HTTP2IdleHandler") + } + + func testGoAwayWithLeasedStream() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) + XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + // execute request on idle connection + let mockRequest1 = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest1) + let request1Action = state.executeRequest(request1) + XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) + XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) + + let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID) + XCTAssertEqual(goAwayAction.request, .none) + XCTAssertEqual(goAwayAction.connection, .none) + + // close stream + let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(closeStream1Action.request, .none) + XCTAssertEqual(closeStream1Action.connection, .none, "Connection is automatically closed by HTTP2IdleHandler") + } + + func testGoAwayWithPendingRequestsStartsNewConnection() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction1 = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1) + XCTAssertEqual(connectAction1.request, .none) + XCTAssertEqual(connectAction1.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + // execute request + let mockRequest1 = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest1) + let request1Action = state.executeRequest(request1) + XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) + XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) + + // queue request + let mockRequest2 = MockHTTPRequest(eventLoop: el1) + let request2 = HTTPConnectionPool.Request(mockRequest2) + let request2Action = state.executeRequest(request2) + XCTAssertEqual(request2Action.request, .scheduleRequestTimeout(for: request2, on: el1)) + XCTAssertEqual(request2Action.connection, .none) + + // go away should create a new connection + let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID) + XCTAssertEqual(goAwayAction.request, .none) + guard case .createConnection(let conn2ID, let eventLoop) = goAwayAction.connection else { + return XCTFail("unexpected connection action \(goAwayAction.connection)") + } + XCTAssertEqual(el1.id, eventLoop.id) + + // new connection should execute pending request + let conn2 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn2ID, eventLoop: el1) + let connectAction2 = state.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 1) + XCTAssertEqual(connectAction2.request, .executeRequestsAndCancelTimeouts([request2], conn2)) + XCTAssertEqual(connectAction2.connection, .none) + + // close stream from conn1 + let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(closeStream1Action.request, .none) + XCTAssertEqual(closeStream1Action.connection, .none, "Connection is automatically closed by HTTP2IdleHandler") + + // close stream from conn2 + let closeStream2Action = state.http2ConnectionStreamClosed(conn2ID) + XCTAssertEqual(closeStream2Action.request, .none) + XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn2ID, on: el1)) + } }