-
Notifications
You must be signed in to change notification settings - Fork 41
Add a configurable escalation behaviour #162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,10 @@ public actor ServiceGroup: Sendable { | |
private let logger: Logger | ||
/// The logging configuration. | ||
private let loggingConfiguration: ServiceGroupConfiguration.LoggingConfiguration | ||
/// The maximum amount of time that graceful shutdown is allowed to take. | ||
private let maximumGracefulShutdownDuration: (secondsComponent: Int64, attosecondsComponent: Int64)? | ||
/// The maximum amount of time that task cancellation is allowed to take. | ||
private let maximumCancellationDuration: (secondsComponent: Int64, attosecondsComponent: Int64)? | ||
/// The signals that lead to graceful shutdown. | ||
private let gracefulShutdownSignals: [UnixSignal] | ||
/// The signals that lead to cancellation. | ||
|
@@ -57,6 +61,8 @@ public actor ServiceGroup: Sendable { | |
self.cancellationSignals = configuration.cancellationSignals | ||
self.logger = configuration.logger | ||
self.loggingConfiguration = configuration.logging | ||
self.maximumGracefulShutdownDuration = configuration._maximumGracefulShutdownDuration | ||
self.maximumCancellationDuration = configuration._maximumCancellationDuration | ||
} | ||
|
||
/// Initializes a new ``ServiceGroup``. | ||
|
@@ -94,6 +100,8 @@ public actor ServiceGroup: Sendable { | |
self.cancellationSignals = configuration.cancellationSignals | ||
self.logger = logger | ||
self.loggingConfiguration = configuration.logging | ||
self.maximumGracefulShutdownDuration = configuration._maximumGracefulShutdownDuration | ||
self.maximumCancellationDuration = configuration._maximumCancellationDuration | ||
} | ||
|
||
/// Runs all the services by spinning up a child task per service. | ||
|
@@ -176,6 +184,8 @@ public actor ServiceGroup: Sendable { | |
case signalSequenceFinished | ||
case gracefulShutdownCaught | ||
case gracefulShutdownFinished | ||
case gracefulShutdownTimedOut | ||
case cancellationCaught | ||
} | ||
|
||
private func _run( | ||
|
@@ -191,6 +201,10 @@ public actor ServiceGroup: Sendable { | |
] | ||
) | ||
|
||
// A task that is spawned when we get cancelled or when | ||
// we cancel the task group; it keeps track of the cancellation timeout. | ||
var cancellationTimeoutTask: Task<Void, Never>? | ||
|
||
// Using a result here since we want a task group that has non-throwing child tasks | ||
// but the body itself is throwing | ||
let result = try await withThrowingTaskGroup(of: ChildTaskResult.self, returning: Result<Void, Error>.self) { group in | ||
|
@@ -267,6 +281,13 @@ public actor ServiceGroup: Sendable { | |
} | ||
} | ||
|
||
group.addTask { | ||
// This child task is waiting forever until the group gets cancelled. | ||
let (stream, _) = AsyncStream.makeStream(of: Void.self) | ||
await stream.first { _ in true } | ||
return .cancellationCaught | ||
} | ||
|
||
// We are storing the services in an optional array now. When a slot in the array is | ||
// empty it indicates that the service has been shutdown. | ||
var services = services.map { Optional($0) } | ||
|
@@ -293,7 +314,7 @@ public actor ServiceGroup: Sendable { | |
self.loggingConfiguration.keys.serviceKey: "\(service.service)", | ||
] | ||
) | ||
group.cancelAll() | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
return .failure(ServiceGroupError.serviceFinishedUnexpectedly()) | ||
|
||
case .gracefullyShutdownGroup: | ||
|
@@ -307,6 +328,7 @@ public actor ServiceGroup: Sendable { | |
do { | ||
try await self.shutdownGracefully( | ||
services: services, | ||
cancellationTimeoutTask: &cancellationTimeoutTask, | ||
group: &group, | ||
gracefulShutdownManagers: gracefulShutdownManagers | ||
) | ||
|
@@ -327,7 +349,7 @@ public actor ServiceGroup: Sendable { | |
self.logger.debug( | ||
"All services finished." | ||
) | ||
group.cancelAll() | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
return .success(()) | ||
} | ||
} | ||
|
@@ -342,7 +364,7 @@ public actor ServiceGroup: Sendable { | |
self.loggingConfiguration.keys.errorKey: "\(serviceError)", | ||
] | ||
) | ||
group.cancelAll() | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
return .failure(serviceError) | ||
|
||
case .gracefullyShutdownGroup: | ||
|
@@ -358,6 +380,7 @@ public actor ServiceGroup: Sendable { | |
do { | ||
try await self.shutdownGracefully( | ||
services: services, | ||
cancellationTimeoutTask: &cancellationTimeoutTask, | ||
group: &group, | ||
gracefulShutdownManagers: gracefulShutdownManagers | ||
) | ||
|
@@ -381,7 +404,7 @@ public actor ServiceGroup: Sendable { | |
"All services finished." | ||
) | ||
|
||
group.cancelAll() | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
return .success(()) | ||
} | ||
} | ||
|
@@ -398,6 +421,7 @@ public actor ServiceGroup: Sendable { | |
do { | ||
try await self.shutdownGracefully( | ||
services: services, | ||
cancellationTimeoutTask: &cancellationTimeoutTask, | ||
group: &group, | ||
gracefulShutdownManagers: gracefulShutdownManagers | ||
) | ||
|
@@ -413,7 +437,7 @@ public actor ServiceGroup: Sendable { | |
] | ||
) | ||
|
||
group.cancelAll() | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
} | ||
|
||
case .gracefulShutdownCaught: | ||
|
@@ -423,19 +447,29 @@ public actor ServiceGroup: Sendable { | |
do { | ||
try await self.shutdownGracefully( | ||
services: services, | ||
cancellationTimeoutTask: &cancellationTimeoutTask, | ||
group: &group, | ||
gracefulShutdownManagers: gracefulShutdownManagers | ||
) | ||
} catch { | ||
return .failure(error) | ||
} | ||
|
||
case .cancellationCaught: | ||
// We caught cancellation in our child task so we have to spawn | ||
// our cancellation timeout task if needed | ||
self.logger.debug("Caught cancellation.") | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
|
||
case .signalSequenceFinished, .gracefulShutdownFinished: | ||
// This can happen when we are either cancelling everything or | ||
// when the user did not specify any shutdown signals. We just have to tolerate | ||
// this. | ||
continue | ||
|
||
case .gracefulShutdownTimedOut: | ||
fatalError("Received gracefulShutdownTimedOut but never triggered a graceful shutdown") | ||
|
||
case nil: | ||
fatalError("Invalid result from group.next(). We checked if the group is empty before and still got nil") | ||
} | ||
|
@@ -447,18 +481,30 @@ public actor ServiceGroup: Sendable { | |
self.logger.debug( | ||
"Service lifecycle ended" | ||
) | ||
cancellationTimeoutTask?.cancel() | ||
try result.get() | ||
} | ||
|
||
private func shutdownGracefully( | ||
services: [ServiceGroupConfiguration.ServiceConfiguration?], | ||
cancellationTimeoutTask: inout Task<Void, Never>?, | ||
group: inout ThrowingTaskGroup<ChildTaskResult, Error>, | ||
gracefulShutdownManagers: [GracefulShutdownManager] | ||
) async throws { | ||
guard case .running = self.state else { | ||
fatalError("Unexpected state") | ||
} | ||
|
||
if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *), let maximumGracefulShutdownDuration = self.maximumGracefulShutdownDuration { | ||
group.addTask { | ||
try? await Task.sleep(for: Duration( | ||
secondsComponent: maximumGracefulShutdownDuration.secondsComponent, | ||
attosecondsComponent: maximumGracefulShutdownDuration.attosecondsComponent | ||
)) | ||
return .gracefulShutdownTimedOut | ||
} | ||
} | ||
|
||
// We are storing the first error of a service that threw here. | ||
var error: Error? | ||
|
||
|
@@ -509,7 +555,7 @@ public actor ServiceGroup: Sendable { | |
] | ||
) | ||
|
||
group.cancelAll() | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
throw ServiceGroupError.serviceFinishedUnexpectedly() | ||
} | ||
|
||
|
@@ -561,9 +607,26 @@ public actor ServiceGroup: Sendable { | |
] | ||
) | ||
|
||
group.cancelAll() | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
} | ||
|
||
case .gracefulShutdownTimedOut: | ||
// Gracefully shutting down took longer than the user configured | ||
// so we have to escalate it now. | ||
self.logger.debug( | ||
"Graceful shutdown took longer than allowed by the configuration. Cancelling the group now.", | ||
metadata: [ | ||
self.loggingConfiguration.keys.serviceKey: "\(service.service)", | ||
] | ||
) | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
|
||
case .cancellationCaught: | ||
// We caught cancellation in our child task so we have to spawn | ||
// our cancellation timeout task if needed | ||
self.logger.debug("Caught cancellation.") | ||
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask) | ||
|
||
case .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished: | ||
// We just have to tolerate this since signals and parent graceful shutdowns can race. | ||
continue | ||
|
@@ -575,7 +638,9 @@ public actor ServiceGroup: Sendable { | |
|
||
// If we hit this then all services are shutdown. The only thing remaining | ||
// are the tasks that listen to the various graceful shutdown signals. We | ||
// just have to cancel those | ||
// just have to cancel those. | ||
// In this case we don't have to spawn our cancellation timeout task since | ||
// we are sure all other child tasks are handling cancellation appropriately. | ||
group.cancelAll() | ||
|
||
// If we saw an error during graceful shutdown from a service that triggers graceful | ||
|
@@ -584,6 +649,45 @@ public actor ServiceGroup: Sendable { | |
throw error | ||
} | ||
} | ||
|
||
private func cancelGroupAndSpawnTimeoutIfNeeded( | ||
group: inout ThrowingTaskGroup<ChildTaskResult, Error>, | ||
cancellationTimeoutTask: inout Task<Void, Never>? | ||
) { | ||
guard cancellationTimeoutTask == nil else { | ||
// We already have a cancellation timeout task running. | ||
self.logger.debug( | ||
"Task cancellation timeout task already running." | ||
) | ||
return | ||
} | ||
group.cancelAll() | ||
|
||
if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *), let maximumCancellationDuration = self.maximumCancellationDuration { | ||
// We have to spawn an unstructured task here because the call to our `run` | ||
// method might have already been cancelled and we need to protect the sleep | ||
// from being cancelled. | ||
cancellationTimeoutTask = Task { | ||
do { | ||
self.logger.debug( | ||
"Task cancellation timeout task started." | ||
) | ||
try await Task.sleep(for: Duration( | ||
secondsComponent: maximumCancellationDuration.secondsComponent, | ||
attosecondsComponent: maximumCancellationDuration.attosecondsComponent | ||
)) | ||
Comment on lines +675 to +678
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No, it's not. We are storing the raw components again because we cannot store a |
||
self.logger.debug( | ||
"Cancellation took longer than allowed by the configuration." | ||
) | ||
fatalError("Cancellation took longer than allowed by the configuration.") | ||
} catch { | ||
// We got cancelled so our services must have finished up. | ||
} | ||
} | ||
} else { | ||
cancellationTimeoutTask = nil | ||
} | ||
} | ||
} | ||
|
||
// This should be removed once we support Swift 5.9+ | ||
|
Uh oh!
There was an error while loading. Please reload this page.