diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index 853cc6d..580313a 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -14,6 +14,7 @@ import Logging import UnixSignals +import AsyncAlgorithms /// A ``ServiceGroup`` is responsible for running a number of services, setting up signal handling and signalling graceful shutdown to the services. public actor ServiceGroup: Sendable, Service { @@ -23,7 +24,8 @@ public actor ServiceGroup: Sendable, Service { case initial(services: [ServiceGroupConfiguration.ServiceConfiguration]) /// The state once ``ServiceGroup/run()`` has been called. case running( - gracefulShutdownStreamContinuation: AsyncStream.Continuation + gracefulShutdownStreamContinuation: AsyncStream.Continuation, + addedServiceChannel: AsyncChannel ) /// The state once ``ServiceGroup/run()`` has finished. case finished @@ -106,6 +108,38 @@ public actor ServiceGroup: Sendable, Service { self.maximumCancellationDuration = configuration._maximumCancellationDuration } + /// Adds a new service to the group. + /// + /// If the group is currently running, the added service will be started immediately. + /// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started. + /// - Parameters: + /// - serviceConfiguration: The service configuration to add. + public func addServiceUnlessShutdown(_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration) async { + switch self.state { + case var .initial(services: services): + self.state = .initial(services: []) + services.append(serviceConfiguration) + self.state = .initial(services: services) + + case .running(_, let addedServiceChannel): + await addedServiceChannel.send(serviceConfiguration) + + case .finished: + // Since this is a best effort operation we don't have to do anything here + return + } + } + + /// Adds a new service to the group. + /// + /// If the group is currently running, the added service will be started immediately. + /// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started. + /// - Parameters: + /// - service: The service to add. + public func addServiceUnlessShutdown(_ service: any Service) async { + await self.addServiceUnlessShutdown(ServiceGroupConfiguration.ServiceConfiguration(service: service)) + } + /// Runs all the services by spinning up a child task per service. /// Furthermore, this method sets up the correct signal handlers /// for graceful shutdown. @@ -128,16 +162,19 @@ public actor ServiceGroup: Sendable, Service { } let (gracefulShutdownStream, gracefulShutdownContinuation) = AsyncStream.makeStream(of: Void.self) + let addedServiceChannel = AsyncChannel() self.state = .running( - gracefulShutdownStreamContinuation: gracefulShutdownContinuation + gracefulShutdownStreamContinuation: gracefulShutdownContinuation, + addedServiceChannel: addedServiceChannel ) var potentialError: Error? do { try await self._run( services: &services, - gracefulShutdownStream: gracefulShutdownStream + gracefulShutdownStream: gracefulShutdownStream, + addedServiceChannel: addedServiceChannel ) } catch { potentialError = error @@ -173,7 +210,7 @@ public actor ServiceGroup: Sendable, Service { self.state = .finished return - case .running(let gracefulShutdownStreamContinuation): + case .running(let gracefulShutdownStreamContinuation, _): // We cannot transition to shuttingDown here since we are signalling over to the task // that runs `run`. This task is responsible for transitioning to shuttingDown since // there might be multiple signals racing to trigger it @@ -198,11 +235,13 @@ public actor ServiceGroup: Sendable, Service { case gracefulShutdownFinished case gracefulShutdownTimedOut case cancellationCaught + case newServiceAdded(ServiceGroupConfiguration.ServiceConfiguration) } private func _run( services: inout [ServiceGroupConfiguration.ServiceConfiguration], - gracefulShutdownStream: AsyncStream + gracefulShutdownStream: AsyncStream, + addedServiceChannel: AsyncChannel ) async throws { self.logger.debug( "Starting service lifecycle", @@ -280,25 +319,12 @@ public actor ServiceGroup: Sendable, Service { let gracefulShutdownManager = GracefulShutdownManager() gracefulShutdownManagers.append(gracefulShutdownManager) - // This must be addTask and not addTaskUnlessCancelled - // because we must run all the services for the below logic to work. - group.addTask { - return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) { - do { - try await serviceConfiguration.service.run() - return .serviceFinished(service: serviceConfiguration, index: index) - } catch { - return .serviceThrew(service: serviceConfiguration, index: index, error: error) - } - } - } - } - - group.addTask { - // This child task is waiting forever until the group gets cancelled. - let (stream, _) = AsyncStream.makeStream(of: Void.self) - await stream.first { _ in true } - return .cancellationCaught + self.addServiceTask( + group: &group, + service: serviceConfiguration, + gracefulShutdownManager: gracefulShutdownManager, + index: index + ) } // We are storing the services in an optional array now. When a slot in the array is @@ -310,12 +336,52 @@ public actor ServiceGroup: Sendable, Service { "We did not create a graceful shutdown manager per service" ) + group.addTask { + // This child task is waiting forever until the group gets cancelled. + let (stream, _) = AsyncStream.makeStream(of: Void.self) + await stream.first { _ in true } + return .cancellationCaught + } + + // Adds a task that listens to added services and funnels them into the task group + self.addAddedServiceListenerTask(group: &group, channel: addedServiceChannel) + // We are going to wait for any of the services to finish or // the signal sequence to throw an error. while !group.isEmpty { let result: ChildTaskResult? = try await group.next() switch result { + case .newServiceAdded(let serviceConfiguration): + self.logger.debug( + "Starting added service", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)" + ] + ) + + let gracefulShutdownManager = GracefulShutdownManager() + gracefulShutdownManagers.append(gracefulShutdownManager) + services.append(serviceConfiguration) + + precondition( + services.count == gracefulShutdownManagers.count, + "Mismatch between services and graceful shutdown managers" + ) + + self.addServiceTask( + group: &group, + service: serviceConfiguration, + gracefulShutdownManager: gracefulShutdownManager, + index: services.count - 1 + ) + + // Each listener task can only handle a single added service, so we must add a new listener + self.addAddedServiceListenerTask( + group: &group, + channel: addedServiceChannel + ) + case .serviceFinished(let service, let index): if group.isCancelled { // The group is cancelled and we expect all services to finish @@ -530,10 +596,13 @@ public actor ServiceGroup: Sendable, Service { group: inout ThrowingTaskGroup, gracefulShutdownManagers: [GracefulShutdownManager] ) async throws { - guard case .running = self.state else { + guard case let .running(_, addedServiceChannel) = self.state else { fatalError("Unexpected state") } + // Signal to stop adding new services (it is important that no new services are added after this point) + addedServiceChannel.finish() + if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *), let maximumGracefulShutdownDuration = self.maximumGracefulShutdownDuration { @@ -717,6 +786,10 @@ public actor ServiceGroup: Sendable, Service { // We are going to continue the result loop since we have to wait for our service // to finish. break + + case .newServiceAdded: + // Since adding services is best effort, we simply ignore this + break } } } @@ -777,6 +850,46 @@ public actor ServiceGroup: Sendable, Service { cancellationTimeoutTask = nil } } + + private func addServiceTask( + group: inout ThrowingTaskGroup, + service serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration, + gracefulShutdownManager: GracefulShutdownManager, + index: Int + ) { + // This must be addTask and not addTaskUnlessCancelled + // because we must run all the services for the shutdown logic to work. + group.addTask { + return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) { + do { + try await serviceConfiguration.service.run() + return .serviceFinished(service: serviceConfiguration, index: index) + } catch { + return .serviceThrew(service: serviceConfiguration, index: index, error: error) + } + } + } + } + + private func addAddedServiceListenerTask( + group: inout ThrowingTaskGroup, + channel: AsyncChannel + ) { + group.addTask { + return await withTaskCancellationHandler { + var iterator = channel.makeAsyncIterator() + if let addedService = await iterator.next() { + return .newServiceAdded(addedService) + } + + return .gracefulShutdownFinished + } onCancel: { + // Once the group is cancelled we will no longer read from the channel. + // This will resume any suspended producer in `addServiceUnlessShutdown`. + channel.finish() + } + } + } } // This should be removed once we support Swift 5.9+ diff --git a/Tests/ServiceLifecycleTests/MockService.swift b/Tests/ServiceLifecycleTests/MockService.swift new file mode 100644 index 0000000..ecef7dc --- /dev/null +++ b/Tests/ServiceLifecycleTests/MockService.swift @@ -0,0 +1,85 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftServiceLifecycle open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import ServiceLifecycle + +actor MockService: Service, CustomStringConvertible { + enum Event { + case run + case runPing + case runCancelled + case shutdownGracefully + } + + let events: AsyncStream + private(set) var hasRun: Bool = false + + private let eventsContinuation: AsyncStream.Continuation + + private var runContinuation: CheckedContinuation? + + nonisolated let description: String + + private let pings: AsyncStream + private nonisolated let pingContinuation: AsyncStream.Continuation + + init( + description: String + ) { + var eventsContinuation: AsyncStream.Continuation! + events = AsyncStream { eventsContinuation = $0 } + self.eventsContinuation = eventsContinuation! + + var pingContinuation: AsyncStream.Continuation! + pings = AsyncStream { pingContinuation = $0 } + self.pingContinuation = pingContinuation! + + self.description = description + } + + func run() async throws { + hasRun = true + + try await withTaskCancellationHandler { + try await withGracefulShutdownHandler { + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + self.eventsContinuation.yield(.run) + for await _ in self.pings { + self.eventsContinuation.yield(.runPing) + } + } + + try await withCheckedThrowingContinuation { + self.runContinuation = $0 + } + + group.cancelAll() + } + } onGracefulShutdown: { + self.eventsContinuation.yield(.shutdownGracefully) + } + } onCancel: { + self.eventsContinuation.yield(.runCancelled) + } + } + + func resumeRunContinuation(with result: Result) { + runContinuation?.resume(with: result) + } + + nonisolated func sendPing() { + pingContinuation.yield() + } +} diff --git a/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift new file mode 100644 index 0000000..9bed364 --- /dev/null +++ b/Tests/ServiceLifecycleTests/ServiceGroupAddServiceTests.swift @@ -0,0 +1,397 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftServiceLifecycle open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +import ServiceLifecycle +import UnixSignals +import XCTest + +private struct ExampleError: Error, Hashable {} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class ServiceGroupAddServiceTests: XCTestCase { + + func testAddService_whenNotRunning() async { + let mockService = MockService(description: "Service1") + let serviceGroup = self.makeServiceGroup() + await serviceGroup.addServiceUnlessShutdown(mockService) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator = mockService.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator.next(), .run) + + group.cancelAll() + await XCTAsyncAssertEqual(await eventIterator.next(), .runCancelled) + + await mockService.resumeRunContinuation(with: .success(())) + } + + } + + func testAddService_whenRunning() async throws { + let mockService1 = MockService(description: "Service1") + let mockService2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService1)] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = mockService1.events.makeAsyncIterator() + var eventIterator2 = mockService2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addServiceUnlessShutdown(mockService2) + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + await mockService1.resumeRunContinuation(with: .success(())) + + await XCTAsyncAssertEqual(await eventIterator2.next(), .runCancelled) + await mockService2.resumeRunContinuation(with: .success(())) + } + } + + func testAddService_whenShuttingDown() async throws { + let mockService1 = MockService(description: "Service1") + let mockService2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService1)] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = mockService1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + await serviceGroup.addServiceUnlessShutdown(mockService2) + + await mockService1.resumeRunContinuation(with: .success(())) + } + + await XCTAsyncAssertEqual(await mockService2.hasRun, false) + } + + func testAddService_whenCancelling() async throws { + let mockService1 = MockService(description: "Service1") + let mockService2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: mockService1)] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = mockService1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + group.cancelAll() + + await XCTAsyncAssertEqual(await eventIterator1.next(), .runCancelled) + await serviceGroup.addServiceUnlessShutdown(mockService2) + + await mockService1.resumeRunContinuation(with: .success(())) + } + + await XCTAsyncAssertEqual(await mockService2.hasRun, false) + } + + func testRun_whenAddedServiceExitsEarly_andIgnore() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [], + gracefulShutdownSignals: [.sigalrm] + ) + + await serviceGroup.addServiceUnlessShutdown(.init(service: service1, successTerminationBehavior: .ignore)) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addServiceUnlessShutdown(.init(service: service2, failureTerminationBehavior: .ignore)) + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + await service1.resumeRunContinuation(with: .success(())) + + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + try await XCTAsyncAssertNoThrow(await group.next()) + } + } + + func testRun_whenAddedServiceExitsEarly_andShutdownGracefully() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [ + .init(service: service1) + ] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addServiceUnlessShutdown( + .init(service: service2, successTerminationBehavior: .gracefullyShutdownGroup) + ) + await serviceGroup.addServiceUnlessShutdown(.init(service: service3)) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await service2.resumeRunContinuation(with: .success(())) + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that the remaining two are still running + service1.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // Waiting to see that the remaining is still running + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + } + } + + func testRun_whenAddedServiceThrows() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1)], + gracefulShutdownSignals: [.sigalrm] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var service1EventIterator = service1.events.makeAsyncIterator() + var service2EventIterator = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await service1EventIterator.next(), .run) + await serviceGroup.addServiceUnlessShutdown(service2) + + await XCTAsyncAssertEqual(await service2EventIterator.next(), .run) + + // Throwing from service2 here and expect that service1 gets cancelled + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + await XCTAsyncAssertEqual(await service1EventIterator.next(), .runCancelled) + await service1.resumeRunContinuation(with: .success(())) + + try await XCTAsyncAssertThrowsError(await group.next()) { + XCTAssertTrue($0 is ExampleError) + } + } + } + + func testGracefulShutdownOrdering_withAddedServices() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1)], + gracefulShutdownSignals: [.sigalrm] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addServiceUnlessShutdown(service2) + await serviceGroup.addServiceUnlessShutdown(service3) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + let pid = getpid() + kill(pid, UnixSignal.sigalrm.rawValue) // ignore-unacceptable-language + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the middle service + await service2.resumeRunContinuation(with: .success(())) + + // The first service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + } + } + + func testGracefulShutdownOrdering_whenAddedServiceExits() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1)], + gracefulShutdownSignals: [.sigalrm] + ) + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + await serviceGroup.addServiceUnlessShutdown(service2) + await serviceGroup.addServiceUnlessShutdown(service3) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + let pid = getpid() + kill(pid, UnixSignal.sigalrm.rawValue) // ignore-unacceptable-language + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + // The middle service should now receive a cancellation + await XCTAsyncAssertEqual(await eventIterator2.next(), .runCancelled) + + // Let's exit from the first service + await service2.resumeRunContinuation(with: .success(())) + } + } + + // MARK: - Helpers + + private func makeServiceGroup( + services: [ServiceGroupConfiguration.ServiceConfiguration] = [], + gracefulShutdownSignals: [UnixSignal] = .init(), + cancellationSignals: [UnixSignal] = .init(), + maximumGracefulShutdownDuration: Duration? = nil, + maximumCancellationDuration: Duration? = .seconds(5) + ) -> ServiceGroup { + var logger = Logger(label: "Tests") + logger.logLevel = .debug + + var configuration = ServiceGroupConfiguration( + services: services, + gracefulShutdownSignals: gracefulShutdownSignals, + cancellationSignals: cancellationSignals, + logger: logger + ) + configuration.maximumGracefulShutdownDuration = maximumGracefulShutdownDuration + configuration.maximumCancellationDuration = maximumCancellationDuration + return .init( + configuration: configuration + ) + } +} diff --git a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift index b536d2a..0120251 100644 --- a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift +++ b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift @@ -19,73 +19,6 @@ import XCTest private struct ExampleError: Error, Hashable {} -private actor MockService: Service, CustomStringConvertible { - enum Event { - case run - case runPing - case runCancelled - case shutdownGracefully - } - - let events: AsyncStream - - private let eventsContinuation: AsyncStream.Continuation - - private var runContinuation: CheckedContinuation? - - nonisolated let description: String - - private let pings: AsyncStream - private nonisolated let pingContinuation: AsyncStream.Continuation - - init( - description: String - ) { - var eventsContinuation: AsyncStream.Continuation! - self.events = AsyncStream { eventsContinuation = $0 } - self.eventsContinuation = eventsContinuation! - - var pingContinuation: AsyncStream.Continuation! - self.pings = AsyncStream { pingContinuation = $0 } - self.pingContinuation = pingContinuation! - - self.description = description - } - - func run() async throws { - try await withTaskCancellationHandler { - try await withGracefulShutdownHandler { - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - self.eventsContinuation.yield(.run) - for await _ in self.pings { - self.eventsContinuation.yield(.runPing) - } - } - - try await withCheckedThrowingContinuation { - self.runContinuation = $0 - } - - group.cancelAll() - } - } onGracefulShutdown: { - self.eventsContinuation.yield(.shutdownGracefully) - } - } onCancel: { - self.eventsContinuation.yield(.runCancelled) - } - } - - func resumeRunContinuation(with result: Result) { - self.runContinuation?.resume(with: result) - } - - nonisolated func sendPing() { - self.pingContinuation.yield() - } -} - @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) final class ServiceGroupTests: XCTestCase { func testRun_whenAlreadyRunning() async throws {