Skip to content

Don't trap on invalid connection state transitions #1573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 80 additions & 70 deletions Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ internal final class ConnectionManager {
var scheduled: Scheduled<Void>
var reason: Error

init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
self.backoffIterator = state.backoffIterator
self.readyChannelMuxPromise = state.readyChannelMuxPromise
self.scheduled = scheduled
self.reason = reason
self.reason = reason ?? GRPCStatus(
code: .unavailable,
message: "Unexpected connection drop"
)
}

init(from state: ConnectedState, scheduled: Scheduled<Void>) {
Expand Down Expand Up @@ -391,7 +394,7 @@ internal final class ConnectionManager {
self.startConnecting()
// We started connecting so we must transition to the `connecting` state.
guard case let .connecting(connecting) = self.state else {
self.invalidState()
self.unreachableState()
}
multiplexer = connecting.readyChannelMuxPromise.futureResult

Expand Down Expand Up @@ -432,7 +435,7 @@ internal final class ConnectionManager {
self.startConnecting()
// We started connecting so we must transition to the `connecting` state.
guard case let .connecting(connecting) = self.state else {
self.invalidState()
self.unreachableState()
}
return connecting.candidateMuxPromise.futureResult
case let .connecting(state):
Expand Down Expand Up @@ -674,20 +677,13 @@ internal final class ConnectionManager {
case .shutdown:
channel.close(mode: .all, promise: nil)

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .idle:
self.invalidState()

case .active:
self.invalidState()

case .ready:
self.invalidState()

case .transientFailure:
self.invalidState()
case .idle, .transientFailure:
// Received a channelActive when not connecting. Can happen if channelActive and
// channelInactive are reordered. Ignore.
()
case .active, .ready:
// Received a second 'channelActive', already active so ignore.
()
}
}

Expand All @@ -700,6 +696,43 @@ internal final class ConnectionManager {
])

switch self.state {
// We can hit inactive in connecting if we see channelInactive before channelActive; that's not
// common but we should tolerate it.
case let .connecting(connecting):
// Should we try connecting again?
switch connecting.reconnect {
// No, shutdown instead.
case .none:
self.logger.debug("shutting down connection")

let error = GRPCStatus(
code: .unavailable,
message: "The connection was dropped and connection re-establishment is disabled"
)

let shutdownState = ShutdownState(
closeFuture: self.eventLoop.makeSucceededFuture(()),
reason: error
)

self.state = .shutdown(shutdownState)
// Shutting down, so fail the outstanding promises.
connecting.readyChannelMuxPromise.fail(error)
connecting.candidateMuxPromise.fail(error)

// Yes, after some time.
case let .after(delay):
let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
// Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
connecting.candidateMuxPromise.fail(error)

let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
self.startConnecting()
}
self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
self.state = .transientFailure(.init(from: connecting, scheduled: scheduled, reason: nil))
}

// The channel is `active` but not `ready`. Should we try again?
case let .active(active):
switch active.reconnect {
Expand Down Expand Up @@ -766,14 +799,9 @@ internal final class ConnectionManager {
case .shutdown:
()

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .connecting:
self.invalidState()

// Received 'channelInactive' twice; fine, ignore.
case .transientFailure:
self.invalidState()
()
}
}

Expand All @@ -793,20 +821,20 @@ internal final class ConnectionManager {
case .shutdown:
()

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .idle:
self.invalidState()

case .transientFailure:
self.invalidState()
case .idle, .transientFailure:
// No connection or connection attempt exists but connection was marked as ready. This is
// strange. Ignore it in release mode as there's nothing to close and nowhere to fire an
// error to.
assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")

case .connecting:
self.invalidState()
// No channel exists to receive initial HTTP/2 SETTINGS frame on... weird. Ignore in release
// mode.
assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")

case .ready:
self.invalidState()
// Already received initial HTTP/2 SETTINGS frame; ignore in release mode.
assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
}
}

Expand Down Expand Up @@ -834,17 +862,14 @@ internal final class ConnectionManager {
// 'channelInactive()'.
()

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .idle:
self.invalidState()
case .idle, .transientFailure:
// There's no connection to idle; ignore.
()

case .connecting:
self.invalidState()

case .transientFailure:
self.invalidState()
// The idle watchdog is started when the connection is active, this shouldn't happen
// in the connecting state. Ignore it in release mode.
assertionFailure("tried to idle a connection in the \(self.state.label) state")
}
}

Expand Down Expand Up @@ -908,22 +933,10 @@ extension ConnectionManager {
case .shutdown:
()

// We can't fail to connect if we aren't trying.
//
// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .idle:
self.invalidState()

case .active:
self.invalidState()

case .ready:
self.invalidState()

case .transientFailure:
self.invalidState()
// Connection attempt failed, but no connection attempt is in progress.
case .idle, .active, .ready, .transientFailure:
// Nothing we can do other than ignore in release mode.
assertionFailure("connect promise failed in \(self.state.label) state")
}
}
}
Expand Down Expand Up @@ -951,17 +964,14 @@ extension ConnectionManager {
case .shutdown:
()

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
// We only call startConnecting() if the connection does not exist and after checking what the
// current state is, so none of these states should be reachable.
case .connecting:
self.invalidState()

self.unreachableState()
case .active:
self.invalidState()

self.unreachableState()
case .ready:
self.invalidState()
self.unreachableState()
}
}

