From 780ef83027866b5c00c63abe4caa3f6d579a1789 Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Thu, 23 Nov 2023 10:38:59 +0000 Subject: [PATCH 1/3] Add debug logging to AsyncBackpressuredStream --- .../AsyncBackpressuredStream.swift | 58 ++++++++++++++- .../AsyncBackpressuredStreamTests.swift | 73 +++++++++++++++++++ 2 files changed, 127 insertions(+), 4 deletions(-) diff --git a/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift b/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift index 6aa7cd8..0867e11 100644 --- a/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift +++ b/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift @@ -346,7 +346,20 @@ extension AsyncBackpressuredStream { } func write(contentsOf sequence: S) throws -> Source.WriteResult where S.Element == Element { - let action = self.lock.withLock { return self.stateMachine.write(sequence) } + let action = self.lock.withLock { + let stateBefore = self.stateMachine.state + let action = self.stateMachine.write(sequence) + let stateAfter = self.stateMachine.state + debug(""" + --- + event: write + state before: \(stateBefore) + state after: \(stateAfter) + action: \(action) + --- + """) + return action + } switch action { case .returnProduceMore: return .produceMore @@ -385,7 +398,18 @@ extension AsyncBackpressuredStream { onProduceMore: @escaping @Sendable (Result) -> Void ) { let action = self.lock.withLock { - return self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore) + let stateBefore = self.stateMachine.state + let action = self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore) + let stateAfter = self.stateMachine.state + debug(""" + --- + event: \(#function) + state before: \(stateBefore) + state after: \(stateAfter) + action: \(action) + --- + """) + return action } switch action { @@ -449,7 +473,20 @@ extension AsyncBackpressuredStream { } func next() async throws -> Element? { - let action = self.lock.withLock { return self.stateMachine.next() } + let action = self.lock.withLock { + let stateBefore = self.stateMachine.state + let action = self.stateMachine.next() + let stateAfter = self.stateMachine.state + debug(""" + --- + event: next + state before: \(stateBefore) + state after: \(stateAfter) + action: \(action) + --- + """) + return action + } switch action { case .returnElement(let element): return element @@ -476,7 +513,20 @@ extension AsyncBackpressuredStream { func suspendNext() async throws -> Element? { return try await withTaskCancellationHandler { return try await withCheckedThrowingContinuation { continuation in - let action = self.lock.withLock { return self.stateMachine.suspendNext(continuation: continuation) } + let action = self.lock.withLock { + let stateBefore = self.stateMachine.state + let action = self.stateMachine.suspendNext(continuation: continuation) + let stateAfter = self.stateMachine.state + debug(""" + --- + event: \(#function) + state before: \(stateBefore) + state after: \(stateAfter) + action: \(action) + --- + """) + return action + } switch action { case .resumeContinuationWithElement(let continuation, let element): diff --git a/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift b/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift index bdb474a..ffa52e9 100644 --- a/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift +++ b/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift @@ -198,6 +198,42 @@ final class AsyncBackpressuredStreamTests: XCTestCase { XCTAssertEqual(strategy.didConsume(elements: Slice([])), true) XCTAssertEqual(strategy.currentWatermark, 0) } + +extension AsyncBackpressuredStream.Source.WriteResult: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .enqueueCallback: return "enqueueCallBack" + case .produceMore: return "produceMore" + } + } +} + +extension AsyncBackpressuredStream.StateMachine.SuspendNextAction: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .none: return "none" + case .resumeContinuationWithElement: return "resumeContinuationWithElement" + case .resumeContinuationWithElementAndProducers: return "resumeContinuationWithElementAndProducers" + case .resumeContinuationWithFailureAndCallOnTerminate: return "resumeContinuationWithFailureAndCallOnTerminate" + case .resumeContinuationWithNil: return "resumeContinuationWithNil" + } + } +} + +extension AsyncBackpressuredStream.StateMachine.State: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .initial: return "initial" + case .streaming(_, let buffer, let consumer, let producers, _, let demand, _, _): + return + "streaming(buffer.count: \(buffer.count), consumer: \(consumer != nil ? "yes" : "no"), producers: \(producers), demand: \(demand))" + case .finished: return "finished" + case .sourceFinished: return "sourceFinished" + } + } } extension AsyncSequence { @@ -206,3 +242,40 @@ extension AsyncSequence { try await self.reduce(into: []) { accumulated, next in accumulated.append(next) } } } + +extension AsyncBackpressuredStream.StateMachine.NextAction: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .returnNil: return "returnNil" + case .returnElementAndResumeProducers: return "returnElementAndResumeProducers" + case .returnFailureAndCallOnTerminate: return "returnFailureAndCallOnTerminate" + case .returnElement: return "returnElement" + case .suspendTask: return "suspendTask" + } + } +} + +extension AsyncBackpressuredStream.StateMachine.WriteAction: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .returnProduceMore: return "returnProduceMore" + case .returnEnqueue: return "returnEnqueue" + case .resumeConsumerContinuationAndReturnProduceMore: return "resumeConsumerContinuationAndReturnProduceMore" + case .resumeConsumerContinuationAndReturnEnqueue: return "resumeConsumerContinuationAndReturnEnqueue" + case .throwFinishedError: return "throwFinishedError" + } + } +} + +extension AsyncBackpressuredStream.StateMachine.EnqueueProducerAction: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .resumeProducer: return "resumeProducer" + case .resumeProducerWithCancellationError: return "resumeProducerWithCancellationError" + case .none: return "none" + } + } +} From 8fb401068e623f92f7bf55bf8505fd0cb7d1aafd Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Thu, 23 Nov 2023 10:40:19 +0000 Subject: [PATCH 2/3] Add tests that catch failure in state machine --- .../AsyncBackpressuredStreamTests.swift | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift b/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift index ffa52e9..f51f656 100644 --- a/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift +++ b/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift @@ -199,6 +199,84 @@ final class AsyncBackpressuredStreamTests: XCTestCase { XCTAssertEqual(strategy.currentWatermark, 0) } + func testWritingOverWatermark() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let (stream, continuation) = AsyncBackpressuredStream + .makeStream(backPressureStrategy: .highLowWatermark(lowWatermark: 1, highWatermark: 1)) + + group.addTask { + for i in 1...10 { + debug("Producer writing element \(i)...") + let writeResult = try continuation.write(contentsOf: CollectionOfOne(i)) + debug("Producer wrote element \(i), result = \(writeResult)") + // ignore backpressure result and write again anyway + } + debug("Producer finished") + continuation.finish(throwing: nil) + } + + var iterator = stream.makeAsyncIterator() + var numElementsConsumed = 0 + var expectedNextValue = 1 + while true { + debug("Consumer reading element...") + guard let element = try await iterator.next() else { break } + XCTAssertEqual(element, expectedNextValue) + debug("Consumer read element: \(element), expected: \(expectedNextValue)") + numElementsConsumed += 1 + expectedNextValue += 1 + } + XCTAssertEqual(numElementsConsumed, 10) + + group.cancelAll() + } + } + + func testStateMachineSuspendNext() async throws { + typealias Stream = AsyncBackpressuredStream + + var strategy = Stream.InternalBackPressureStrategy.highLowWatermark(.init(lowWatermark: 1, highWatermark: 1)) + _ = strategy.didYield(elements: Slice([1, 2, 3])) + var stateMachine = Stream.StateMachine(backPressureStrategy: strategy, onTerminate: nil) + stateMachine.state = .streaming( + backPressureStrategy: strategy, + buffer: [1, 2, 3], + consumerContinuation: nil, + producerContinuations: [], + cancelledAsyncProducers: [], + hasOutstandingDemand: false, + iteratorInitialized: true, + onTerminate: nil + ) + + guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else { + XCTFail("Unexpected state: \(stateMachine.state)") + return + } + XCTAssertEqual(buffer, [1, 2, 3]) + XCTAssertNil(consumerContinuation) + + _ = try await withCheckedThrowingContinuation { continuation in + let action = stateMachine.suspendNext(continuation: continuation) + + guard case .resumeContinuationWithElement(_, let element) = action else { + XCTFail("Unexpected action: \(action)") + return + } + XCTAssertEqual(element, 1) + + guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else { + XCTFail("Unexpected state: \(stateMachine.state)") + return + } + XCTAssertEqual(buffer, [2, 3]) + XCTAssertNil(consumerContinuation) + + continuation.resume(returning: element) + } + } +} + extension AsyncBackpressuredStream.Source.WriteResult: CustomStringConvertible { // swift-format-ignore: AllPublicDeclarationsHaveDocumentation public var description: String { From a768a21b6d09171cf26c481258cc86f93df3298c Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Thu, 23 Nov 2023 10:40:56 +0000 Subject: [PATCH 3/3] Add missing state transition before resuming consumer continuation --- .../AsyncBackpressuredStream.swift | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift b/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift index 0867e11..2c792b4 100644 --- a/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift +++ b/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift @@ -1289,6 +1289,16 @@ extension AsyncBackpressuredStream { guard shouldProduceMore else { // We don't have any new demand, so we can just return the element. + self.state = .streaming( + backPressureStrategy: backPressureStrategy, + buffer: buffer, + consumerContinuation: nil, + producerContinuations: producerContinuations, + cancelledAsyncProducers: cancelledAsyncProducers, + hasOutstandingDemand: hasOutstandingDemand, + iteratorInitialized: iteratorInitialized, + onTerminate: onTerminate + ) return .resumeContinuationWithElement(continuation, element) } let producers = Array(producerContinuations.map { $0.1 })