diff --git a/Package@swift-6.0.swift b/Package@swift-6.0.swift index fb4e82d0..a10bb0b8 100644 --- a/Package@swift-6.0.swift +++ b/Package@swift-6.0.swift @@ -56,6 +56,11 @@ let package = Package( .byName(name: "AWSLambdaRuntime"), .product(name: "NIOTestUtils", package: "swift-nio"), .product(name: "NIOFoundationCompat", package: "swift-nio"), + ], + swiftSettings: [ + .define("FoundationJSONSupport"), + .define("ServiceLifecycleSupport"), + .define("LocalServerSupport"), ] ), // for perf testing diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 4d85f7b2..d959a776 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftAWSLambdaRuntime open source project // -// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -75,7 +75,7 @@ extension Lambda { /// 1. POST /invoke - the client posts the event to the lambda function /// /// This server passes the data received from /invoke POST request to the lambda function (GET /next) and then forwards the response back to the client. -private struct LambdaHTTPServer { +internal struct LambdaHTTPServer { private let invocationEndpoint: String private let invocationPool = Pool() @@ -425,7 +425,7 @@ private struct LambdaHTTPServer { /// A shared data structure to store the current invocation or response requests and the continuation objects. /// This data structure is shared between instances of the HTTPHandler /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). - private final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { + internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { typealias Element = T enum State: ~Copyable { diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift new file mode 100644 index 00000000..a5def86d --- /dev/null +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -0,0 +1,150 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Testing + +@testable import AWSLambdaRuntime + +struct PoolTests { + + @Test + func testBasicPushAndIteration() async throws { + let pool = LambdaHTTPServer.Pool() + + // Push values + await pool.push("first") + await pool.push("second") + + // Iterate and verify order + var values = [String]() + for try await value in pool { + values.append(value) + if values.count == 2 { break } + } + + #expect(values == ["first", "second"]) + } + + @Test + func testCancellation() async throws { + let pool = LambdaHTTPServer.Pool() + + // Create a task that will be cancelled + let task = Task { + for try await _ in pool { + Issue.record("Should not receive any values after cancellation") + } + } + + // Cancel the task immediately + task.cancel() + + // This should complete without receiving any values + try await task.value + } + + @Test + func testConcurrentPushAndIteration() async throws { + let pool = LambdaHTTPServer.Pool() + let iterations = 1000 + var receivedValues = Set() + + // Start consumer task first + let consumer = Task { + var count = 0 + for try await value in pool { + receivedValues.insert(value) + count += 1 + if count >= iterations { break } + } + } + + // Create multiple producer tasks + try await withThrowingTaskGroup(of: Void.self) { group in + for i in 0..() + let expectedValue = "test value" + + // Start a consumer that will wait for a value + let consumer = Task { + for try await value in pool { + #expect(value == expectedValue) + break + } + } + + // Give consumer time to start waiting + try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds + + // Push a value + await pool.push(expectedValue) + + // Wait for consumer to complete + try await consumer.value + } + + @Test + func testStressTest() async throws { + let pool = LambdaHTTPServer.Pool() + let producerCount = 10 + let messagesPerProducer = 1000 + var receivedValues = [Int]() + + // Start consumer + let consumer = Task { + var count = 0 + for try await value in pool { + receivedValues.append(value) + count += 1 + if count >= producerCount * messagesPerProducer { break } + } + } + + // Create multiple producers + try await withThrowingTaskGroup(of: Void.self) { group in + for p in 0..