Skip to content

cancelOnGracefulShutdown hangs, if cancellation is not immediately #177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Sources/ServiceLifecycle/AsyncGracefulShutdownSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
@usableFromInline
struct AsyncGracefulShutdownSequence: AsyncSequence, Sendable {
@usableFromInline
typealias Element = Void
typealias Element = CancellationWaiter.Reason

@inlinable
init() {}
Expand All @@ -36,9 +36,8 @@ struct AsyncGracefulShutdownSequence: AsyncSequence, Sendable {
init() {}

@inlinable
func next() async throws -> Element? {
try await CancellationWaiter().wait()
return ()
func next() async -> Element? {
await CancellationWaiter().wait()
}
}
}
28 changes: 15 additions & 13 deletions Sources/ServiceLifecycle/CancellationWaiter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,38 @@
/// An actor that provides a function to wait on cancellation/graceful shutdown.
@usableFromInline
actor CancellationWaiter {
private var taskContinuation: CheckedContinuation<Void, Error>?
@usableFromInline
enum Reason: Sendable {
case cancelled
case gracefulShutdown
}

private var taskContinuation: CheckedContinuation<Reason, Never>?

@usableFromInline
init() {}

@usableFromInline
func wait() async throws {
try await withTaskCancellationHandler {
try await withGracefulShutdownHandler {
try await withCheckedThrowingContinuation { continuation in
func wait() async -> Reason {
await withTaskCancellationHandler {
await withGracefulShutdownHandler {
await withCheckedContinuation { (continuation: CheckedContinuation<Reason, Never>) in
self.taskContinuation = continuation
}
} onGracefulShutdown: {
Task {
await self.finish()
await self.finish(reason: .gracefulShutdown)
}
}
} onCancel: {
Task {
await self.finish(throwing: CancellationError())
await self.finish(reason: .cancelled)
}
}
}

private func finish(throwing error: Error? = nil) {
if let error {
self.taskContinuation?.resume(throwing: error)
} else {
self.taskContinuation?.resume()
}
private func finish(reason: Reason) {
self.taskContinuation?.resume(returning: reason)
self.taskContinuation = nil
}
}
23 changes: 16 additions & 7 deletions Sources/ServiceLifecycle/GracefulShutdown.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,21 @@ public func withTaskCancellationOrGracefulShutdownHandler<T>(
///
/// - Throws: `CancellationError` if the task is cancelled.
public func gracefulShutdown() async throws {
try await AsyncGracefulShutdownSequence().first { _ in true }
switch await AsyncGracefulShutdownSequence().first(where: { _ in true }) {
case .cancelled:
throw CancellationError()
case .gracefulShutdown:
return
case .none:
fatalError()
}
}

/// This is just a helper type for the result of our task group.
enum ValueOrGracefulShutdown<T: Sendable>: Sendable {
case value(T)
case gracefulShutdown
case cancelled
}

/// Cancels the closure when a graceful shutdown was triggered.
Expand All @@ -115,11 +123,12 @@ public func cancelOnGracefulShutdown<T: Sendable>(_ operation: @Sendable @escapi
}

group.addTask {
for try await _ in AsyncGracefulShutdownSequence() {
switch await CancellationWaiter().wait() {
case .cancelled:
return .cancelled
case .gracefulShutdown:
return .gracefulShutdown
}

throw CancellationError()
}

let result = try await group.next()
Expand All @@ -128,13 +137,13 @@ public func cancelOnGracefulShutdown<T: Sendable>(_ operation: @Sendable @escapi
switch result {
case .value(let t):
return t
case .gracefulShutdown:

case .gracefulShutdown, .cancelled:
switch try await group.next() {
case .value(let t):
return t
case .gracefulShutdown:
case .gracefulShutdown, .cancelled:
fatalError("Unexpectedly got gracefulShutdown from group.next()")

case nil:
fatalError("Unexpectedly got nil from group.next()")
}
Expand Down
84 changes: 84 additions & 0 deletions Tests/ServiceLifecycleTests/GracefulShutdownTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,88 @@ final class GracefulShutdownTests: XCTestCase {
group.cancelAll()
}
}

func testCancelOnGracefulShutdownSurvivesCancellation() async throws {
await withTaskGroup(of: Void.self) { group in
group.addTask {
await withGracefulShutdownHandler {
await cancelOnGracefulShutdown {
await OnlyCancellationWaiter().cancellation

try! await uncancellable {
try! await Task.sleep(for: .milliseconds(500))
}
}
} onGracefulShutdown: {
XCTFail("Unexpect graceful shutdown")
}
}

group.cancelAll()
}
}

func testCancelOnGracefulShutdownSurvivesErrorThrown() async throws {
struct MyError: Error, Equatable {}

await withTaskGroup(of: Void.self) { group in
group.addTask {
do {
try await withGracefulShutdownHandler {
try await cancelOnGracefulShutdown {
await OnlyCancellationWaiter().cancellation

try! await uncancellable {
try! await Task.sleep(for: .milliseconds(500))
}

throw MyError()
}
} onGracefulShutdown: {
XCTFail("Unexpect graceful shutdown")
}
XCTFail("Expected to have thrown")
} catch {
XCTAssertEqual(error as? MyError, MyError())
}
}

group.cancelAll()
}
}
}

func uncancellable(_ closure: @escaping @Sendable () async throws -> Void) async throws {
let task = Task {
try await closure()
}

try await task.value
}

private actor OnlyCancellationWaiter {
private var taskContinuation: CheckedContinuation<Void, Never>?

@usableFromInline
init() {}

@usableFromInline
var cancellation: Void {
get async {
await withTaskCancellationHandler {
await withCheckedContinuation { continuation in
self.taskContinuation = continuation
}
} onCancel: {
Task {
await self.finish()
}
}
}
}

private func finish() {
self.taskContinuation?.resume()
self.taskContinuation = nil
}
}