From 90030eb542dcdfd5bd6048008106e32178435977 Mon Sep 17 00:00:00 2001 From: suojae Date: Wed, 2 Apr 2025 12:39:28 +0900 Subject: [PATCH 1/6] Fix: StreamGroup close() completes when streams close without being removed (#372) --- pkgs/async/CHANGELOG.md | 1 + pkgs/async/lib/src/stream_group.dart | 18 +++++++++++++++++- pkgs/async/test/stream_group_test.dart | 16 ++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/pkgs/async/CHANGELOG.md b/pkgs/async/CHANGELOG.md index ca81027fa..e84644f6c 100644 --- a/pkgs/async/CHANGELOG.md +++ b/pkgs/async/CHANGELOG.md @@ -1,6 +1,7 @@ ## 2.13.0 - Fix type check and cast in SubscriptionStream's cancelOnError wrapper +- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed. ## 2.12.0 diff --git a/pkgs/async/lib/src/stream_group.dart b/pkgs/async/lib/src/stream_group.dart index 502a111cc..7867d189b 100644 --- a/pkgs/async/lib/src/stream_group.dart +++ b/pkgs/async/lib/src/stream_group.dart @@ -289,7 +289,23 @@ class StreamGroup implements Sink> { if (_closed) return _controller.done; _closed = true; - if (_subscriptions.isEmpty) _controller.close(); + + if (_subscriptions.isEmpty) { + _onIdleController?.add(null); + _onIdleController?.close(); + _controller.close(); + return _controller.done; + } + + if (_controller.stream.isBroadcast) { + for (var entry in _subscriptions.entries.where((e) => e.value == null)) { + try { + _subscriptions[entry.key] = _listenToStream(entry.key); + } catch (_) { + _subscriptions.remove(entry.key); + } + } + } return _controller.done; } diff --git a/pkgs/async/test/stream_group_test.dart b/pkgs/async/test/stream_group_test.dart index 3700120ef..7b40f6e30 100644 --- a/pkgs/async/test/stream_group_test.dart +++ b/pkgs/async/test/stream_group_test.dart @@ -491,6 +491,22 @@ void main() { controller.add('first'); expect(streamGroup.close(), completes); }); + + test('completes close() when streams close without being removed', + () async { + var controller = StreamController.broadcast(); + var group = StreamGroup.broadcast(); + group.add(controller.stream); + var closeCompleted = false; + group.close().then((_) => closeCompleted = true); + + await flushMicrotasks(); + expect(closeCompleted, isFalse); + + await controller.close(); + await flushMicrotasks(); + expect(closeCompleted, isTrue); + }); }); group('regardless of type', () { From f1283f7fbda113f6e0dee45e8f00b613c44770af Mon Sep 17 00:00:00 2001 From: suojae Date: Fri, 4 Apr 2025 02:24:18 +0900 Subject: [PATCH 2/6] Fix: Remove redundant idle event emission (#372) --- pkgs/async/lib/src/stream_group.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/pkgs/async/lib/src/stream_group.dart b/pkgs/async/lib/src/stream_group.dart index 7867d189b..2dfb892d9 100644 --- a/pkgs/async/lib/src/stream_group.dart +++ b/pkgs/async/lib/src/stream_group.dart @@ -291,7 +291,6 @@ class StreamGroup implements Sink> { _closed = true; if (_subscriptions.isEmpty) { - _onIdleController?.add(null); _onIdleController?.close(); _controller.close(); return _controller.done; From 89dcb4ec11f9c845dff31adf003a118644baad3a Mon Sep 17 00:00:00 2001 From: suojae Date: Fri, 4 Apr 2025 03:02:47 +0900 Subject: [PATCH 3/6] Fix: ConcurrentModificationError and improve broadcast stream handling (#372) --- pkgs/async/lib/src/stream_group.dart | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pkgs/async/lib/src/stream_group.dart b/pkgs/async/lib/src/stream_group.dart index 2dfb892d9..6c4547b57 100644 --- a/pkgs/async/lib/src/stream_group.dart +++ b/pkgs/async/lib/src/stream_group.dart @@ -297,12 +297,24 @@ class StreamGroup implements Sink> { } if (_controller.stream.isBroadcast) { - for (var entry in _subscriptions.entries.where((e) => e.value == null)) { + // For a broadcast group that's closed, we must listen to streams with + // null subscriptions to detect when they complete. This ensures the + // group itself can close once all its streams have closed. + final streamsToRemove = >[]; + + _subscriptions.updateAll((stream, subscription) { + if (subscription != null) return subscription; + try { - _subscriptions[entry.key] = _listenToStream(entry.key); - } catch (_) { - _subscriptions.remove(entry.key); + return _listenToStream(stream); + } on Object { + streamsToRemove.add(stream); + return null; } + }); + + for (final stream in streamsToRemove) { + _subscriptions.remove(stream); } } From cd2faf2ad8369bf5665e1d612b00757bcdd6a14e Mon Sep 17 00:00:00 2001 From: suojae Date: Fri, 4 Apr 2025 20:00:36 +0900 Subject: [PATCH 4/6] Fix: avoid unnecessary list allocation in broadcast close() (#372) --- pkgs/async/lib/src/stream_group.dart | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pkgs/async/lib/src/stream_group.dart b/pkgs/async/lib/src/stream_group.dart index 6c4547b57..79f057f19 100644 --- a/pkgs/async/lib/src/stream_group.dart +++ b/pkgs/async/lib/src/stream_group.dart @@ -300,7 +300,7 @@ class StreamGroup implements Sink> { // For a broadcast group that's closed, we must listen to streams with // null subscriptions to detect when they complete. This ensures the // group itself can close once all its streams have closed. - final streamsToRemove = >[]; + List>? streamsToRemove; _subscriptions.updateAll((stream, subscription) { if (subscription != null) return subscription; @@ -308,14 +308,12 @@ class StreamGroup implements Sink> { try { return _listenToStream(stream); } on Object { - streamsToRemove.add(stream); + (streamsToRemove ??= []).add(stream); return null; } }); - for (final stream in streamsToRemove) { - _subscriptions.remove(stream); - } + streamsToRemove?.forEach(_subscriptions.remove); } return _controller.done; From 032a24b8ea433aeb79dc02cae0dbf94cf4021763 Mon Sep 17 00:00:00 2001 From: suojae Date: Sun, 6 Apr 2025 02:17:36 +0900 Subject: [PATCH 5/6] Move StreamGroup.broadcast() fix to 2.14.0-wip section --- pkgs/async/CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkgs/async/CHANGELOG.md b/pkgs/async/CHANGELOG.md index e84644f6c..1f6d74dac 100644 --- a/pkgs/async/CHANGELOG.md +++ b/pkgs/async/CHANGELOG.md @@ -1,7 +1,10 @@ +## 2.13.1-wip + +- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed. + ## 2.13.0 - Fix type check and cast in SubscriptionStream's cancelOnError wrapper -- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed. ## 2.12.0 From ef32af5fda6099985bc367c4d299c0b5f15e88ce Mon Sep 17 00:00:00 2001 From: suojae Date: Thu, 10 Apr 2025 11:23:25 +0900 Subject: [PATCH 6/6] Fix version mismatch: update pubspec to 2.13.1-wip to match changelog --- pkgs/async/pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/async/pubspec.yaml b/pkgs/async/pubspec.yaml index db2d84f4c..688c14b6e 100644 --- a/pkgs/async/pubspec.yaml +++ b/pkgs/async/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.13.0 +version: 2.13.1-wip description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/core/tree/main/pkgs/async issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync