@@ -22,7 +22,9 @@ public actor ServiceRunner: Sendable {
22
22
/// The initial state of the runner.
23
23
case initial
24
24
/// The state once ``ServiceRunner/run()`` has been called.
25
- case running
25
+ case running(
26
+ gracefulShutdownStreamContinuation: AsyncStream < Void > . Continuation
27
+ )
26
28
/// The state once ``ServiceRunner/run()`` has finished.
27
29
case finished
28
30
}
@@ -59,15 +61,34 @@ public actor ServiceRunner: Sendable {
59
61
public func run( file: String = #file, line: Int = #line) async throws {
60
62
switch self . state {
61
63
case . initial:
62
- self . state = . running
63
- try await self . _run ( )
64
+ guard !self . services. isEmpty else {
65
+ self . state = . finished
66
+ return
67
+ }
68
+
69
+ let ( gracefulShutdownStream, gracefulShutdownContinuation) = AsyncStream . makeStream ( of: Void . self)
70
+
71
+ self . state = . running(
72
+ gracefulShutdownStreamContinuation: gracefulShutdownContinuation
73
+ )
74
+
75
+ var potentialError : Error ?
76
+ do {
77
+ try await self . _run ( gracefulShutdownStream: gracefulShutdownStream)
78
+ } catch {
79
+ potentialError = error
80
+ }
64
81
65
82
switch self . state {
66
83
case . initial, . finished:
67
84
fatalError ( " ServiceRunner is in an invalid state \( self . state) " )
68
85
69
86
case . running:
70
87
self . state = . finished
88
+
89
+ if let potentialError {
90
+ throw potentialError
91
+ }
71
92
}
72
93
73
94
case . running:
@@ -78,15 +99,43 @@ public actor ServiceRunner: Sendable {
78
99
}
79
100
}
80
101
102
+ /// Triggers the graceful shutdown of all services.
103
+ ///
104
+ /// This method returns immediately after triggering the graceful shutdown and doesn't wait until the service have shutdown.
105
+ public func shutdownGracefully( ) async {
106
+ switch self . state {
107
+ case . initial:
108
+ // We aren't even running so we can stop right away.
109
+ self . state = . finished
110
+ return
111
+
112
+ case . running( let gracefulShutdownStreamContinuation) :
113
+ // We cannot transition to shuttingDown here since we are signalling over to the task
114
+ // that runs `run`. This task is responsible for transitioning to shuttingDown since
115
+ // there might be multiple signals racing to trigger it
116
+
117
+ // We are going to signal the run method that graceful shutdown
118
+ // should be triggered
119
+ gracefulShutdownStreamContinuation. yield ( )
120
+ gracefulShutdownStreamContinuation. finish ( )
121
+
122
+ case . finished:
123
+ // Already finished running so nothing to do here
124
+ return
125
+ }
126
+ }
127
+
81
128
private enum ChildTaskResult {
82
129
case serviceFinished( service: any Service , index: Int )
83
130
case serviceThrew( service: any Service , index: Int , error: any Error )
84
131
case signalCaught( UnixSignal )
85
132
case signalSequenceFinished
133
+ case gracefulShutdownCaught
134
+ case gracefulShutdownFinished
86
135
}
87
136
88
- private func _run( ) async throws {
89
- self . logger. info (
137
+ private func _run( gracefulShutdownStream : AsyncStream < Void > ) async throws {
138
+ self . logger. debug (
90
139
" Starting service lifecycle " ,
91
140
metadata: [
92
141
self . configuration. logging. signalsKey: " \( self . configuration. gracefulShutdownSignals) " ,
@@ -100,6 +149,7 @@ public actor ServiceRunner: Sendable {
100
149
// First we have to register our signals.
101
150
let unixSignals = await UnixSignalsSequence ( trapping: self . configuration. gracefulShutdownSignals)
102
151
152
+ // This is the task that listens to signals
103
153
group. addTask {
104
154
for await signal in unixSignals {
105
155
return . signalCaught( signal)
@@ -108,6 +158,26 @@ public actor ServiceRunner: Sendable {
108
158
return . signalSequenceFinished
109
159
}
110
160
161
+ // This is the task that listens to manual graceful shutdown
162
+ group. addTask {
163
+ for await _ in gracefulShutdownStream {
164
+ return . gracefulShutdownCaught
165
+ }
166
+
167
+ return . gracefulShutdownFinished
168
+ }
169
+
170
+ // This is an optional task that listens to graceful shutdowns from the parent task
171
+ if let _ = TaskLocals . gracefulShutdownManager {
172
+ group. addTask {
173
+ for await _ in AsyncGracefulShutdownSequence ( ) {
174
+ return . gracefulShutdownCaught
175
+ }
176
+
177
+ return . gracefulShutdownFinished
178
+ }
179
+ }
180
+
111
181
// We have to create a graceful shutdown manager per service
112
182
// since we want to signal them individually and wait for a single service
113
183
// to finish before moving to the next one
@@ -174,10 +244,8 @@ public actor ServiceRunner: Sendable {
174
244
return . failure( error)
175
245
176
246
case . signalCaught( let unixSignal) :
177
- // We got a signal. Let's initiate graceful shutdown in reverse order than we started the
178
- // services. This allows the users to declare a hierarchy with the order they passed
179
- // the services.
180
- self . logger. info (
247
+ // We got a signal. Let's initiate graceful shutdown.
248
+ self . logger. debug (
181
249
" Signal caught. Shutting down services " ,
182
250
metadata: [
183
251
self . configuration. logging. signalKey: " \( unixSignal) " ,
@@ -193,7 +261,20 @@ public actor ServiceRunner: Sendable {
193
261
return . failure( error)
194
262
}
195
263
196
- case . signalSequenceFinished:
264
+ case . gracefulShutdownCaught:
265
+ // We got a manual or inherited graceful shutdown. Let's initiate graceful shutdown.
266
+ self . logger. debug ( " Graceful shutdown caught. Cascading shutdown to services " )
267
+
268
+ do {
269
+ try await self . shutdownGracefully (
270
+ group: & group,
271
+ gracefulShutdownManagers: gracefulShutdownManagers
272
+ )
273
+ } catch {
274
+ return . failure( error)
275
+ }
276
+
277
+ case . signalSequenceFinished, . gracefulShutdownFinished:
197
278
// This can happen when we are either cancelling everything or
198
279
// when the user did not specify any shutdown signals. We just have to tolerate
199
280
// this.
@@ -207,7 +288,7 @@ public actor ServiceRunner: Sendable {
207
288
return . success( ( ) )
208
289
}
209
290
210
- self . logger. info (
291
+ self . logger. debug (
211
292
" Service lifecycle ended "
212
293
)
213
294
try result. get ( )
@@ -217,6 +298,10 @@ public actor ServiceRunner: Sendable {
217
298
group: inout TaskGroup < ChildTaskResult > ,
218
299
gracefulShutdownManagers: [ GracefulShutdownManager ]
219
300
) async throws {
301
+ guard case . running = self . state else {
302
+ fatalError ( " Unexpected state " )
303
+ }
304
+
220
305
// We have to shutdown the services in reverse. To do this
221
306
// we are going to signal each child task the graceful shutdown and then wait for
222
307
// its exit.
@@ -246,7 +331,7 @@ public actor ServiceRunner: Sendable {
246
331
continue
247
332
} else {
248
333
// Another service exited unexpectedly
249
- self . logger. error (
334
+ self . logger. debug (
250
335
" Service finished unexpectedly during graceful shutdown. Cancelling all other services now " ,
251
336
metadata: [
252
337
self . configuration. logging. serviceKey: " \( service) " ,
@@ -258,7 +343,7 @@ public actor ServiceRunner: Sendable {
258
343
}
259
344
260
345
case . serviceThrew( let service, _, let error) :
261
- self . logger. error (
346
+ self . logger. debug (
262
347
" Service threw error during graceful shutdown. Cancelling all other services now " ,
263
348
metadata: [
264
349
self . configuration. logging. serviceKey: " \( service) " ,
@@ -269,12 +354,30 @@ public actor ServiceRunner: Sendable {
269
354
270
355
throw error
271
356
272
- case . signalCaught, . signalSequenceFinished:
273
- fatalError ( " Signal sequence already returned a signal. " )
357
+ case . signalCaught, . signalSequenceFinished, . gracefulShutdownCaught, . gracefulShutdownFinished:
358
+ // We just have to tolerate this since signals and parent graceful shutdowns downs can race.
359
+ continue
274
360
275
361
case nil :
276
362
fatalError ( " Invalid result from group.next(). " )
277
363
}
278
364
}
365
+
366
+ // If we hit this then all services are shutdown. The only thing remaining
367
+ // are the tasks that listen to the various graceful shutdown signals. We
368
+ // just have to cancel those
369
+ group. cancelAll ( )
370
+ }
371
+ }
372
+
373
+ // This should be removed once we support Swift 5.9+
374
+ extension AsyncStream {
375
+ fileprivate static func makeStream(
376
+ of elementType: Element . Type = Element . self,
377
+ bufferingPolicy limit: Continuation . BufferingPolicy = . unbounded
378
+ ) -> ( stream: AsyncStream < Element > , continuation: AsyncStream < Element > . Continuation ) {
379
+ var continuation : AsyncStream < Element > . Continuation !
380
+ let stream = AsyncStream < Element > ( bufferingPolicy: limit) { continuation = $0 }
381
+ return ( stream: stream, continuation: continuation!)
279
382
}
280
383
}
0 commit comments