-
Notifications
You must be signed in to change notification settings - Fork 113
/
Copy pathPoolTests.swift
150 lines (124 loc) · 4.28 KB
/
PoolTests.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Testing
@testable import AWSLambdaRuntime
struct PoolTests {
@Test
func testBasicPushAndIteration() async throws {
let pool = LambdaHTTPServer.Pool<String>()
// Push values
await pool.push("first")
await pool.push("second")
// Iterate and verify order
var values = [String]()
for try await value in pool {
values.append(value)
if values.count == 2 { break }
}
#expect(values == ["first", "second"])
}
@Test
func testCancellation() async throws {
let pool = LambdaHTTPServer.Pool<String>()
// Create a task that will be cancelled
let task = Task {
for try await _ in pool {
Issue.record("Should not receive any values after cancellation")
}
}
// Cancel the task immediately
task.cancel()
// This should complete without receiving any values
try await task.value
}
@Test
func testConcurrentPushAndIteration() async throws {
let pool = LambdaHTTPServer.Pool<Int>()
let iterations = 1000
var receivedValues = Set<Int>()
// Start consumer task first
let consumer = Task {
var count = 0
for try await value in pool {
receivedValues.insert(value)
count += 1
if count >= iterations { break }
}
}
// Create multiple producer tasks
try await withThrowingTaskGroup(of: Void.self) { group in
for i in 0..<iterations {
group.addTask {
await pool.push(i)
}
}
try await group.waitForAll()
}
// Wait for consumer to complete
try await consumer.value
// Verify all values were received exactly once
#expect(receivedValues.count == iterations)
#expect(Set(0..<iterations) == receivedValues)
}
@Test
func testPushToWaitingConsumer() async throws {
let pool = LambdaHTTPServer.Pool<String>()
let expectedValue = "test value"
// Start a consumer that will wait for a value
let consumer = Task {
for try await value in pool {
#expect(value == expectedValue)
break
}
}
// Give consumer time to start waiting
try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds
// Push a value
await pool.push(expectedValue)
// Wait for consumer to complete
try await consumer.value
}
@Test
func testStressTest() async throws {
let pool = LambdaHTTPServer.Pool<Int>()
let producerCount = 10
let messagesPerProducer = 1000
var receivedValues = [Int]()
// Start consumer
let consumer = Task {
var count = 0
for try await value in pool {
receivedValues.append(value)
count += 1
if count >= producerCount * messagesPerProducer { break }
}
}
// Create multiple producers
try await withThrowingTaskGroup(of: Void.self) { group in
for p in 0..<producerCount {
group.addTask {
for i in 0..<messagesPerProducer {
await pool.push(p * messagesPerProducer + i)
}
}
}
try await group.waitForAll()
}
// Wait for consumer to complete
try await consumer.value
// Verify we received all values
#expect(receivedValues.count == producerCount * messagesPerProducer)
#expect(Set(receivedValues).count == producerCount * messagesPerProducer)
}
}