Skip to content

Commit 184230a

Browse files
authored
Add a connection pool delegate (#1515)
Motivation: It can be useful to observe what the underlying connection pool is doing over time. This could, for example, be instrumenting it to understand how many streams are active at a given time or to understand when a connection is in the processing of being brought up. Modifications: Add new API: - `GRPCConnectionPoolDelegate` which users can implement and configure on the `GRPCChannelPool` to receive notifications about the connection pool state. - `GRPCConnectionID`, an opaque ID to distinguish between different connections in the pool. Modify the connection pool: - Per-EventLoop pools now hold on to quiescing connections and track additional state, including whether a connection is quiescing and how many streams are currently open on a connection. By contrast the pool manager (which manages a number of these per-EventLoop pools) tracks reserved streams (which are not necessarily open). The delegate tracks _opened_ streams rather than _reserved_ streams (as reserved streams are not allocated to a connection but to an any connection running on an appropriate event-loop). - Wire through the new delegate and call out to it in appropriate places. - Add a stream opened function to the internal H2 delegate - Expose various pieces of state from the connection manager Result: Users can instrument the underlying connection pool.
1 parent 85416bc commit 184230a

14 files changed

+804
-36
lines changed

Diff for: Sources/GRPC/ConnectionManager+Delegates.swift

+6
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ internal protocol ConnectionManagerConnectivityDelegate {
3535
}
3636

3737
internal protocol ConnectionManagerHTTP2Delegate {
38+
/// An HTTP/2 stream was opened.
39+
///
40+
/// - Parameters:
41+
/// - connectionManager: The connection manager reporting the opened stream.
42+
func streamOpened(_ connectionManager: ConnectionManager)
43+
3844
/// An HTTP/2 stream was closed.
3945
///
4046
/// - Parameters:

Diff for: Sources/GRPC/ConnectionManager.swift

+46
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ internal final class ConnectionManager {
242242
}
243243
}
244244

245+
/// Returns whether the state is 'shutdown'.
246+
private var isShutdown: Bool {
247+
self.eventLoop.assertInEventLoop()
248+
switch self.state {
249+
case .shutdown:
250+
return true
251+
case .idle, .connecting, .transientFailure, .active, .ready:
252+
return false
253+
}
254+
}
255+
245256
/// Returns the `HTTP2StreamMultiplexer` from the 'ready' state or `nil` if it is not available.
246257
private var multiplexer: HTTP2StreamMultiplexer? {
247258
self.eventLoop.assertInEventLoop()
@@ -582,6 +593,31 @@ internal final class ConnectionManager {
582593
}
583594
}
584595

596+
/// Registers a callback which fires when the current active connection is closed.
597+
///
598+
/// If there is a connection, the callback will be invoked with `true` when the connection is
599+
/// closed. Otherwise the callback is invoked with `false`.
600+
internal func onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
601+
if self.eventLoop.inEventLoop {
602+
self._onCurrentConnectionClose(onClose)
603+
} else {
604+
self.eventLoop.execute {
605+
self._onCurrentConnectionClose(onClose)
606+
}
607+
}
608+
}
609+
610+
private func _onCurrentConnectionClose(_ onClose: @escaping (Bool) -> Void) {
611+
self.eventLoop.assertInEventLoop()
612+
613+
switch self.state {
614+
case let .ready(state):
615+
state.channel.closeFuture.whenComplete { _ in onClose(true) }
616+
case .idle, .connecting, .active, .transientFailure, .shutdown:
617+
onClose(false)
618+
}
619+
}
620+
585621
// MARK: - State changes from the channel handler.
586622

587623
/// The channel caught an error. Hold on to it until the channel becomes inactive, it may provide
@@ -807,6 +843,11 @@ internal final class ConnectionManager {
807843
}
808844
}
809845