Expand Down Expand Up @@ -1066,11 +1076,11 @@ extension ConnectionManager {
}

extension ConnectionManager {
private func invalidState(
private func unreachableState(
function: StaticString = #function,
file: StaticString = #fileID,
line: UInt = #line
) -> Never {
preconditionFailure("Invalid state \(self.state) for \(function)", file: file, line: line)
fatalError("Invalid state \(self.state) for \(function)", file: file, line: line)
}
}
113 changes: 113 additions & 0 deletions Tests/GRPCTests/ConnectionManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,119 @@ extension ConnectionManagerTests {
}
}

/// Verifies that the manager tolerates `channelInactive` arriving before `channelActive`
/// while it is still connecting: the test expects a `.connecting` -> `.shutdown` transition
/// and that the late `channelActive` does not trap.
func testChannelInactiveBeforeActiveWithNoReconnect() throws {
  let embedded = EmbeddedChannel(loop: self.loop)
  let connectPromise: EventLoopPromise<Channel> = self.loop.makePromise()

  // The connection attempt resolves only when the test completes the promise.
  let manager = self.makeConnectionManager { _, _ in connectPromise.futureResult }

  // Asking for a multiplexer kicks off the connection attempt.
  self.waitForStateChange(from: .idle, to: .connecting) {
    _ = manager.getHTTP2Multiplexer()
    self.loop.run()
  }

  // Install the idle handler so that pipeline events are forwarded to the manager.
  let multiplexer = HTTP2StreamMultiplexer(
    mode: .client,
    channel: embedded,
    inboundStreamInitializer: nil
  )
  let idleHandler = GRPCIdleHandler(
    connectionManager: manager,
    multiplexer: multiplexer,
    idleTimeout: .minutes(5),
    keepalive: .init(),
    logger: self.logger
  )
  try embedded.pipeline.syncOperations.addHandler(idleHandler)
  connectPromise.succeed(embedded)

  // Deliver the events in the "wrong" order: inactive first. This must be tolerated.
  self.waitForStateChange(from: .connecting, to: .shutdown) {
    embedded.pipeline.fireChannelInactive()
  }

  // The late 'channelActive' should be ignored.
  embedded.pipeline.fireChannelActive()
}

/// Verifies that when `channelInactive` is seen before `channelActive` during a connection
/// attempt and a connection backoff is configured, the manager moves to `.transientFailure`,
/// ignores the late `channelActive`, and then starts a second attempt which can become
/// `.ready`.
func testChannelInactiveBeforeActiveWillReconnect() throws {
  // Two channels/promises: one per connection attempt. They are consumed from the back.
  var channels = [EmbeddedChannel(loop: self.loop), EmbeddedChannel(loop: self.loop)]
  var channelPromises: [EventLoopPromise<Channel>] = [
    self.loop.makePromise(),
    self.loop.makePromise(),
  ]
  var channelFutures = channelPromises.map { $0.futureResult }

  // Configure a backoff so that a failed attempt schedules a reconnect.
  var configuration = self.defaultConfiguration
  configuration.connectionBackoff = .oneSecondFixed

  let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
    return channelFutures.removeLast()
  }

  // Start the connection.
  self.waitForStateChange(from: .idle, to: .connecting) {
    // Triggers the connect.
    _ = manager.getHTTP2Multiplexer()
    self.loop.run()
  }

  // Setup the channel for the first attempt.
  let channel1 = channels.removeLast()
  let channel1Promise = channelPromises.removeLast()

  try channel1.pipeline.syncOperations.addHandler(
    GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: HTTP2StreamMultiplexer(
        mode: .client,
        channel: channel1,
        inboundStreamInitializer: nil
      ),
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
  )
  channel1Promise.succeed(channel1)
  // Oops: wrong way around. We should tolerate this.
  self.waitForStateChange(from: .connecting, to: .transientFailure) {
    channel1.pipeline.fireChannelInactive()
  }

  // The late 'channelActive' for the first attempt should be ignored.
  channel1.pipeline.fireChannelActive()

  // Start the next attempt by advancing time by one second (the delay this test expects
  // from the `.oneSecondFixed` backoff).
  self.waitForStateChange(from: .transientFailure, to: .connecting) {
    self.loop.advanceTime(by: .seconds(1))
  }

  // Setup the channel for the second attempt. The multiplexer must be created over
  // 'channel2', the channel whose pipeline the idle handler is added to.
  let channel2 = channels.removeLast()
  let channel2Promise = channelPromises.removeLast()
  try channel2.pipeline.syncOperations.addHandler(
    GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: HTTP2StreamMultiplexer(
        mode: .client,
        channel: channel2,
        inboundStreamInitializer: nil
      ),
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
  )

  channel2Promise.succeed(channel2)

  // This time the events arrive in order: active, then the initial SETTINGS frame.
  try self.waitForStateChange(from: .connecting, to: .ready) {
    channel2.pipeline.fireChannelActive()
    let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    XCTAssertNoThrow(try channel2.writeInbound(frame))
  }
}

func testIdleTimeoutWhenThereAreActiveStreams() throws {
let channelPromise = self.loop.makePromise(of: Channel.self)
let manager = self.makeConnectionManager { _, _ in
Expand Down