Skip to content

Commit 90030eb

Browse files
committed
Fix: StreamGroup close() completes when streams close without being removed (dart-lang#372)
1 parent 379e9c2 commit 90030eb

File tree

3 files changed

+34
-1
lines changed

3 files changed

+34
-1
lines changed

pkgs/async/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## 2.13.0
22

33
- Fix type check and cast in SubscriptionStream's cancelOnError wrapper
4+
- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed.
45

56
## 2.12.0
67

pkgs/async/lib/src/stream_group.dart

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,23 @@ class StreamGroup<T> implements Sink<Stream<T>> {
289289
if (_closed) return _controller.done;
290290

291291
_closed = true;
292-
if (_subscriptions.isEmpty) _controller.close();
292+
293+
if (_subscriptions.isEmpty) {
294+
_onIdleController?.add(null);
295+
_onIdleController?.close();
296+
_controller.close();
297+
return _controller.done;
298+
}
299+
300+
if (_controller.stream.isBroadcast) {
301+
for (var entry in _subscriptions.entries.where((e) => e.value == null)) {
302+
try {
303+
_subscriptions[entry.key] = _listenToStream(entry.key);
304+
} catch (_) {
305+
_subscriptions.remove(entry.key);
306+
}
307+
}
308+
}
293309

294310
return _controller.done;
295311
}

pkgs/async/test/stream_group_test.dart

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,22 @@ void main() {
491491
controller.add('first');
492492
expect(streamGroup.close(), completes);
493493
});
494+
495+
test('completes close() when streams close without being removed',
496+
() async {
497+
var controller = StreamController.broadcast();
498+
var group = StreamGroup.broadcast();
499+
group.add(controller.stream);
500+
var closeCompleted = false;
501+
group.close().then((_) => closeCompleted = true);
502+
503+
await flushMicrotasks();
504+
expect(closeCompleted, isFalse);
505+
506+
await controller.close();
507+
await flushMicrotasks();
508+
expect(closeCompleted, isTrue);
509+
});
494510
});
495511

496512
group('regardless of type', () {

0 commit comments

Comments
 (0)