Skip to content

Commit 8f0a07e

Browse files
Merge pull request #102 from d-exclaimation/leak-fix
Fixing issues with ConcurrentEventStream from #101
2 parents 3071750 + a99f92a commit 8f0a07e

File tree

2 files changed

+43
-11
lines changed

2 files changed

+43
-11
lines changed

Sources/GraphQL/Subscription/EventStream.swift

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,42 @@ public class ConcurrentEventStream<Element>: EventStream<Element> {
3131
extension AsyncThrowingStream {
3232
func mapStream<To>(_ closure: @escaping (Element) throws -> To) -> AsyncThrowingStream<To, Error> {
3333
return AsyncThrowingStream<To, Error> { continuation in
34-
Task {
35-
for try await event in self {
36-
let newEvent = try closure(event)
37-
continuation.yield(newEvent)
34+
let task = Task {
35+
do {
36+
for try await event in self {
37+
let newEvent = try closure(event)
38+
continuation.yield(newEvent)
39+
}
40+
continuation.finish()
41+
} catch {
42+
continuation.finish(throwing: error)
3843
}
3944
}
45+
46+
continuation.onTermination = { @Sendable reason in
47+
task.cancel()
48+
}
4049
}
4150
}
4251

4352
func filterStream(_ isIncluded: @escaping (Element) throws -> Bool) -> AsyncThrowingStream<Element, Error> {
4453
return AsyncThrowingStream<Element, Error> { continuation in
45-
Task {
46-
for try await event in self {
47-
if try isIncluded(event) {
48-
continuation.yield(event)
54+
let task = Task {
55+
do {
56+
for try await event in self {
57+
if try isIncluded(event) {
58+
continuation.yield(event)
59+
}
4960
}
61+
continuation.finish()
62+
} catch {
63+
continuation.finish(throwing: error)
5064
}
5165
}
66+
67+
continuation.onTermination = { @Sendable _ in
68+
task.cancel()
69+
}
5270
}
5371
}
5472
}

Tests/GraphQLTests/SubscriptionTests/SubscriptionTests.swift

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -605,12 +605,15 @@ class SubscriptionTests : XCTestCase {
605605

606606
var results = [GraphQLResult]()
607607
var expectation = XCTestExpectation()
608-
_ = stream.map { event in
608+
609+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
610+
let keepForNow = stream.map { event in
609611
event.map { result in
610612
results.append(result)
611613
expectation.fulfill()
612614
}
613615
}
616+
614617
var expected = [GraphQLResult]()
615618

616619
db.trigger(email: Email(
@@ -675,6 +678,9 @@ class SubscriptionTests : XCTestCase {
675678
)
676679
wait(for: [expectation], timeout: timeoutDuration)
677680
XCTAssertEqual(results, expected)
681+
682+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
683+
_ = keepForNow
678684
}
679685

680686
/// 'should not trigger when subscription is already done'
@@ -701,7 +707,8 @@ class SubscriptionTests : XCTestCase {
701707

702708
var results = [GraphQLResult]()
703709
var expectation = XCTestExpectation()
704-
_ = stream.map { event in
710+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
711+
let keepForNow = stream.map { event in
705712
event.map { result in
706713
results.append(result)
707714
expectation.fulfill()
@@ -747,6 +754,9 @@ class SubscriptionTests : XCTestCase {
747754
// Ensure that the current result was the one before the db was stopped
748755
wait(for: [expectation], timeout: timeoutDuration)
749756
XCTAssertEqual(results, expected)
757+
758+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
759+
_ = keepForNow
750760
}
751761

752762
/// 'should not trigger when subscription is thrown'
@@ -861,7 +871,8 @@ class SubscriptionTests : XCTestCase {
861871

862872
var results = [GraphQLResult]()
863873
var expectation = XCTestExpectation()
864-
_ = stream.map { event in
874+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
875+
let keepForNow = stream.map { event in
865876
event.map { result in
866877
results.append(result)
867878
expectation.fulfill()
@@ -925,6 +936,9 @@ class SubscriptionTests : XCTestCase {
925936
)
926937
wait(for: [expectation], timeout: timeoutDuration)
927938
XCTAssertEqual(results, expected)
939+
940+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
941+
_ = keepForNow
928942
}
929943

930944
/// 'should pass through error thrown in source event stream'

0 commit comments

Comments
 (0)