Skip to content

Commit 979adad

Browse files
committed
Fix stream accounting bug when stream close leads to connection close
Motivation: The connection pool manager manages a pool of connections per event-loop. It spreads load across these pools by tracking how many streams a pool has capacity for and how many streams are in use. To facilitate this each pool reports back to the pool manager when streams have been reserved and when they have been returned. If connections are closed unexpectedly (due to an error, for example) then the pool reports this in bulk. However when the streams are closed they are also reported back to the pool manager. This means the manager can end up thinking a pool has a negative number of reserved streams which results in an assertion failure. Modifications: - Check if the connection a stream is being returned to is available before reporting stream closures to the pool manager. Result: - Better stream accounting. - Resolved grpc#1598
1 parent 76ae1e4 commit 979adad

File tree

3 files changed

+52
-3
lines changed

3 files changed

+52
-3
lines changed

Sources/GRPC/ConnectionPool/ConnectionPool+PerConnectionState.swift

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ extension ConnectionPool {
2626
@usableFromInline
2727
internal var _availability: StreamAvailability?
2828

29+
@usableFromInline
30+
internal var isAvailable: Bool {
31+
return self._availability != nil
32+
}
33+
2934
@usableFromInline
3035
internal var isQuiescing: Bool {
3136
get {

Sources/GRPC/ConnectionPool/ConnectionPool.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -701,9 +701,9 @@ extension ConnectionPool: ConnectionManagerHTTP2Delegate {
701701
)
702702
}
703703

704-
// Don't return the stream to the pool manager if the connection is quiescing, they were returned
705-
// when the connection started quiescing.
706-
if !self._connections.values[index].isQuiescing {
704+
// Return the stream to the pool manager if the connection is available and not quiescing. For
705+
// quiescing connections streams were returned when the connection started quiescing.
706+
if self._connections.values[index].isAvailable, !self._connections.values[index].isQuiescing {
707707
self.streamLender.returnStreams(1, to: self)
708708

709709
// A stream was returned: we may be able to service a waiter now.

Tests/GRPCTests/ConnectionPool/ConnectionPoolTests.swift

+44
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,50 @@ final class ConnectionPoolTests: GRPCTestCase {
876876
XCTAssertNil(waiter._scheduledTimeout)
877877
}
878878

879+
func testReturnStreamAfterConnectionCloses() throws {
880+
var returnedStreams = 0
881+
let (pool, controller) = self.setUpPoolAndController(onReservationReturned: { returned in
882+
returnedStreams += returned
883+
})
884+
pool.initialize(connections: 1)
885+
886+
let waiter = pool.makeStream(deadline: .distantFuture, logger: self.logger.wrapped) {
887+
$0.eventLoop.makeSucceededVoidFuture()
888+
}
889+
// Start creating the channel.
890+
self.eventLoop.run()
891+
XCTAssertEqual(controller.count, 1)
892+
893+
// Fire up the connection.
894+
controller.connectChannel(atIndex: 0)
895+
controller.sendSettingsToChannel(atIndex: 0, maxConcurrentStreams: 10)
896+
897+
// Run the loop to create the stream, we need to fire the stream creation event too.
898+
self.eventLoop.run()
899+
XCTAssertNoThrow(try waiter.wait())
900+
controller.openStreamInChannel(atIndex: 0)
901+
902+
XCTAssertEqual(pool.sync.waiters, 0)
903+
XCTAssertEqual(pool.sync.availableStreams, 9)
904+
XCTAssertEqual(pool.sync.reservedStreams, 1)
905+
XCTAssertEqual(pool.sync.connections, 1)
906+
907+
// Close all streams on connection 0.
908+
let error = GRPCStatus(code: .internalError, message: nil)
909+
controller.throwError(error, inChannelAtIndex: 0)
910+
controller.fireChannelInactiveForChannel(atIndex: 0)
911+
XCTAssertEqual(returnedStreams, 1)
912+
913+
XCTAssertEqual(pool.sync.waiters, 0)
914+
XCTAssertEqual(pool.sync.availableStreams, 0)
915+
XCTAssertEqual(pool.sync.reservedStreams, 0)
916+
XCTAssertEqual(pool.sync.connections, 1)
917+
918+
// The connection is closed so the stream shouldn't be returned again.
919+
controller.closeStreamInChannel(atIndex: 0)
920+
XCTAssertEqual(returnedStreams, 1)
921+
}
922+
879923
func testConnectionPoolDelegate() throws {
880924
let recorder = EventRecordingConnectionPoolDelegate()
881925
let (pool, controller) = self.setUpPoolAndController(delegate: recorder)

0 commit comments

Comments
 (0)