From fe47a7264f38d5153842394eed2758e45f79e005 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Fri, 26 Jan 2024 09:18:27 +0000 Subject: [PATCH 1/2] Fix logic issues during graceful shutdown # Motivation This should fix the remaining issues raised in https://github.com/swift-server/swift-service-lifecycle/pull/166. The problem here was that if a service finished/threw out of order then we were wrongly treating this as if the service that we are currently shutting down finished. # Modification This PR ensures that we use the same `services` array during the graceful shutdown to nil out services that have finished. This way we correctly keep track of any service that finished. Additionally, there was a separate bug where we started to shutdown the next service to early if another service threw and had the termination behaviour of `shutdownGracefully`. # Result No more incorrect shutdown orderings. --- Sources/ServiceLifecycle/ServiceGroup.swift | 30 +- .../ServiceGroupTests.swift | 277 +++++++++++++++++- 2 files changed, 296 insertions(+), 11 deletions(-) diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index 88dfaef..6fa3af7 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -327,7 +327,7 @@ public actor ServiceGroup: Sendable { services[index] = nil do { try await self.shutdownGracefully( - services: services, + services: &services, cancellationTimeoutTask: &cancellationTimeoutTask, group: &group, gracefulShutdownManagers: gracefulShutdownManagers @@ -380,7 +380,7 @@ public actor ServiceGroup: Sendable { do { try await self.shutdownGracefully( - services: services, + services: &services, cancellationTimeoutTask: &cancellationTimeoutTask, group: &group, gracefulShutdownManagers: gracefulShutdownManagers @@ -421,7 +421,7 @@ public actor ServiceGroup: Sendable { ) do { try await self.shutdownGracefully( - services: services, + services: &services, cancellationTimeoutTask: &cancellationTimeoutTask, group: &group, gracefulShutdownManagers: gracefulShutdownManagers @@ -448,7 +448,7 @@ public actor ServiceGroup: Sendable { do { try await self.shutdownGracefully( - services: services, + services: &services, cancellationTimeoutTask: &cancellationTimeoutTask, group: &group, gracefulShutdownManagers: gracefulShutdownManagers @@ -489,7 +489,7 @@ public actor ServiceGroup: Sendable { } private func shutdownGracefully( - services: [ServiceGroupConfiguration.ServiceConfiguration?], + services: inout [ServiceGroupConfiguration.ServiceConfiguration?], cancellationTimeoutTask: inout Task?, group: inout ThrowingTaskGroup, gracefulShutdownManagers: [GracefulShutdownManager] @@ -519,7 +519,7 @@ public actor ServiceGroup: Sendable { self.logger.debug( "Service already finished. Skipping shutdown" ) - continue + continue gracefulShutdownLoop } self.logger.debug( "Triggering graceful shutdown for service", @@ -533,6 +533,7 @@ public actor ServiceGroup: Sendable { while let result = try await group.next() { switch result { case .serviceFinished(let service, let index): + services[index] = nil if group.isCancelled { // The group is cancelled and we expect all services to finish continue gracefulShutdownLoop @@ -561,7 +562,8 @@ public actor ServiceGroup: Sendable { throw ServiceGroupError.serviceFinishedUnexpectedly() } - case .serviceThrew(let service, _, let serviceError): + case .serviceThrew(let service, let index, let serviceError): + services[index] = nil switch service.failureTerminationBehavior.behavior { case .cancelGroup: self.logger.debug( @@ -587,8 +589,15 @@ public actor ServiceGroup: Sendable { error = serviceError } - // We can continue shutting down the next service now - continue gracefulShutdownLoop + if index == gracefulShutdownIndex { + // The service that we were shutting down right now threw. Since it's failure + // behaviour is to shutdown the group we can continue + continue gracefulShutdownLoop + } else { + // Another service threw while we were waiting for a shutdown + // We have to continue the iterating the task group's result + break + } case .ignore: self.logger.debug( @@ -635,7 +644,8 @@ public actor ServiceGroup: Sendable { case .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished: // We just have to tolerate this since signals and parent graceful shutdowns downs can race. - // We are going to continue the + // We are going to continue the result loop since we have to wait for our service + // to finish. break } } diff --git a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift index 1e525f3..ccf562e 100644 --- a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift +++ b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift @@ -943,7 +943,8 @@ final class ServiceGroupTests: XCTestCase { // Let's throw from the second service await service2.resumeRunContinuation(with: .failure(ExampleError())) - // The first service should still be running + // The first service should still be running but seeing a graceful shutdown signal firsts + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) service1.sendPing() await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) @@ -1194,6 +1195,280 @@ final class ServiceGroupTests: XCTestCase { } } + func testTriggerGracefulShutdown_serviceThrows_inOrder_gracefullyShutdownGroup() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service2, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service3, failureTerminationBehavior: .gracefullyShutdownGroup)] + ) + + do { + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + // The final service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + try await group.waitForAll() + } + + XCTFail("Expected error not thrown") + } catch is ExampleError { + // expected error + } + } + + func testTriggerGracefulShutdown_serviceThrows_inOrder_ignore() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, failureTerminationBehavior: .ignore), + .init(service: service2, failureTerminationBehavior: .ignore), + .init(service: service3, failureTerminationBehavior: .ignore)] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .failure(ExampleError())) + + // The final service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully) + + // Waiting to see that the one remaining are still running + service1.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + + // Let's exit from the first service + await service1.resumeRunContinuation(with: .success(())) + + try await group.waitForAll() + } + } + + func testTriggerGracefulShutdown_serviceThrows_outOfOrder_gracefullyShutdownGroup() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service2, failureTerminationBehavior: .gracefullyShutdownGroup), + .init(service: service3, failureTerminationBehavior: .gracefullyShutdownGroup)] + ) + + do { + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the first service (even though the second service + // is gracefully shutting down) + await service1.resumeRunContinuation(with: .failure(ExampleError())) + + // Waiting to see that the one remaining are still running + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // The first service shutdown will be skipped + try await group.waitForAll() + } + + XCTFail("Expected error not thrown") + } catch is ExampleError { + // expected error + } + } + + func testTriggerGracefulShutdown_serviceThrows_outOfOrder_ignore() async throws { + let service1 = MockService(description: "Service1") + let service2 = MockService(description: "Service2") + let service3 = MockService(description: "Service3") + let serviceGroup = self.makeServiceGroup( + services: [.init(service: service1, failureTerminationBehavior: .ignore), + .init(service: service2, failureTerminationBehavior: .ignore), + .init(service: service3, failureTerminationBehavior: .ignore)] + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await serviceGroup.run() + } + + var eventIterator1 = service1.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator1.next(), .run) + + var eventIterator2 = service2.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator2.next(), .run) + + var eventIterator3 = service3.events.makeAsyncIterator() + await XCTAsyncAssertEqual(await eventIterator3.next(), .run) + + await serviceGroup.triggerGracefulShutdown() + + // The last service should receive the shutdown signal first + await XCTAsyncAssertEqual(await eventIterator3.next(), .shutdownGracefully) + + // Waiting to see that all three are still running + service1.sendPing() + service2.sendPing() + service3.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator3.next(), .runPing) + + // Let's exit from the last service + await service3.resumeRunContinuation(with: .success(())) + + // The middle service should now receive the signal + await XCTAsyncAssertEqual(await eventIterator2.next(), .shutdownGracefully) + + // Waiting to see that the two remaining are still running + service1.sendPing() + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing) + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the first service (even though the second service + // is gracefully shutting down) + await service1.resumeRunContinuation(with: .failure(ExampleError())) + + // Waiting to see that the one remaining are still running + service2.sendPing() + await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing) + + // Let's exit from the second service + await service2.resumeRunContinuation(with: .success(())) + + // The first service shutdown will be skipped + try await group.waitForAll() + } + } + // MARK: - Helpers private func makeServiceGroup( From 5a35085abc08e7125e42ed36cc86de9dbfc06270 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Fri, 26 Jan 2024 11:42:55 +0000 Subject: [PATCH 2/2] Fix one more race condition --- Sources/ServiceLifecycle/ServiceGroup.swift | 55 +++++++++++++------ .../ServiceGroupTests.swift | 3 + 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/Sources/ServiceLifecycle/ServiceGroup.swift b/Sources/ServiceLifecycle/ServiceGroup.swift index 6fa3af7..8644648 100644 --- a/Sources/ServiceLifecycle/ServiceGroup.swift +++ b/Sources/ServiceLifecycle/ServiceGroup.swift @@ -577,14 +577,6 @@ public actor ServiceGroup: Sendable { throw serviceError case .gracefullyShutdownGroup: - self.logger.debug( - "Service threw error during graceful shutdown.", - metadata: [ - self.loggingConfiguration.keys.serviceKey: "\(service.service)", - self.loggingConfiguration.keys.errorKey: "\(serviceError)", - ] - ) - if error == nil { error = serviceError } @@ -592,24 +584,51 @@ public actor ServiceGroup: Sendable { if index == gracefulShutdownIndex { // The service that we were shutting down right now threw. Since it's failure // behaviour is to shutdown the group we can continue + self.logger.debug( + "The service that we were shutting down threw. Continuing with the next one.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(serviceError)", + ] + ) continue gracefulShutdownLoop } else { // Another service threw while we were waiting for a shutdown // We have to continue the iterating the task group's result + self.logger.debug( + "Another service than the service that we were shutting down threw. Continuing with the next one.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(serviceError)", + ] + ) break } case .ignore: - self.logger.debug( - "Service threw error during graceful shutdown.", - metadata: [ - self.loggingConfiguration.keys.serviceKey: "\(service.service)", - self.loggingConfiguration.keys.errorKey: "\(serviceError)", - ] - ) - - // We can continue shutting down the next service now - continue gracefulShutdownLoop + if index == gracefulShutdownIndex { + // The service that we were shutting down right now threw. Since it's failure + // behaviour is to shutdown the group we can continue + self.logger.debug( + "The service that we were shutting down threw. Continuing with the next one.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(serviceError)", + ] + ) + continue gracefulShutdownLoop + } else { + // Another service threw while we were waiting for a shutdown + // We have to continue the iterating the task group's result + self.logger.debug( + "Another service than the service that we were shutting down threw. Continuing with the next one.", + metadata: [ + self.loggingConfiguration.keys.serviceKey: "\(service.service)", + self.loggingConfiguration.keys.errorKey: "\(serviceError)", + ] + ) + break + } } case .signalCaught(let signal): diff --git a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift index ccf562e..e603d7f 100644 --- a/Tests/ServiceLifecycleTests/ServiceGroupTests.swift +++ b/Tests/ServiceLifecycleTests/ServiceGroupTests.swift @@ -1457,6 +1457,9 @@ final class ServiceGroupTests: XCTestCase { // is gracefully shutting down) await service1.resumeRunContinuation(with: .failure(ExampleError())) + // We are sleeping here for a tiny bit to make sure the error is handled from the service 1 + try await Task.sleep(for: .seconds(0.05)) + // Waiting to see that the one remaining are still running service2.sendPing() await XCTAsyncAssertEqual(await eventIterator2.next(), .runPing)