846+
internal func streamOpened() {
847+
self.eventLoop.assertInEventLoop()
848+
self.http2Delegate?.streamOpened(self)
849+
}
850+
810851
internal func streamClosed() {
811852
self.eventLoop.assertInEventLoop()
812853
self.http2Delegate?.streamClosed(self)
@@ -1001,6 +1042,11 @@ extension ConnectionManager {
10011042
return self.manager.isIdle
10021043
}
10031044

1045+
/// Returne `true` if the connection is in the shutdown state.
1046+
internal var isShutdown: Bool {
1047+
return self.manager.isShutdown
1048+
}
1049+
10041050
/// Returns the `multiplexer` from a connection in the `ready` state or `nil` if it is any
10051051
/// other state.
10061052
internal var multiplexer: HTTP2StreamMultiplexer? {

Diff for: Sources/GRPC/ConnectionPool/ConnectionManagerID.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
@usableFromInline
18-
internal struct ConnectionManagerID: Hashable, CustomStringConvertible {
18+
internal struct ConnectionManagerID: Hashable, CustomStringConvertible, GRPCSendable {
1919
@usableFromInline
2020
internal let _id: ObjectIdentifier
2121

Diff for: Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift

+48-4
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,32 @@ extension ConnectionPool {
2626
@usableFromInline
2727
internal var _availability: StreamAvailability?
2828

29+
@usableFromInline
30+
internal var isQuiescing: Bool {
31+
get {
32+
return self._availability?.isQuiescing ?? false
33+
}
34+
set {
35+
self._availability?.isQuiescing = true
36+
}
37+
}
38+
2939
@usableFromInline
3040
internal struct StreamAvailability {
41+
@usableFromInline
42+
struct Utilization {
43+
@usableFromInline
44+
var used: Int
45+
@usableFromInline
46+
var capacity: Int
47+
48+
@usableFromInline
49+
init(used: Int, capacity: Int) {
50+
self.used = used
51+
self.capacity = capacity
52+
}
53+
}
54+
3155
@usableFromInline
3256
var multiplexer: HTTP2StreamMultiplexer
3357
/// Maximum number of available streams.
@@ -36,24 +60,39 @@ extension ConnectionPool {
3660
/// Number of streams reserved.
3761
@usableFromInline
3862
var reserved: Int = 0
63+
/// Number of streams opened.
64+
@usableFromInline
65+
var open: Int = 0
66+
@usableFromInline
67+
var isQuiescing = false
3968
/// Number of available streams.
4069
@usableFromInline
4170
var available: Int {
42-
return self.maxAvailable - self.reserved
71+
return self.isQuiescing ? 0 : self.maxAvailable - self.reserved
4372
}
4473

4574
/// Increment the reserved streams and return the multiplexer.
4675
@usableFromInline
4776
mutating func reserve() -> HTTP2StreamMultiplexer {
77+
assert(!self.isQuiescing)
4878
self.reserved += 1
4979
return self.multiplexer
5080
}
5181

82+
@usableFromInline
83+
mutating func opened() -> Utilization {
84+
self.open += 1
85+
return .init(used: self.open, capacity: self.maxAvailable)
86+
}
87+
5288
/// Decrement the reserved streams by one.
5389
@usableFromInline
54-
mutating func `return`() {
90+
mutating func `return`() -> Utilization {
5591
self.reserved -= 1
92+
self.open -= 1
5693
assert(self.reserved >= 0)
94+
assert(self.open >= 0)
95+
return .init(used: self.open, capacity: self.maxAvailable)
5796
}
5897
}
5998

@@ -92,10 +131,15 @@ extension ConnectionPool {
92131
return self._availability?.reserve()
93132
}
94133

134+
@usableFromInline
135+
internal mutating func openedStream() -> PerConnectionState.StreamAvailability.Utilization? {
136+
return self._availability?.opened()
137+
}
138+
95139
/// Return a reserved stream to the connection.
96140
@usableFromInline
97-
internal mutating func returnStream() {
98-
self._availability?.return()
141+
internal mutating func returnStream() -> PerConnectionState.StreamAvailability.Utilization? {
142+
return self._availability?.return()
99143
}
100144

101145
/// Update the maximum concurrent streams available on the connection, marking it as available

0 commit comments

Comments
 (0)