Skip to content

Add async gracefulShutdown function to wait on graceful shutdown trigger #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions Sources/ServiceLifecycle/AsyncGracefulShutdownSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
/// An async sequence that emits an element once graceful shutdown has been triggered.
///
/// This sequence is a broadcast async sequence and will only produce one value and then finish.
///
/// - Note: This sequence respects cancellation and thus is `throwing`.
@usableFromInline
struct AsyncGracefulShutdownSequence: AsyncSequence, Sendable {
@usableFromInline
Expand All @@ -34,19 +36,9 @@ struct AsyncGracefulShutdownSequence: AsyncSequence, Sendable {
init() {}

@inlinable
func next() async -> Element? {
var cont: AsyncStream<Void>.Continuation!
let stream = AsyncStream<Void> { cont = $0 }
let continuation = cont!

return await withTaskGroup(of: Void.self) { _ in
await withGracefulShutdownHandler {
await stream.first { _ in true }
} onGracefulShutdown: {
continuation.yield(())
continuation.finish()
}
}
func next() async throws -> Element? {
try await CancellationWaiter().wait()
return ()
}
}
}
50 changes: 50 additions & 0 deletions Sources/ServiceLifecycle/CancellationWaiter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftServiceLifecycle open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// An actor that provides a function to wait on cancellation/graceful shutdown.
@usableFromInline
actor CancellationWaiter {
private var taskContinuation: CheckedContinuation<Void, Error>?

@usableFromInline
init() {}

@usableFromInline
func wait() async throws {
try await withTaskCancellationHandler {
try await withGracefulShutdownHandler {
try await withCheckedThrowingContinuation { continuation in
self.taskContinuation = continuation
}
} onGracefulShutdown: {
Task {
await self.finish()
}
}
} onCancel: {
Task {
await self.finish(throwing: CancellationError())
}
}
}

private func finish(throwing error: Error? = nil) {
if let error {
self.taskContinuation?.resume(throwing: error)
} else {
self.taskContinuation?.resume()
}
self.taskContinuation = nil
}
}
20 changes: 19 additions & 1 deletion Sources/ServiceLifecycle/GracefulShutdown.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ public func withGracefulShutdownHandler<T>(
return try await operation()
}

/// Waits until graceful shutdown is triggered.
///
/// This method suspends the caller until graceful shutdown is triggered. If the calling task is cancelled before
/// graceful shutdown is triggered then this method will throw a `CancellationError`.
///
/// - Throws: `CancellationError` if the task is cancelled.
public func gracefulShutdown() async throws {
try await AsyncGracefulShutdownSequence().first { _ in true }
}

/// This is just a helper type for the result of our task group.
enum ValueOrGracefulShutdown<T: Sendable>: Sendable {
case value(T)
Expand All @@ -72,7 +82,7 @@ public func cancelOnGracefulShutdown<T: Sendable>(_ operation: @Sendable @escapi
}

group.addTask {
for await _ in AsyncGracefulShutdownSequence() {
for try await _ in AsyncGracefulShutdownSequence() {
return .gracefulShutdown
}

Expand Down Expand Up @@ -138,6 +148,8 @@ public final class GracefulShutdownManager: @unchecked Sendable {
fileprivate var handlerCounter: UInt64 = 0
/// A boolean indicating if we have been shutdown already.
fileprivate var isShuttingDown = false
/// Continuations to resume after all of the handlers have been executed.
fileprivate var gracefulShutdownFinishedContinuations = [CheckedContinuation<Void, Never>]()
}

private let state = LockedValueBox(State())
Expand Down Expand Up @@ -191,6 +203,12 @@ public final class GracefulShutdownManager: @unchecked Sendable {
}

state.handlers.removeAll()

for continuation in state.gracefulShutdownFinishedContinuations {
continuation.resume()
}

state.gracefulShutdownFinishedContinuations.removeAll()
}
}
}
10 changes: 5 additions & 5 deletions Sources/ServiceLifecycle/ServiceGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public actor ServiceGroup: Sendable {

// Using a result here since we want a task group that has non-throwing child tasks
// but the body itself is throwing
let result = await withTaskGroup(of: ChildTaskResult.self, returning: Result<Void, Error>.self) { group in
let result = try await withThrowingTaskGroup(of: ChildTaskResult.self, returning: Result<Void, Error>.self) { group in
// First we have to register our signals.
let gracefulShutdownSignals = await UnixSignalsSequence(trapping: self.gracefulShutdownSignals)
let cancellationSignals = await UnixSignalsSequence(trapping: self.cancellationSignals)
Expand Down Expand Up @@ -228,7 +228,7 @@ public actor ServiceGroup: Sendable {
// This is an optional task that listens to graceful shutdowns from the parent task
if let _ = TaskLocals.gracefulShutdownManager {
group.addTask {
for await _ in AsyncGracefulShutdownSequence() {
for try await _ in AsyncGracefulShutdownSequence() {
return .gracefulShutdownCaught
}

Expand Down Expand Up @@ -276,7 +276,7 @@ public actor ServiceGroup: Sendable {
// We are going to wait for any of the services to finish or
// the signal sequence to throw an error.
while !group.isEmpty {
let result: ChildTaskResult? = await group.next()
let result: ChildTaskResult? = try await group.next()

switch result {
case .serviceFinished(let service, let index):
Expand Down Expand Up @@ -452,7 +452,7 @@ public actor ServiceGroup: Sendable {

private func shutdownGracefully(
services: [ServiceGroupConfiguration.ServiceConfiguration?],
group: inout TaskGroup<ChildTaskResult>,
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
gracefulShutdownManagers: [GracefulShutdownManager]
) async throws {
guard case .running = self.state else {
Expand Down Expand Up @@ -481,7 +481,7 @@ public actor ServiceGroup: Sendable {

gracefulShutdownManager.shutdownGracefully()

let result = await group.next()
let result = try await group.next()

switch result {
case .serviceFinished(let service, let index):
Expand Down
50 changes: 50 additions & 0 deletions Tests/ServiceLifecycleTests/GracefulShutdownTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,54 @@ final class GracefulShutdownTests: XCTestCase {
XCTAssertTrue(Task.isShuttingDownGracefully)
}
}

func testWaitForGracefulShutdown() async throws {
try await testGracefulShutdown { gracefulShutdownTestTrigger in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await Task.sleep(for: .milliseconds(10))
gracefulShutdownTestTrigger.triggerGracefulShutdown()
}

try await withGracefulShutdownHandler {
try await gracefulShutdown()
} onGracefulShutdown: {
// No-op
}

try await group.waitForAll()
}
}
}

func testWaitForGracefulShutdown_WhenAlreadyShutdown() async throws {
try await testGracefulShutdown { gracefulShutdownTestTrigger in
gracefulShutdownTestTrigger.triggerGracefulShutdown()

try await withGracefulShutdownHandler {
try await Task.sleep(for: .milliseconds(10))
try await gracefulShutdown()
} onGracefulShutdown: {
// No-op
}
}
}

func testWaitForGracefulShutdown_Cancellation() async throws {
do {
try await testGracefulShutdown { _ in
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await gracefulShutdown()
}

group.cancelAll()
try await group.waitForAll()
}
}
XCTFail("Expected CancellationError to be thrown")
} catch {
XCTAssertTrue(error is CancellationError)
}
}
}