From 150206e7428be6dc46be12bdfe806144d418cf49 Mon Sep 17 00:00:00 2001
From: George Barnett
Date: Thu, 20 Oct 2022 16:52:28 +0100
Subject: [PATCH 1/7] Add a connection pool delegate

Motivation:

It can be useful to observe what the underlying connection pool is doing over
time. It could, for example, be instrumented to understand how many streams are
active at a given time, or to understand when a connection is in the process of
being brought up.

Modifications:

Add new API:

- `GRPCConnectionPoolDelegate` which users can implement and configure on the
  `GRPCChannelPool` to receive notifications about the connection pool state.
- `GRPCConnectionID`, an opaque ID to distinguish between different connections
  in the pool.

Modify the connection pool:

- Per-EventLoop pools now hold on to quiescing connections and track additional
  state, including whether a connection is quiescing and how many streams are
  currently open on a connection. By contrast, the pool manager (which manages
  a number of these per-EventLoop pools) tracks reserved streams (which are not
  necessarily open). The delegate tracks _opened_ streams rather than
  _reserved_ streams (as reserved streams are not allocated to a specific
  connection but to any connection running on an appropriate event loop).
- Wire through the new delegate and call out to it in appropriate places.
- Add a stream-opened function to the internal H2 delegate.
- Expose various pieces of state from the connection manager.

Result:

Users can instrument the underlying connection pool.
---
 .../GRPC/ConnectionManager+Delegates.swift    |   6 +
 Sources/GRPC/ConnectionManager.swift          |  38 ++
 .../ConnectionPool/ConnectionManagerID.swift  |   2 +-
 .../ConnectionPool+PerConnectionState.swift   |  52 ++-
 .../GRPC/ConnectionPool/ConnectionPool.swift  | 147 +++++++-
 .../GRPC/ConnectionPool/GRPCChannelPool.swift |  59 +++
 Sources/GRPC/ConnectionPool/PoolManager.swift |   8 +-
 .../GRPC/ConnectionPool/PooledChannel.swift   |   3 +-
 Sources/GRPC/GRPCIdleHandler.swift            |   1 +
 Tests/GRPCTests/ConnectionManagerTests.swift  |   6 +
 .../ConnectionPool/ConnectionPoolTests.swift  | 144 +++++++-
 .../ConnectionPool/GRPCChannelPoolTests.swift | 335 +++++++++++++++++-
 .../PoolManagerStateMachineTests.swift        |   1 +
 13 files changed, 767 insertions(+), 35 deletions(-)

diff --git a/Sources/GRPC/ConnectionManager+Delegates.swift b/Sources/GRPC/ConnectionManager+Delegates.swift
index c05b51f3e..35d5a870e 100644
--- a/Sources/GRPC/ConnectionManager+Delegates.swift
+++ b/Sources/GRPC/ConnectionManager+Delegates.swift
@@ -35,6 +35,12 @@ internal protocol ConnectionManagerConnectivityDelegate {
 }
 
 internal protocol ConnectionManagerHTTP2Delegate {
+  /// An HTTP/2 stream was opened.
+  ///
+  /// - Parameters:
+  ///   - connectionManager: The connection manager reporting the opened stream.
+  func streamOpened(_ connectionManager: ConnectionManager)
+
   /// An HTTP/2 stream was closed.
   ///
   /// - Parameters:
diff --git a/Sources/GRPC/ConnectionManager.swift b/Sources/GRPC/ConnectionManager.swift
index bb7ef471a..931218017 100644
--- a/Sources/GRPC/ConnectionManager.swift
+++ b/Sources/GRPC/ConnectionManager.swift
@@ -242,6 +242,17 @@ internal final class ConnectionManager {
     }
   }
 
+  /// Returns whether the state is 'shutdown'.
+  private var isShutdown: Bool {
+    self.eventLoop.assertInEventLoop()
+    switch self.state {
+    case .shutdown:
+      return true
+    case .idle, .connecting, .transientFailure, .active, .ready:
+      return false
+    }
+  }
+
   /// Returns the `HTTP2StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
   private var multiplexer: HTTP2StreamMultiplexer? {
     self.eventLoop.assertInEventLoop()
@@ -254,6 +265,17 @@ internal final class ConnectionManager {
     }
   }
 
+  private var channel: Channel? {
+    self.eventLoop.assertInEventLoop()
+    switch self.state {
+    case let .ready(state):
+      return state.channel
+
+    case .idle, .connecting, .transientFailure, .active, .shutdown:
+      return nil
+    }
+  }
+
   /// The `EventLoop` that the managed connection will run on.
   internal let eventLoop: EventLoop
 
@@ -807,6 +829,11 @@ internal final class ConnectionManager {
     }
   }
 
+  internal func streamOpened() {
+    self.eventLoop.assertInEventLoop()
+    self.http2Delegate?.streamOpened(self)
+  }
+
   internal func streamClosed() {
     self.eventLoop.assertInEventLoop()
     self.http2Delegate?.streamClosed(self)
@@ -1001,12 +1028,23 @@ extension ConnectionManager {
       return self.manager.isIdle
     }
 
+    /// Returns `true` if the connection is in the shutdown state.
+    internal var isShutdown: Bool {
+      return self.manager.isShutdown
+    }
+
     /// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is any
     /// other state.
     internal var multiplexer: HTTP2StreamMultiplexer? {
       return self.manager.multiplexer
    }
 
+    /// Returns the `channel` from a connection in the `ready` state or `nil` if it is any
+    /// other state.
+    internal var channel: Channel? {
+      return self.manager.channel
+    }
+
    // Start establishing a connection. Must only be called when `isIdle` is `true`.
    internal func startConnecting() {
      self.manager.startConnecting()
diff --git a/Sources/GRPC/ConnectionPool/ConnectionManagerID.swift b/Sources/GRPC/ConnectionPool/ConnectionManagerID.swift
index f5c787000..15f87a0e9 100644
--- a/Sources/GRPC/ConnectionPool/ConnectionManagerID.swift
+++ b/Sources/GRPC/ConnectionPool/ConnectionManagerID.swift
@@ -15,7 +15,7 @@
  */
 
 @usableFromInline
-internal struct ConnectionManagerID: Hashable, CustomStringConvertible {
+internal struct ConnectionManagerID: Hashable, CustomStringConvertible, GRPCSendable {
   @usableFromInline
   internal let _id: ObjectIdentifier
 
diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift b/Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift
index c9be9eddb..02d182919 100644
--- a/Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift
+++ b/Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift
@@ -26,8 +26,32 @@ extension ConnectionPool {
     @usableFromInline
     internal var _availability: StreamAvailability?
 
+    @usableFromInline
+    internal var isQuiescing: Bool {
+      get {
+        return self._availability?.isQuiescing ?? false
+      }
+      set {
+        self._availability?.isQuiescing = newValue
+      }
+    }
+
     @usableFromInline
     internal struct StreamAvailability {
+      @usableFromInline
+      struct Utilization {
+        @usableFromInline
+        var used: Int
+        @usableFromInline
+        var capacity: Int
+
+        @usableFromInline
+        init(used: Int, capacity: Int) {
+          self.used = used
+          self.capacity = capacity
+        }
+      }
+
       @usableFromInline
       var multiplexer: HTTP2StreamMultiplexer
       /// Maximum number of available streams.
       @usableFromInline
       var maxAvailable: Int
       /// Number of streams reserved.
       @usableFromInline
       var reserved: Int = 0
+      /// Number of streams opened.
+      @usableFromInline
+      var open: Int = 0
+      @usableFromInline
+      var isQuiescing = false
 
       /// Number of available streams.
       @usableFromInline
       var available: Int {
-        return self.maxAvailable - self.reserved
+        return self.isQuiescing ? 0 : self.maxAvailable - self.reserved
       }
 
       /// Increment the reserved streams and return the multiplexer.
@usableFromInline mutating func reserve() -> HTTP2StreamMultiplexer { + assert(!self.isQuiescing) self.reserved += 1 return self.multiplexer } + @usableFromInline + mutating func opened() -> Utilization { + self.open += 1 + return .init(used: self.open, capacity: self.maxAvailable) + } + /// Decrement the reserved streams by one. @usableFromInline - mutating func `return`() { + mutating func `return`() -> Utilization { self.reserved -= 1 + self.open -= 1 assert(self.reserved >= 0) + assert(self.open >= 0) + return .init(used: self.open, capacity: self.maxAvailable) } } @@ -92,10 +131,15 @@ extension ConnectionPool { return self._availability?.reserve() } + @usableFromInline + internal mutating func openedStream() -> PerConnectionState.StreamAvailability.Utilization? { + return self._availability?.opened() + } + /// Return a reserved stream to the connection. @usableFromInline - internal mutating func returnStream() { - self._availability?.return() + internal mutating func returnStream() -> PerConnectionState.StreamAvailability.Utilization? { + return self._availability?.return() } /// Update the maximum concurrent streams available on the connection, marking it as available diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift index 0056500da..369bd8a53 100644 --- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift +++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift @@ -98,6 +98,9 @@ internal final class ConnectionPool { @usableFromInline internal let streamLender: StreamLender + @usableFromInline + internal var delegate: GRPCConnectionPoolDelegate? + /// A logger which always sets "GRPC" as its source. @usableFromInline internal let logger: GRPCLogger @@ -147,6 +150,7 @@ internal final class ConnectionPool { connectionBackoff: ConnectionBackoff, channelProvider: ConnectionManagerChannelProvider, streamLender: StreamLender, + delegate: GRPCConnectionPoolDelegate?, logger: GRPCLogger, now: @escaping () -> NIODeadline = NIODeadline.now ) { @@ -165,6 +169,7 @@ internal final class ConnectionPool { self.connectionBackoff = connectionBackoff self.channelProvider = channelProvider self.streamLender = streamLender + self.delegate = delegate self.logger = logger self.now = now } @@ -191,7 +196,9 @@ internal final class ConnectionPool { http2Delegate: self, logger: self.logger.unwrapped ) - self._connections[manager.id] = PerConnectionState(manager: manager) + let id = manager.id + self._connections[id] = PerConnectionState(manager: manager) + self.delegate?.connectionAdded(id: .init(id)) } // MARK: - Called from the pool manager @@ -397,8 +404,8 @@ internal final class ConnectionPool { // TODO: make this cheaper by storing and incrementally updating the number of idle connections let capacity = self._connections.values.reduce(0) { sum, state in - if state.manager.sync.isIdle { - // Idle connection, no capacity. + if state.manager.sync.isIdle || state.isQuiescing { + // Idle connection or quiescing (so the capacity should be ignored). return sum } else if let knownMaxAvailableStreams = state.maxAvailableStreams { // A known value of max concurrent streams, i.e. the connection is active. 
@@ -502,6 +509,7 @@ internal final class ConnectionPool { self._state = .shuttingDown(promise.futureResult) promise.futureResult.whenComplete { _ in self._state = .shutdown + self.delegate = nil self.logger.trace("finished shutting down connection pool") } @@ -510,7 +518,21 @@ internal final class ConnectionPool { self._connections.removeAll() let allShutdown = connections.values.map { - $0.manager.shutdown(mode: mode) + let id = $0.manager.id + let manager = $0.manager + + return manager.eventLoop.flatSubmit { + // If the connection was idle/shutdown before calling shutdown then we shouldn't tell + // the delegate the connection closed (because it either never connected or was already + // informed about this). + let connectionIsInactive = manager.sync.isIdle || manager.sync.isShutdown + return manager.shutdown(mode: mode).always { _ in + if !connectionIsInactive { + self.delegate?.connectionClosed(id: .init(id), error: nil) + } + self.delegate?.connectionRemoved(id: .init(id)) + } + } } // Fail the outstanding waiters. @@ -564,27 +586,73 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate { default: () } + + guard let delegate = self.delegate else { return } + + switch (oldState, newState) { + case (.idle, .connecting), + (.transientFailure, .connecting): + delegate.startedConnecting(id: .init(manager.id)) + + case (.connecting, .ready): + // The connection becoming ready is handled by 'receivedSettingsMaxConcurrentStreams'. + () + + case (.ready, .idle): + delegate.connectionClosed(id: .init(manager.id), error: nil) + + case let (.ready, .transientFailure(error)): + delegate.connectionClosed(id: .init(manager.id), error: error) + + case let (.connecting, .transientFailure(error)): + delegate.connectFailed(id: .init(manager.id), error: error) + + default: + () + } } func connectionIsQuiescing(_ manager: ConnectionManager) { self.eventLoop.assertInEventLoop() - guard let removed = self._connections.removeValue(forKey: manager.id) else { - return + + // Find the relevant connection, mark it as quiescing and drop the connectivity delegate as + // these events are no longer relevant. + guard let index = self._connections.index(forKey: manager.id) else { return } + self._connections.values[index].isQuiescing = true + self._connections.values[index].manager.sync.connectivityDelegate = nil + + self.delegate?.connectionQuiescing(id: .init(manager.id)) + + // If there's a close future then the underlying channel is open. It will close eventually when + // open streams are closed, so drop the H2 delegate and update the pool delegate when that + // happens. + if let closeFuture = self._connections.values[index].manager.sync.channel?.closeFuture { + closeFuture.whenComplete { _ in + guard let removed = self._connections.removeValue(forKey: manager.id) else { return } + removed.manager.sync.http2Delegate = nil + self.delegate?.connectionClosed(id: .init(removed.manager.id), error: nil) + self.delegate?.connectionRemoved(id: .init(removed.manager.id)) + } + } else { + // No close future, so no open channel. Remove the delegate and connection. + self._connections.values[index].manager.sync.http2Delegate = nil + self._connections.remove(at: index) + self.delegate?.connectionRemoved(id: .init(manager.id)) } - // Drop any delegates. We're no longer interested in these events. - removed.manager.sync.connectivityDelegate = nil - removed.manager.sync.http2Delegate = nil + // Grab the number of reserved streams (before invalidating the index by adding a connection). 
+    let reservedStreams = self._connections.values[index].reservedStreams
 
     // Replace the connection with a new idle one.
     self.addConnectionToPool()
 
-    // Since we're removing this connection from the pool, the pool manager can ignore any streams
-    // reserved against this connection.
+    // Since we're removing this connection from the pool (and no new streams can be created on
+    // the connection), the pool manager can ignore any streams reserved against this connection.
+    // We do still care about the number of reserved streams for the connection, though.
     //
-    // Note: we don't need to adjust the number of available streams as the number of connections
-    // hasn't changed.
-    self.streamLender.returnStreams(removed.reservedStreams, to: self)
+    // Note: we don't need to adjust the number of available streams as the effective number of
+    // connections hasn't changed.
+    self.streamLender.returnStreams(reservedStreams, to: self)
   }
 
   private func updateMostRecentError(_ error: Error) {
@@ -606,15 +674,43 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate {
 }
 
 extension ConnectionPool: ConnectionManagerHTTP2Delegate {
+  internal func streamOpened(_ manager: ConnectionManager) {
+    self.eventLoop.assertInEventLoop()
+    if let utilization = self._connections[manager.id]?.openedStream(),
+      let delegate = self.delegate {
+      delegate.connectionUtilizationChanged(
+        id: .init(manager.id),
+        streamsUsed: utilization.used,
+        streamCapacity: utilization.capacity
+      )
+    }
+  }
+
   internal func streamClosed(_ manager: ConnectionManager) {
     self.eventLoop.assertInEventLoop()
 
+    guard let index = self._connections.index(forKey: manager.id) else {
+      return
+    }
+
     // Return the stream to the connection and to the pool manager.
-    self._connections[manager.id]?.returnStream()
-    self.streamLender.returnStreams(1, to: self)
+    if let utilization = self._connections.values[index].returnStream(),
+      let delegate = self.delegate {
+      delegate.connectionUtilizationChanged(
+        id: .init(manager.id),
+        streamsUsed: utilization.used,
+        streamCapacity: utilization.capacity
+      )
+    }
 
-    // A stream was returned: we may be able to service a waiter now.
-    self.tryServiceWaiters()
+    // Don't return the stream to the pool manager if the connection is quiescing; they were
+    // returned when the connection started quiescing.
+    if !self._connections.values[index].isQuiescing {
+      self.streamLender.returnStreams(1, to: self)
+
+      // A stream was returned: we may be able to service a waiter now.
+      self.tryServiceWaiters()
+    }
   }
 
   internal func receivedSettingsMaxConcurrentStreams(
@@ -623,10 +719,21 @@ extension ConnectionPool: ConnectionManagerHTTP2Delegate {
   ) {
     self.eventLoop.assertInEventLoop()
 
+    // Find the relevant connection.
+    guard let index = self._connections.index(forKey: manager.id) else {
+      return
+    }
+
+    // When the connection is quiescing, the pool manager is not interested in updates to the
+    // connection, so bail out early.
+    if self._connections.values[index].isQuiescing {
+      return
+    }
+
     // If we received a SETTINGS update then a connection is okay: drop the last known error.
     self._mostRecentError = nil
 
-    let previous = self._connections[manager.id]?.updateMaxConcurrentStreams(maxConcurrentStreams)
+    let previous = self._connections.values[index].updateMaxConcurrentStreams(maxConcurrentStreams)
 
     let delta: Int
     if let previousValue = previous {
@@ -637,6 +744,8 @@ extension ConnectionPool: ConnectionManagerHTTP2Delegate {
       // There was no previous value so this must be a new connection. We'll compare against our
       // assumed default.
       delta = maxConcurrentStreams - self.assumedMaxConcurrentStreams
+      // Notify the delegate.
+      self.delegate?.connectSucceeded(id: .init(manager.id), streamCapacity: maxConcurrentStreams)
     }
 
     if delta != 0 {
diff --git a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift
index 32f5f9898..ea7a23c53 100644
--- a/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift
+++ b/Sources/GRPC/ConnectionPool/GRPCChannelPool.swift
@@ -177,6 +177,10 @@ extension GRPCChannelPool {
     /// An error delegate which is called when errors are caught.
     public var errorDelegate: ClientErrorDelegate?
 
+    /// A delegate which will be notified about changes to the state of connections managed by the
+    /// pool.
+    public var delegate: GRPCConnectionPoolDelegate?
+
     /// A logger used for background activity, such as connection state changes.
     public var backgroundActivityLogger = Logger(
       label: "io.grpc",
@@ -287,3 +291,58 @@ extension GRPCChannelPool.Configuration {
     public var reservationLoadThreshold: Double = 0.9
   }
 }
+
+/// The ID of a connection in the connection pool.
+public struct GRPCConnectionID: Hashable, GRPCSendable, CustomStringConvertible {
+  private let id: ConnectionManagerID
+
+  public var description: String {
+    return String(describing: self.id)
+  }
+
+  internal init(_ id: ConnectionManagerID) {
+    self.id = id
+  }
+}
+
+/// A delegate for the connection pool which is notified of various lifecycle events.
+///
+/// All functions must execute quickly and may be executed on arbitrary threads. The implementor is
+/// responsible for ensuring thread safety.
+public protocol GRPCConnectionPoolDelegate: GRPCSendable {
+  /// A new connection was created with the given ID and added to the pool. The connection is not
+  /// yet active (or connecting).
+  ///
+  /// In most cases ``startedConnecting(id:)`` will be the next function called for the given
+  /// connection but ``connectionRemoved(id:)`` may also be called.
+  func connectionAdded(id: GRPCConnectionID)
+
+  /// The connection with the given ID was removed from the pool.
+  func connectionRemoved(id: GRPCConnectionID)
+
+  /// The connection with the given ID has started trying to establish a connection. The outcome
+  /// of the connection will be reported as either ``connectSucceeded(id:streamCapacity:)`` or
+  /// ``connectFailed(id:error:)``.
+  func startedConnecting(id: GRPCConnectionID)
+
+  /// A connection attempt failed with the given error. After some period of
+  /// time ``startedConnecting(id:)`` may be called again.
+  func connectFailed(id: GRPCConnectionID, error: Error)
+
+  /// A connection was established on the connection with the given ID. `streamCapacity` streams
+  /// are available to use on the connection. The maximum number of available streams may change
+  /// over time and is reported via ``connectionUtilizationChanged(id:streamsUsed:streamCapacity:)``.
+  func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int)
+
+  /// The utilization of the connection changed; a stream may have been used or returned, or the
+  /// maximum number of concurrent streams available on the connection may have changed.
+  func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int)
+
+  /// The remote peer is quiescing the connection: no new streams will be created on it. The
+  /// connection will eventually be closed and removed from the pool.
+  func connectionQuiescing(id: GRPCConnectionID)
+
+  /// The connection was closed.
The connection may be established again in the future (notified + /// via ``startedConnecting(id:)``). + func connectionClosed(id: GRPCConnectionID, error: Error?) +} diff --git a/Sources/GRPC/ConnectionPool/PoolManager.swift b/Sources/GRPC/ConnectionPool/PoolManager.swift index 8bfca4a07..05bd33641 100644 --- a/Sources/GRPC/ConnectionPool/PoolManager.swift +++ b/Sources/GRPC/ConnectionPool/PoolManager.swift @@ -57,6 +57,9 @@ internal final class PoolManager { @usableFromInline var channelProvider: DefaultChannelProvider + @usableFromInline + var delegate: GRPCConnectionPoolDelegate? + @usableFromInline internal init( maxConnections: Int, @@ -64,7 +67,8 @@ internal final class PoolManager { loadThreshold: Double, assumedMaxConcurrentStreams: Int = 100, connectionBackoff: ConnectionBackoff, - channelProvider: DefaultChannelProvider + channelProvider: DefaultChannelProvider, + delegate: GRPCConnectionPoolDelegate? ) { self.maxConnections = maxConnections self.maxWaiters = maxWaiters @@ -72,6 +76,7 @@ internal final class PoolManager { self.assumedMaxConcurrentStreams = assumedMaxConcurrentStreams self.connectionBackoff = connectionBackoff self.channelProvider = channelProvider + self.delegate = delegate } } @@ -224,6 +229,7 @@ internal final class PoolManager { connectionBackoff: configuration.connectionBackoff, channelProvider: configuration.channelProvider, streamLender: self, + delegate: configuration.delegate, logger: logger ) } diff --git a/Sources/GRPC/ConnectionPool/PooledChannel.swift b/Sources/GRPC/ConnectionPool/PooledChannel.swift index d3a407178..999f87f6c 100644 --- a/Sources/GRPC/ConnectionPool/PooledChannel.swift +++ b/Sources/GRPC/ConnectionPool/PooledChannel.swift @@ -98,7 +98,8 @@ internal final class PooledChannel: GRPCChannel { loadThreshold: configuration.connectionPool.reservationLoadThreshold, assumedMaxConcurrentStreams: 100, connectionBackoff: configuration.connectionBackoff, - channelProvider: provider + channelProvider: provider, + delegate: configuration.delegate ), logger: configuration.backgroundActivityLogger.wrapped ) diff --git a/Sources/GRPC/GRPCIdleHandler.swift b/Sources/GRPC/GRPCIdleHandler.swift index 25ed79d44..b2b87e7af 100644 --- a/Sources/GRPC/GRPCIdleHandler.swift +++ b/Sources/GRPC/GRPCIdleHandler.swift @@ -239,6 +239,7 @@ internal final class GRPCIdleHandler: ChannelInboundHandler { if let created = event as? NIOHTTP2StreamCreatedEvent { self.perform(operations: self.stateMachine.streamCreated(withID: created.streamID)) self.handlePingAction(self.pingHandler.streamCreated()) + self.mode.connectionManager?.streamOpened() context.fireUserInboundEventTriggered(event) } else if let closed = event as? 
StreamClosedEvent {
       self.perform(operations: self.stateMachine.streamClosed(withID: closed.streamID))
diff --git a/Tests/GRPCTests/ConnectionManagerTests.swift b/Tests/GRPCTests/ConnectionManagerTests.swift
index 5b7ea1c03..ec1358c7e 100644
--- a/Tests/GRPCTests/ConnectionManagerTests.swift
+++ b/Tests/GRPCTests/ConnectionManagerTests.swift
@@ -1034,9 +1034,14 @@ extension ConnectionManagerTests {
     )
 
     class HTTP2Delegate: ConnectionManagerHTTP2Delegate {
+      var streamsOpened = 0
       var streamsClosed = 0
       var maxConcurrentStreams = 0
 
+      func streamOpened(_ connectionManager: ConnectionManager) {
+        self.streamsOpened += 1
+      }
+
       func streamClosed(_ connectionManager: ConnectionManager) {
         self.streamsClosed += 1
       }
@@ -1118,6 +1123,7 @@ extension ConnectionManagerTests {
       channel.pipeline.fireUserInboundEventTriggered(streamClosed)
     }
 
+    XCTAssertEqual(http2.streamsOpened, 4)
     XCTAssertEqual(http2.streamsClosed, 4)
   }
 
diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift
index a84bb2e36..df39c1899 100644
--- a/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift
+++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift
@@ -53,6 +53,7 @@ final class ConnectionPoolTests: GRPCTestCase {
     reservationLoadThreshold: Double = 0.9,
     now: @escaping () -> NIODeadline = { .now() },
     connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
+    delegate: GRPCConnectionPoolDelegate? = nil,
     onReservationReturned: @escaping (Int) -> Void = { _ in },
     onMaximumReservationsChange: @escaping (Int) -> Void = { _ in },
     channelProvider: ConnectionManagerChannelProvider
@@ -68,6 +69,7 @@ final class ConnectionPoolTests: GRPCTestCase {
         onReturnStreams: onReservationReturned,
         onUpdateMaxAvailableStreams: onMaximumReservationsChange
       ),
+      delegate: delegate,
       logger: self.logger.wrapped,
       now: now
     )
@@ -75,10 +77,12 @@ final class ConnectionPoolTests: GRPCTestCase {
 
   private func makePool(
     waiters: Int = 1000,
+    delegate: GRPCConnectionPoolDelegate? = nil,
     makeChannel: @escaping (ConnectionManager, EventLoop) -> EventLoopFuture<Channel>
   ) -> ConnectionPool {
     return self.makePool(
       waiters: waiters,
+      delegate: delegate,
       channelProvider: HookedChannelProvider(makeChannel)
     )
   }
@@ -88,6 +92,7 @@ final class ConnectionPoolTests: GRPCTestCase {
     reservationLoadThreshold: Double = 0.9,
     now: @escaping () -> NIODeadline = { .now() },
     connectionBackoff: ConnectionBackoff = ConnectionBackoff(),
+    delegate: GRPCConnectionPoolDelegate? = nil,
     onReservationReturned: @escaping (Int) -> Void = { _ in },
     onMaximumReservationsChange: @escaping (Int) -> Void = { _ in }
   ) -> (ConnectionPool, ChannelController) {
       waiters: waiters,
       reservationLoadThreshold: reservationLoadThreshold,
       now: now,
       connectionBackoff: connectionBackoff,
+      delegate: delegate,
       onReservationReturned: onReservationReturned,
       onMaximumReservationsChange: onMaximumReservationsChange,
       channelProvider: controller
@@ -127,7 +133,9 @@ final class ConnectionPoolTests: GRPCTestCase {
     XCTAssertEqual(pool.sync.availableStreams, 0)
     XCTAssertEqual(pool.sync.reservedStreams, 0)
 
-    XCTAssertNoThrow(try pool.shutdown().wait())
+    let shutdownFuture = pool.shutdown()
+    self.eventLoop.run()
+    XCTAssertNoThrow(try shutdownFuture.wait())
   }
 
   func testShutdownEmptyPool() {
@@ -600,7 +608,8 @@ final class ConnectionPoolTests: GRPCTestCase {
     // The quiescing connection had 1 stream reserved, it's now returned to the outer pool and we
     // have a new idle connection in place of the old one.
XCTAssertEqual(reservationsReturned, [1]) - XCTAssertEqual(pool.sync.reservedStreams, 0) + // The inner pool still knows about the reserved stream. + XCTAssertEqual(pool.sync.reservedStreams, 1) XCTAssertEqual(pool.sync.availableStreams, 0) XCTAssertEqual(pool.sync.idleConnections, 1) @@ -619,7 +628,8 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertNoThrow(try w2.wait()) controller.openStreamInChannel(atIndex: 1) - XCTAssertEqual(pool.sync.reservedStreams, 1) + // The stream on the quiescing connection is still reserved. + XCTAssertEqual(pool.sync.reservedStreams, 2) XCTAssertEqual(pool.sync.availableStreams, 99) // Return a stream for the _quiescing_ connection: nothing should change in the pool. @@ -865,6 +875,130 @@ final class ConnectionPoolTests: GRPCTestCase { XCTAssertThrowsError(try promise.futureResult.wait()) XCTAssertNil(waiter._scheduledTimeout) } + + func testConnectionPoolDelegate() throws { + let recorder = EventRecordingConnectionPoolDelegate() + let (pool, controller) = self.setUpPoolAndController(delegate: recorder) + pool.initialize(connections: 2) + + func assertConnectionAdded( + _ event: EventRecordingConnectionPoolDelegate.Event? + ) throws -> GRPCConnectionID { + let unwrappedEvent = try XCTUnwrap(event) + switch unwrappedEvent { + case let .connectionAdded(id): + return id + default: + throw EventRecordingConnectionPoolDelegate.UnexpectedEvent(unwrappedEvent) + } + } + + let connID1 = try assertConnectionAdded(recorder.popFirst()) + let connID2 = try assertConnectionAdded(recorder.popFirst()) + + let waiter = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + // Start creating the channel. + self.eventLoop.run() + + let startedConnecting = recorder.popFirst() + let firstConn: GRPCConnectionID + let secondConn: GRPCConnectionID + + if startedConnecting == .startedConnecting(connID1) { + firstConn = connID1 + secondConn = connID2 + } else if startedConnecting == .startedConnecting(connID2) { + firstConn = connID2 + secondConn = connID1 + } else { + return XCTFail("Unexpected event") + } + + // Connect the connection. + self.eventLoop.run() + controller.connectChannel(atIndex: 0) + controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: 10) + XCTAssertEqual(recorder.popFirst(), .connectSucceeded(firstConn, 10)) + + // Open a stream for the waiter. + controller.openStreamInChannel(atIndex: 0) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(firstConn, 1, 10)) + self.eventLoop.run() + XCTAssertNoThrow(try waiter.wait()) + + // Okay, more utilization! + for n in 2 ... 8 { + let w = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + + controller.openStreamInChannel(atIndex: 0) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(firstConn, n, 10)) + self.eventLoop.run() + XCTAssertNoThrow(try w.wait()) + } + + // The utilisation threshold before bringing up a new connection is 0.9; we have 8 open streams + // (out of 10) now so opening the next should trigger a connect on the other connection. + let w9 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + XCTAssertEqual(recorder.popFirst(), .startedConnecting(secondConn)) + + // Deal with the 9th stream. 
+ controller.openStreamInChannel(atIndex: 0) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(firstConn, 9, 10)) + self.eventLoop.run() + XCTAssertNoThrow(try w9.wait()) + + // Bring up the next connection. + controller.connectChannel(atIndex: 1) + controller.sendSettingsToChannel(atIndex: 1, maxConcurrentStreams: 10) + XCTAssertEqual(recorder.popFirst(), .connectSucceeded(secondConn, 10)) + + // The next stream should be on the new connection. + let w10 = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) { + $0.eventLoop.makeSucceededVoidFuture() + } + + // Deal with the 10th stream. + controller.openStreamInChannel(atIndex: 1) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(secondConn, 1, 10)) + self.eventLoop.run() + XCTAssertNoThrow(try w10.wait()) + + // Close the streams. + for i in 1 ... 9 { + controller.closeStreamInChannel(atIndex: 0) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(firstConn, 9 - i, 10)) + } + + controller.closeStreamInChannel(atIndex: 1) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(secondConn, 0, 10)) + + // Close the connections. + controller.fireChannelInactiveForChannel(atIndex: 0) + XCTAssertEqual(recorder.popFirst(), .connectionClosed(firstConn)) + controller.fireChannelInactiveForChannel(atIndex: 1) + XCTAssertEqual(recorder.popFirst(), .connectionClosed(secondConn)) + + // All conns are already closed. + let shutdownFuture = pool.shutdown() + self.eventLoop.run() + XCTAssertNoThrow(try shutdownFuture.wait()) + + // Two connections must be removed. + for _ in 0 ..< 2 { + if let event = recorder.popFirst() { + let id = event.id + XCTAssertEqual(event, .connectionRemoved(id)) + } else { + XCTFail("Expected .connectionRemoved") + } + } + } } extension ConnectionPool { @@ -1031,8 +1165,8 @@ internal struct HookedStreamLender: StreamLender { self.onReturnStreams(count) } - internal func changeStreamCapacity(by max: Int, for pool: ConnectionPool) { - self.onUpdateMaxAvailableStreams(max) + internal func changeStreamCapacity(by delta: Int, for: ConnectionPool) { + self.onUpdateMaxAvailableStreams(delta) } } diff --git a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift index 581db9a34..a2746cfc1 100644 --- a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift @@ -72,9 +72,9 @@ final class GRPCChannelPoolTests: GRPCTestCase { .withServiceProviders([EchoProvider()]) } - private func startServer(withTLS: Bool = false) { + private func startServer(withTLS: Bool = false, port: Int = 0) { self.server = try! self.makeServerBuilder(withTLS: withTLS) - .bind(host: "localhost", port: 0) + .bind(host: "localhost", port: port) .wait() } @@ -104,13 +104,18 @@ final class GRPCChannelPoolTests: GRPCTestCase { } } - private func setUpClientAndServer(withTLS tls: Bool) { - self.configureEventLoopGroup() + private func setUpClientAndServer( + withTLS tls: Bool, + threads: Int = System.coreCount, + _ configure: (inout GRPCChannelPool.Configuration) -> Void = { _ in } + ) { + self.configureEventLoopGroup(threads: threads) self.startServer(withTLS: tls) self.startChannel(withTLS: tls) { // We'll allow any number of waiters since we immediately fire off a bunch of RPCs and don't // want to bounce off the limit as we wait for a connection to come up. 
$0.connectionPool.maxWaitersPerEventLoop = .max + configure(&$0) } } @@ -435,6 +440,328 @@ final class GRPCChannelPoolTests: GRPCTestCase { XCTFail("Status message did not contain a possible cause: '\(status.message ?? "nil")'") } } + + func testConnectionPoolDelegateSingleConnection() throws { + let recorder = EventRecordingConnectionPoolDelegate() + self.setUpClientAndServer(withTLS: false, threads: 1) { + $0.delegate = recorder + } + + let warmup = self.echo.get(.with { $0.text = "" }) + XCTAssertNoThrow(try warmup.status.wait()) + + let id = try XCTUnwrap(recorder.first?.id) + XCTAssertEqual(recorder.popFirst(), .connectionAdded(id)) + XCTAssertEqual(recorder.popFirst(), .startedConnecting(id)) + XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 1, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 0, 100)) + + let rpcs = try (1 ... 10).map { i in + let rpc = self.echo.collect() + XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait()) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, i, 100)) + return rpc + } + + for (i, rpc) in rpcs.enumerated() { + XCTAssertNoThrow(try rpc.sendEnd().wait()) + XCTAssertNoThrow(try rpc.status.wait()) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 10 - (i + 1), 100)) + } + + XCTAssertNoThrow(try self.channel?.close().wait()) + XCTAssertEqual(recorder.popFirst(), .connectionClosed(id)) + XCTAssertEqual(recorder.popFirst(), .connectionRemoved(id)) + XCTAssert(recorder.isEmpty) + } + + func testConnectionPoolDelegateQuiescing() throws { + let recorder = EventRecordingConnectionPoolDelegate() + self.setUpClientAndServer(withTLS: false, threads: 1) { + $0.delegate = recorder + } + + XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait()) + let id1 = try XCTUnwrap(recorder.first?.id) + XCTAssertEqual(recorder.popFirst(), .connectionAdded(id1)) + XCTAssertEqual(recorder.popFirst(), .startedConnecting(id1)) + XCTAssertEqual(recorder.popFirst(), .connectSucceeded(id1, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100)) + XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 0, 100)) + + // Grab the port and shutdown the server gracefully. + let serverPort = try XCTUnwrap(self.serverPort) + XCTAssertNoThrow(try self.server?.initiateGracefulShutdown().wait()) + + // Start the server again on the same port. 
+    self.server = nil
+    self.startServer(withTLS: false, port: serverPort)
+
+    XCTAssertEqual(recorder.popFirst(), .connectionClosed(id1))
+  }
+
+  func testDelegateCanTellWhenFirstConnectionIsBeingEstablished() {
+    final class State {
+      private enum _State {
+        case idle
+        case connecting
+        case connected
+      }
+
+      private var state: _State = .idle
+      private let lock = NIOLock()
+
+      var isConnected: Bool {
+        return self.lock.withLock {
+          switch self.state {
+          case .connected:
+            return true
+          case .idle, .connecting:
+            return false
+          }
+        }
+      }
+
+      func startedConnecting() {
+        self.lock.withLock {
+          switch self.state {
+          case .idle:
+            self.state = .connecting
+          case .connecting, .connected:
+            XCTFail("Invalid state \(self.state) for \(#function)")
+          }
+        }
+      }
+
+      func connected() {
+        self.lock.withLock {
+          switch self.state {
+          case .connecting:
+            self.state = .connected
+          case .idle, .connected:
+            XCTFail("Invalid state \(self.state) for \(#function)")
+          }
+        }
+      }
+    }
+
+    let state = State()
+
+    self.setUpClientAndServer(withTLS: false, threads: 1) {
+      $0.delegate = IsConnectingDelegate { stateChange in
+        switch stateChange {
+        case .connecting:
+          state.startedConnecting()
+        case .connected:
+          state.connected()
+        }
+      }
+    }
+
+    XCTAssertFalse(state.isConnected)
+    let rpc = self.echo.get(.with { $0.text = "" })
+    XCTAssertNoThrow(try rpc.status.wait())
+    XCTAssertTrue(state.isConnected)
+
+    // We should be able to do a bunch of other RPCs without the state changing (we'll XCTFail if
+    // a state change happens).
+    let rpcs = (0 ..< 20).map { i in
+      let rpc = self.echo.get(.with { $0.text = "\(i)" })
+      return rpc.status
+    }
+    XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(rpcs, on: self.group.any()).wait())
+  }
 }
 
+final class IsConnectingDelegate: GRPCConnectionPoolDelegate {
+  private let lock = NIOLock()
+  private var connecting = Set<GRPCConnectionID>()
+  private var active = Set<GRPCConnectionID>()
+
+  enum StateNotifacation: Hashable, GRPCSendable {
+    case connecting
+    case connected
+  }
+
+  #if swift(>=5.6)
+  private let onStateChange: @Sendable (StateNotifacation) -> Void
+  #else
+  private let onStateChange: (StateNotifacation) -> Void
+  #endif
+
+  #if swift(>=5.6)
+  init(onStateChange: @escaping @Sendable (StateNotifacation) -> Void) {
+    self.onStateChange = onStateChange
+  }
+  #else
+  init(onStateChange: @escaping (StateNotifacation) -> Void) {
+    self.onStateChange = onStateChange
+  }
+  #endif
+
+  func startedConnecting(id: GRPCConnectionID) {
+    let didStartConnecting = self.lock.withLock {
+      let (inserted, _) = self.connecting.insert(id)
+      // Only interested in new connection attempts when there are no active connections.
+      return inserted && self.connecting.count == 1 && self.active.isEmpty
+    }
+
+    if didStartConnecting {
+      self.onStateChange(.connecting)
+    }
+  }
+
+  func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {
+    let didStopConnecting = self.lock.withLock {
+      let removed = self.connecting.remove(id) != nil
+      let (inserted, _) = self.active.insert(id)
+      return removed && inserted && self.active.count == 1
+    }
+
+    if didStopConnecting {
+      self.onStateChange(.connected)
+    }
+  }
+
+  func connectionClosed(id: GRPCConnectionID, error: Error?) {
+    self.lock.withLock {
+      self.active.remove(id)
+      self.connecting.remove(id)
+    }
+  }
+
+  func connectionQuiescing(id: GRPCConnectionID) {
+    self.lock.withLock {
+      _ = self.active.remove(id)
+    }
+  }
+
+  // No-op.
+  func connectionAdded(id: GRPCConnectionID) {}
+
+  // No-op.
+  func connectionRemoved(id: GRPCConnectionID) {}
+
+  // Connection failures put the connection into a backing-off state; we consider that to still
+  // be 'connecting' at this point.
+  func connectFailed(id: GRPCConnectionID, error: Error) {}
+
+  // No-op.
+  func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {}
+}
+
+#if swift(>=5.6)
+extension IsConnectingDelegate: @unchecked Sendable {}
+#endif
+
+final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate {
+  struct UnexpectedEvent: Error {
+    var event: Event
+
+    init(_ event: Event) {
+      self.event = event
+    }
+  }
+
+  enum Event: Equatable {
+    case connectionAdded(GRPCConnectionID)
+    case startedConnecting(GRPCConnectionID)
+    case connectFailed(GRPCConnectionID)
+    case connectSucceeded(GRPCConnectionID, Int)
+    case connectionClosed(GRPCConnectionID)
+    case connectionUtilizationChanged(GRPCConnectionID, Int, Int)
+    case connectionQuiescing(GRPCConnectionID)
+    case connectionRemoved(GRPCConnectionID)
+
+    var id: GRPCConnectionID {
+      switch self {
+      case let .connectionAdded(id),
+        let .startedConnecting(id),
+        let .connectFailed(id),
+        let .connectSucceeded(id, _),
+        let .connectionClosed(id),
+        let .connectionUtilizationChanged(id, _, _),
+        let .connectionQuiescing(id),
+        let .connectionRemoved(id):
+        return id
+      }
+    }
+  }
+
+  private var events: CircularBuffer<Event> = []
+  private let lock = NIOLock()
+
+  var first: Event? {
+    return self.lock.withLock {
+      self.events.first
+    }
+  }
+
+  var isEmpty: Bool {
+    return self.lock.withLock { self.events.isEmpty }
+  }
+
+  func popFirst() -> Event? {
+    return self.lock.withLock {
+      self.events.popFirst()
+    }
+  }
+
+  func connectionAdded(id: GRPCConnectionID) {
+    self.lock.withLock {
+      self.events.append(.connectionAdded(id))
+    }
+  }
+
+  func startedConnecting(id: GRPCConnectionID) {
+    self.lock.withLock {
+      self.events.append(.startedConnecting(id))
+    }
+  }
+
+  func connectFailed(id: GRPCConnectionID, error: Error) {
+    self.lock.withLock {
+      self.events.append(.connectFailed(id))
+    }
+  }
+
+  func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {
+    self.lock.withLock {
+      self.events.append(.connectSucceeded(id, streamCapacity))
+    }
+  }
+
+  func connectionClosed(id: GRPCConnectionID, error: Error?) {
+    self.lock.withLock {
+      self.events.append(.connectionClosed(id))
+    }
+  }
+
+  func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {
+    self.lock.withLock {
+      self.events.append(.connectionUtilizationChanged(id, streamsUsed, streamCapacity))
+    }
+  }
+
+  func connectionQuiescing(id: GRPCConnectionID) {
+    self.lock.withLock {
+      self.events.append(.connectionQuiescing(id))
+    }
+  }
+
+  func connectionRemoved(id: GRPCConnectionID) {
+    self.lock.withLock {
+      self.events.append(.connectionRemoved(id))
+    }
+  }
+}
+
+#if swift(>=5.6)
+extension EventRecordingConnectionPoolDelegate: @unchecked Sendable {}
+#endif // swift(>=5.6)
+
 #endif // canImport(NIOSSL)
diff --git a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift
index 2a663d1a0..5610b9655 100644
--- a/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift
+++ b/Tests/GRPCTests/ConnectionPool/PoolManagerStateMachineTests.swift
@@ -39,6 +39,7 @@ class PoolManagerStateMachineTests: GRPCTestCase {
         onReturnStreams: { _ in },
         onUpdateMaxAvailableStreams: { _ in }
       ),
+      delegate: nil,
       logger: self.logger.wrapped
     )
   }

From 44c62f2f988267f18322e6f40c30b9739276886e Mon Sep 17 00:00:00 2001
From: George Barnett
Date: Wed, 26 Oct 2022 11:57:45 +0100
Subject: [PATCH 2/7] Explicit return type

---
 Sources/GRPC/ConnectionPool/ConnectionPool.swift | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift
index 369bd8a53..1f6bf2b4e 100644
--- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift
+++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift
@@ -517,7 +517,7 @@ internal final class ConnectionPool {
     let connections = self._connections
     self._connections.removeAll()
 
-    let allShutdown = connections.values.map {
+    let allShutdown: [EventLoopFuture<Void>] = connections.values.map {
       let id = $0.manager.id
       let manager = $0.manager
 
From 7e859395705e419945aead1c65a7a8301294f1e1 Mon Sep 17 00:00:00 2001
From: George Barnett
Date: Wed, 26 Oct 2022 11:59:39 +0100
Subject: [PATCH 3/7] Move #endif

---
 Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
index a2746cfc1..869054a71 100644
--- a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
+++ b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
@@ -652,6 +652,7 @@ final class IsConnectingDelegate: GRPCConnectionPoolDelegate {
 
   // No-op.
func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {} } +#endif // canImport(NIOSSL) #if swift(>=5.6) extension IsConnectingDelegate: @unchecked Sendable {} @@ -763,5 +764,3 @@ final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate { #if swift(>=5.6) extension EventRecordingConnectionPoolDelegate: @unchecked Sendable {} #endif // swift(>=5.6) - -#endif // canImport(NIOSSL) From 50ef894581cbf62bebf7d011a1e179e72cd4a1f4 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Wed, 26 Oct 2022 13:25:54 +0100 Subject: [PATCH 4/7] Move #endif again --- Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift index 869054a71..1df346110 100644 --- a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift +++ b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift @@ -575,6 +575,7 @@ final class GRPCChannelPoolTests: GRPCTestCase { XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(rpcs, on: self.group.any()).wait()) } } +#endif // canImport(NIOSSL) final class IsConnectingDelegate: GRPCConnectionPoolDelegate { private let lock = NIOLock() @@ -652,7 +653,6 @@ final class IsConnectingDelegate: GRPCConnectionPoolDelegate { // No-op. func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {} } -#endif // canImport(NIOSSL) #if swift(>=5.6) extension IsConnectingDelegate: @unchecked Sendable {} From 4033de4d9a0a5772fb19a0a750e7de943aeec8e6 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Wed, 26 Oct 2022 14:35:07 +0100 Subject: [PATCH 5/7] On active connection close --- Sources/GRPC/ConnectionManager.swift | 42 +++++++++++-------- .../GRPC/ConnectionPool/ConnectionPool.swift | 30 +++++++------ 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/Sources/GRPC/ConnectionManager.swift b/Sources/GRPC/ConnectionManager.swift index 931218017..7847da04f 100644 --- a/Sources/GRPC/ConnectionManager.swift +++ b/Sources/GRPC/ConnectionManager.swift @@ -265,17 +265,6 @@ internal final class ConnectionManager { } } - private var channel: Channel? { - self.eventLoop.assertInEventLoop() - switch self.state { - case let .ready(state): - return state.channel - - case .idle, .connecting, .transientFailure, .active, .shutdown: - return nil - } - } - /// The `EventLoop` that the managed connection will run on. internal let eventLoop: EventLoop @@ -604,6 +593,31 @@ internal final class ConnectionManager { } } + /// Registers a callback which fires when the current active connection is closed. + /// + /// If there is a connection, the callback will be invoked with `true` when the connection is + /// closed. Otherwise the callback is invoked with `false`. + internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) { + if self.eventLoop.inEventLoop { + self._onCurrentConnectionClose(onClose) + } else { + self.eventLoop.execute { + self._onCurrentConnectionClose(onClose) + } + } + } + + private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) { + self.eventLoop.assertInEventLoop() + + switch self.state { + case let .ready(state): + state.channel.closeFuture.whenComplete { _ in onClose(true) } + case .idle, .connecting, .active, .transientFailure, .shutdown: + onClose(false) + } + } + // MARK: - State changes from the channel handler. /// The channel caught an error. 
Hold on to it until the channel becomes inactive, it may provide
diff --git a/Sources/GRPC/ConnectionPool/ConnectionPool.swift b/Sources/GRPC/ConnectionPool/ConnectionPool.swift
index 1f6bf2b4e..554d3b5bb 100644
--- a/Sources/GRPC/ConnectionPool/ConnectionPool.swift
+++ b/Sources/GRPC/ConnectionPool/ConnectionPool.swift
@@ -615,29 +615,27 @@ extension ConnectionPool: ConnectionManagerConnectivityDelegate {
   func connectionIsQuiescing(_ manager: ConnectionManager) {
     self.eventLoop.assertInEventLoop()
 
-    // Find the relevant connection, mark it as quiescing and drop the connectivity delegate as
-    // these events are no longer relevant.
-    guard let index = self._connections.index(forKey: manager.id) else { return }
-    self._connections.values[index].isQuiescing = true
-    self._connections.values[index].manager.sync.connectivityDelegate = nil
+    // Find the relevant connection.
+    guard let index = self._connections.index(forKey: manager.id) else {
+      return
+    }
+
+    // Drop the connectivity delegate; we're no longer interested in its events now.
+    manager.sync.connectivityDelegate = nil
 
+    // Started quiescing; update our state and notify the pool delegate.
+    self._connections.values[index].isQuiescing = true
     self.delegate?.connectionQuiescing(id: .init(manager.id))
 
-    // If there's a close future then the underlying channel is open. It will close eventually when
-    // open streams are closed, so drop the H2 delegate and update the pool delegate when that
-    // happens.
-    if let closeFuture = self._connections.values[index].manager.sync.channel?.closeFuture {
-      closeFuture.whenComplete { _ in
-        guard let removed = self._connections.removeValue(forKey: manager.id) else { return }
+    // As the connection is quiescing, we need to know when the current connection it's managing
+    // has closed. When that happens, drop the H2 delegate and update the pool delegate.
+    manager.onCurrentConnectionClose { hadActiveConnection in
+      assert(hadActiveConnection)
+      if let removed = self._connections.removeValue(forKey: manager.id) {
         removed.manager.sync.http2Delegate = nil
         self.delegate?.connectionClosed(id: .init(removed.manager.id), error: nil)
         self.delegate?.connectionRemoved(id: .init(removed.manager.id))
       }
-    } else {
-      // No close future, so no open channel. Remove the delegate and connection.
-      self._connections.values[index].manager.sync.http2Delegate = nil
-      self._connections.remove(at: index)
-      self.delegate?.connectionRemoved(id: .init(manager.id))
     }
 
     // Grab the number of reserved streams (before invalidating the index by adding a connection).
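Aside: to illustrate the public API this series adds, a delegate which tracks open streams per
connection might be wired up as below. This is a minimal sketch, not part of the patch series:
the `StreamCountingDelegate` name and the target address are hypothetical, and it assumes the
existing `GRPCChannelPool.with(target:transportSecurity:eventLoopGroup:)` factory for building a
pooled channel.

    import GRPC
    import NIOConcurrencyHelpers
    import NIOCore
    import NIOPosix

    // Counts how many streams are open on each connection in the pool.
    final class StreamCountingDelegate: GRPCConnectionPoolDelegate {
      private let lock = NIOLock()
      private var openStreams: [GRPCConnectionID: Int] = [:]

      // Sum of open streams across all connections in the pool.
      var totalOpenStreams: Int {
        return self.lock.withLock { self.openStreams.values.reduce(0, +) }
      }

      func connectionAdded(id: GRPCConnectionID) {}

      func connectionRemoved(id: GRPCConnectionID) {
        self.lock.withLock {
          _ = self.openStreams.removeValue(forKey: id)
        }
      }

      func startedConnecting(id: GRPCConnectionID) {}
      func connectFailed(id: GRPCConnectionID, error: Error) {}
      func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {}

      // Stream usage and capacity changes are both reported through this callback.
      func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {
        self.lock.withLock {
          self.openStreams[id] = streamsUsed
        }
      }

      func connectionQuiescing(id: GRPCConnectionID) {}

      func connectionClosed(id: GRPCConnectionID, error: Error?) {
        self.lock.withLock {
          self.openStreams[id] = 0
        }
      }
    }

    #if swift(>=5.6)
    extension StreamCountingDelegate: @unchecked Sendable {}
    #endif

    // Configure the delegate when building the pooled channel.
    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    let delegate = StreamCountingDelegate()
    let channel = try GRPCChannelPool.with(
      target: .host("localhost", port: 8080),
      transportSecurity: .plaintext,
      eventLoopGroup: group
    ) {
      $0.delegate = delegate
    }

As the protocol documentation notes, these callbacks may fire on arbitrary threads and must be
fast, hence the lock and the absence of any blocking work in the delegate methods.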
From 8a0418092dbea70b23fe3163d41af86673da22ec Mon Sep 17 00:00:00 2001
From: George Barnett
Date: Wed, 26 Oct 2022 15:12:41 +0100
Subject: [PATCH 6/7] fix no NIOSSL build

---
 .../ConnectionPoolDelegates.swift             | 205 ++++++++++++++++
 .../ConnectionPool/GRPCChannelPoolTests.swift | 220 ++----------------
 2 files changed, 228 insertions(+), 197 deletions(-)
 create mode 100644 Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift

diff --git a/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift b/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift
new file mode 100644
index 000000000..3691706ff
--- /dev/null
+++ b/Tests/GRPCTests/ConnectionPool/ConnectionPoolDelegates.swift
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2022, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import GRPC
+import NIOConcurrencyHelpers
+import NIOCore
+
+final class IsConnectingDelegate: GRPCConnectionPoolDelegate {
+  private let lock = NIOLock()
+  private var connecting = Set<GRPCConnectionID>()
+  private var active = Set<GRPCConnectionID>()
+
+  enum StateNotifacation: Hashable, GRPCSendable {
+    case connecting
+    case connected
+  }
+
+  #if swift(>=5.6)
+  private let onStateChange: @Sendable (StateNotifacation) -> Void
+  #else
+  private let onStateChange: (StateNotifacation) -> Void
+  #endif
+
+  #if swift(>=5.6)
+  init(onStateChange: @escaping @Sendable (StateNotifacation) -> Void) {
+    self.onStateChange = onStateChange
+  }
+  #else
+  init(onStateChange: @escaping (StateNotifacation) -> Void) {
+    self.onStateChange = onStateChange
+  }
+  #endif
+
+  func startedConnecting(id: GRPCConnectionID) {
+    let didStartConnecting: Bool = self.lock.withLock {
+      let (inserted, _) = self.connecting.insert(id)
+      // Only interested in new connection attempts when there are no active connections.
+      return inserted && self.connecting.count == 1 && self.active.isEmpty
+    }
+
+    if didStartConnecting {
+      self.onStateChange(.connecting)
+    }
+  }
+
+  func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {
+    let didStopConnecting: Bool = self.lock.withLock {
+      let removed = self.connecting.remove(id) != nil
+      let (inserted, _) = self.active.insert(id)
+      return removed && inserted && self.active.count == 1
+    }
+
+    if didStopConnecting {
+      self.onStateChange(.connected)
+    }
+  }
+
+  func connectionClosed(id: GRPCConnectionID, error: Error?) {
+    self.lock.withLock {
+      self.active.remove(id)
+      self.connecting.remove(id)
+    }
+  }
+
+  func connectionQuiescing(id: GRPCConnectionID) {
+    self.lock.withLock {
+      _ = self.active.remove(id)
+    }
+  }
+
+  // No-op.
+  func connectionAdded(id: GRPCConnectionID) {}
+
+  // No-op.
+  func connectionRemoved(id: GRPCConnectionID) {}
+
+  // Connection failures put the connection into a backing-off state; we consider that to still
+  // be 'connecting' at this point.
+  func connectFailed(id: GRPCConnectionID, error: Error) {}
+
+  // No-op.
+  func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {}
+}
+
+#if swift(>=5.6)
+extension IsConnectingDelegate: @unchecked Sendable {}
+#endif
+
+final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate {
+  struct UnexpectedEvent: Error {
+    var event: Event
+
+    init(_ event: Event) {
+      self.event = event
+    }
+  }
+
+  enum Event: Equatable {
+    case connectionAdded(GRPCConnectionID)
+    case startedConnecting(GRPCConnectionID)
+    case connectFailed(GRPCConnectionID)
+    case connectSucceeded(GRPCConnectionID, Int)
+    case connectionClosed(GRPCConnectionID)
+    case connectionUtilizationChanged(GRPCConnectionID, Int, Int)
+    case connectionQuiescing(GRPCConnectionID)
+    case connectionRemoved(GRPCConnectionID)
+
+    var id: GRPCConnectionID {
+      switch self {
+      case let .connectionAdded(id),
+        let .startedConnecting(id),
+        let .connectFailed(id),
+        let .connectSucceeded(id, _),
+        let .connectionClosed(id),
+        let .connectionUtilizationChanged(id, _, _),
+        let .connectionQuiescing(id),
+        let .connectionRemoved(id):
+        return id
+      }
+    }
+  }
+
+  private var events: CircularBuffer<Event> = []
+  private let lock = NIOLock()
+
+  var first: Event? {
+    return self.lock.withLock {
+      self.events.first
+    }
+  }
+
+  var isEmpty: Bool {
+    return self.lock.withLock { self.events.isEmpty }
+  }
+
+  func popFirst() -> Event? {
+    return self.lock.withLock {
+      self.events.popFirst()
+    }
+  }
+
+  func connectionAdded(id: GRPCConnectionID) {
+    self.lock.withLock {
+      self.events.append(.connectionAdded(id))
+    }
+  }
+
+  func startedConnecting(id: GRPCConnectionID) {
+    self.lock.withLock {
+      self.events.append(.startedConnecting(id))
+    }
+  }
+
+  func connectFailed(id: GRPCConnectionID, error: Error) {
+    self.lock.withLock {
+      self.events.append(.connectFailed(id))
+    }
+  }
+
+  func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {
+    self.lock.withLock {
+      self.events.append(.connectSucceeded(id, streamCapacity))
+    }
+  }
+
+  func connectionClosed(id: GRPCConnectionID, error: Error?) {
+    self.lock.withLock {
+      self.events.append(.connectionClosed(id))
+    }
+  }
+
+  func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {
+    self.lock.withLock {
+      self.events.append(.connectionUtilizationChanged(id, streamsUsed, streamCapacity))
+    }
+  }
+
+  func connectionQuiescing(id: GRPCConnectionID) {
+    self.lock.withLock {
+      self.events.append(.connectionQuiescing(id))
+    }
+  }
+
+  func connectionRemoved(id: GRPCConnectionID) {
+    self.lock.withLock {
+      self.events.append(.connectionRemoved(id))
+    }
+  }
+}
+
+#if swift(>=5.6)
+extension EventRecordingConnectionPoolDelegate: @unchecked Sendable {}
+#endif // swift(>=5.6)
diff --git a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
index 1df346110..e43b67888 100644
--- a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
+++ b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
@@ -490,17 +490,31 @@ final class GRPCChannelPoolTests: GRPCTestCase {
     XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
     XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 0, 100))
 
-    // Grab the port and shutdown the server gracefully.
-    let serverPort = try XCTUnwrap(self.serverPort)
-    XCTAssertNoThrow(try self.server?.initiateGracefulShutdown().wait())
+    // Start an RPC.
+    let rpc = self.echo.collect()
+    XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
+    // Complete another RPC to make sure the server knows about the previous one.
+    XCTAssertNoThrow(try self.echo.get(.with { $0.text = "foo" }).status.wait())
 
-    // Start the server again on the same port.
-    self.startServer(withTLS: false, port: serverPort)
+    XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
+    XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 2, 100))
+    XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id1, 1, 100))
 
-    self.server = nil
-    self.startServer(withTLS: false)
+    // Start shutting the server down.
+    let didShutdown = self.server!.initiateGracefulShutdown()
+    self.server = nil // Avoid shutting down again in tearDown.
+
+    // Pause a moment so we know the client received the GOAWAY.
+    let sleep = self.group.any().scheduleTask(in: .milliseconds(50)) {}
+    XCTAssertNoThrow(try sleep.futureResult.wait())
+
     XCTAssertEqual(recorder.popFirst(), .connectionQuiescing(id1))
-    XCTAssertEqual(recorder.popFirst(), .connectionClosed(id1))
+
+    // Finish the RPC.
+    XCTAssertNoThrow(try rpc.sendEnd().wait())
+    XCTAssertNoThrow(try rpc.status.wait())
+
+    // The server should shut down now.
+    XCTAssertNoThrow(try didShutdown.wait())
   }
 
   func testDelegateCanTellWhenFirstConnectionIsBeingEstablished() {
@@ -568,7 +582,7 @@ final class GRPCChannelPoolTests: GRPCTestCase {
     // We should be able to do a bunch of other RPCs without the state changing (we'll XCTFail if
     // a state change happens).
-    let rpcs = (0 ..< 20).map { i in
+    let rpcs: [EventLoopFuture<GRPCStatus>] = (0 ..< 20).map { i in
       let rpc = self.echo.get(.with { $0.text = "\(i)" })
       return rpc.status
     }
@@ -576,191 +590,3 @@
   }
 }
 #endif // canImport(NIOSSL)
-
-final class IsConnectingDelegate: GRPCConnectionPoolDelegate {
-  private let lock = NIOLock()
-  private var connecting = Set<GRPCConnectionID>()
-  private var active = Set<GRPCConnectionID>()
-
-  enum StateNotifacation: Hashable, GRPCSendable {
-    case connecting
-    case connected
-  }
-
-  #if swift(>=5.6)
-  private let onStateChange: @Sendable (StateNotifacation) -> Void
-  #else
-  private let onStateChange: (StateNotifacation) -> Void
-  #endif
-
-  #if swift(>=5.6)
-  init(onStateChange: @escaping @Sendable (StateNotifacation) -> Void) {
-    self.onStateChange = onStateChange
-  }
-  #else
-  init(onStateChange: @escaping (StateNotifacation) -> Void) {
-    self.onStateChange = onStateChange
-  }
-  #endif
-
-  func startedConnecting(id: GRPCConnectionID) {
-    let didStartConnecting = self.lock.withLock {
-      let (inserted, _) = self.connecting.insert(id)
-      // Only intereseted new connection attempts when there are no active connections.
-      return inserted && self.connecting.count == 1 && self.active.isEmpty
-    }
-
-    if didStartConnecting {
-      self.onStateChange(.connecting)
-    }
-  }
-
-  func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {
-    let didStopConnecting = self.lock.withLock {
-      let removed = self.connecting.remove(id) != nil
-      let (inserted, _) = self.active.insert(id)
-      return removed && inserted && self.active.count == 1
-    }
-
-    if didStopConnecting {
-      self.onStateChange(.connected)
-    }
-  }
-
-  func connectionClosed(id: GRPCConnectionID, error: Error?) {
-    self.lock.withLock {
-      self.active.remove(id)
-      self.connecting.remove(id)
-    }
-  }
-
-  func connectionQuiescing(id: GRPCConnectionID) {
-    self.lock.withLock {
-      _ = self.active.remove(id)
-    }
-  }
-
-  // No-op.
-  func connectionAdded(id: GRPCConnectionID) {}
-
-  // No-op.
-  func connectionRemoved(id: GRPCConnectionID) {}
-
-  // Conection failures put the connection into a backing off state, we consider that to still
-  // be 'connecting' at this point.
-  func connectFailed(id: GRPCConnectionID, error: Error) {}
-
-  // No-op.
-  func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {}
-}
-
-#if swift(>=5.6)
-extension IsConnectingDelegate: @unchecked Sendable {}
-#endif
-
-final class EventRecordingConnectionPoolDelegate: GRPCConnectionPoolDelegate {
-  struct UnexpectedEvent: Error {
-    var event: Event
-
-    init(_ event: Event) {
-      self.event = event
-    }
-  }
-
-  enum Event: Equatable {
-    case connectionAdded(GRPCConnectionID)
-    case startedConnecting(GRPCConnectionID)
-    case connectFailed(GRPCConnectionID)
-    case connectSucceeded(GRPCConnectionID, Int)
-    case connectionClosed(GRPCConnectionID)
-    case connectionUtilizationChanged(GRPCConnectionID, Int, Int)
-    case connectionQuiescing(GRPCConnectionID)
-    case connectionRemoved(GRPCConnectionID)
-
-    var id: GRPCConnectionID {
-      switch self {
-      case let .connectionAdded(id),
-           let .startedConnecting(id),
-           let .connectFailed(id),
-           let .connectSucceeded(id, _),
-           let .connectionClosed(id),
-           let .connectionUtilizationChanged(id, _, _),
-           let .connectionQuiescing(id),
-           let .connectionRemoved(id):
-        return id
-      }
-    }
-  }
-
-  private var events: CircularBuffer<Event> = []
-  private let lock = NIOLock()
-
-  var first: Event? {
-    return self.lock.withLock {
-      self.events.first
-    }
-  }
-
-  var isEmpty: Bool {
-    return self.lock.withLock { self.events.isEmpty }
-  }
-
-  func popFirst() -> Event? {
-    return self.lock.withLock {
-      self.events.popFirst()
-    }
-  }
-
-  func connectionAdded(id: GRPCConnectionID) {
-    self.lock.withLock {
-      self.events.append(.connectionAdded(id))
-    }
-  }
-
-  func startedConnecting(id: GRPCConnectionID) {
-    self.lock.withLock {
-      self.events.append(.startedConnecting(id))
-    }
-  }
-
-  func connectFailed(id: GRPCConnectionID, error: Error) {
-    self.lock.withLock {
-      self.events.append(.connectFailed(id))
-    }
-  }
-
-  func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {
-    self.lock.withLock {
-      self.events.append(.connectSucceeded(id, streamCapacity))
-    }
-  }
-
-  func connectionClosed(id: GRPCConnectionID, error: Error?) {
-    self.lock.withLock {
-      self.events.append(.connectionClosed(id))
-    }
-  }
-
-  func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {
-    self.lock.withLock {
-      self.events.append(.connectionUtilizationChanged(id, streamsUsed, streamCapacity))
-    }
-  }
-
-  func connectionQuiescing(id: GRPCConnectionID) {
-    self.lock.withLock {
-      self.events.append(.connectionQuiescing(id))
-    }
-  }
-
-  func connectionRemoved(id: GRPCConnectionID) {
-    self.lock.withLock {
-      self.events.append(.connectionRemoved(id))
-    }
-  }
-}
-
-#if swift(>=5.6)
-extension EventRecordingConnectionPoolDelegate: @unchecked Sendable {}
-#endif // swift(>=5.6)

From 73ea26afb960969655586ccc99b8970123efaad7 Mon Sep 17 00:00:00 2001
From: George Barnett
Date: Thu, 27 Oct 2022 08:21:47 +0100
Subject: [PATCH 7/7] Help out the 5.5 compiler

---
 Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
index e43b67888..b8c560bbb 100644
--- a/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
+++ b/Tests/GRPCTests/ConnectionPool/GRPCChannelPoolTests.swift
@@ -457,7 +457,7 @@ final class GRPCChannelPoolTests: GRPCTestCase {
     XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 1, 100))
     XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, 0, 100))
 
-    let rpcs = try (1 ... 10).map { i in
+    let rpcs: [ClientStreamingCall<Echo_EchoRequest, Echo_EchoResponse>] = try (1 ... 10).map { i in
      let rpc = self.echo.collect()
      XCTAssertNoThrow(try rpc.sendMessage(.with { $0.text = "foo" }).wait())
      XCTAssertEqual(recorder.popFirst(), .connectionUtilizationChanged(id, i, 100))
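
Usage sketch (illustrative only, not part of the patches): how a client
might attach one of these pool delegates when building a pooled channel.
This is a minimal sketch, assuming the delegate is exposed as a `delegate`
property on GRPCChannelPool.Configuration; LoggingPoolDelegate and the
target address are invented for the example.

  import GRPC
  import NIOPosix

  // Logs every pool event it is told about. A final class with no stored
  // state, so it satisfies the delegate protocol's Sendable requirement.
  final class LoggingPoolDelegate: GRPCConnectionPoolDelegate {
    func connectionAdded(id: GRPCConnectionID) { print("added \(id)") }
    func connectionRemoved(id: GRPCConnectionID) { print("removed \(id)") }
    func startedConnecting(id: GRPCConnectionID) { print("connecting \(id)") }
    func connectFailed(id: GRPCConnectionID, error: Error) {
      print("connect failed \(id): \(error)")
    }
    func connectSucceeded(id: GRPCConnectionID, streamCapacity: Int) {
      print("connected \(id), stream capacity \(streamCapacity)")
    }
    func connectionUtilizationChanged(id: GRPCConnectionID, streamsUsed: Int, streamCapacity: Int) {
      print("utilization \(id): \(streamsUsed)/\(streamCapacity)")
    }
    func connectionQuiescing(id: GRPCConnectionID) { print("quiescing \(id)") }
    func connectionClosed(id: GRPCConnectionID, error: Error?) { print("closed \(id)") }
  }

  let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
  defer { try? group.syncShutdownGracefully() }

  // Build a pooled channel and register the delegate via the configuration.
  let channel = try GRPCChannelPool.with(
    target: .host("localhost", port: 8080),  // placeholder address
    transportSecurity: .plaintext,
    eventLoopGroup: group
  ) { configuration in
    configuration.delegate = LoggingPoolDelegate()
  }

Note that the pool may call the delegate from multiple event loops, so
implementations must be thread-safe; this is presumably why the test
helpers above guard their state with a NIOLock and are marked
@unchecked Sendable.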