Skip to content

Fix StreamGroup.broadcast() close() not completing when streams close. #876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 10, 2025

Conversation

suojae
Copy link
Contributor

@suojae suojae commented Apr 2, 2025

Fix StreamGroup.broadcast() close() not completing when streams close.

Description

This PR fixes issue #372 where StreamGroup.close() never completes when streams in the group close without being explicitly removed.

The documentation states:

[stream] won't close until [close] is called on the group and every stream in the group closes.

However, with the current implementation, when a broadcast StreamGroup is used and close() is called, it will never complete unless streams are explicitly removed from the group.

Changes

  • Added a test case that demonstrates the issue: "completes close() when streams close without being removed"
  • Fixed the implementation of close() in StreamGroup to properly detect and handle broadcast streams closing
  • The fix ensures that when all streams in a group close, the group's close() method completes as documented

Testing

The added test case now passes, and all existing tests continue to pass.

@suojae suojae requested a review from a team as a code owner April 3, 2025 17:30
@suojae suojae force-pushed the fix-streamgroup-close-372 branch from 41e94a6 to f1283f7 Compare April 3, 2025 17:30
@suojae suojae changed the title Fix #372: StreamGroup close() completes when streams close without be… Fix StreamGroup.broadcast() close() not completing when streams close. Apr 3, 2025
Copy link

github-actions bot commented Apr 3, 2025

PR Health

Breaking changes ✔️
Package Change Current Version New Version Needed Version Looking good?
async None 2.13.0 2.13.1-wip 2.13.0 ✔️
Changelog Entry ✔️
Package Changed Files

Changes to files need to be accounted for in their respective changelogs.

Coverage ⚠️
File Coverage
pkgs/async/lib/src/stream_group.dart 💔 93 % ⬇️ 2 %

This check for test coverage is informational (issues shown here will not fail the PR).

This check can be disabled by tagging the PR with skip-coverage-check.

API leaks ✔️

The following packages contain symbols visible in the public API, but not exported by the library. Export these symbols or remove them from your publicly visible API.

Package Leaked API symbols
License Headers ✔️
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
Files
no missing headers

All source files should start with a license header.

// For a broadcast group that's closed, we must listen to streams with
// null subscriptions to detect when they complete. This ensures the
// group itself can close once all its streams have closed.
final streamsToRemove = <Stream<T>>[];
Copy link
Member

@lrhn lrhn Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since a listen throwing is likely rare, I'd prefer to not allocate this list unless it's actually needed.
It's wasteful to allocate a list that won't be used in 99.9% of cases.

So

  List<Stream<T>>? streamsToRemove;
  ... updateAll...
    on Object {
      (streamsToRemove ??= []).add(stream);
      return null;
    }
...
  if (streamsToRemove != null) {
    for (final stream in streamsToRemove) {
      ...
   }
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I've implemented the suggested optimization to avoid unnecessary list allocation.
Changed the code to use a nullable list that's only initialized when needed and the null-safe call to forEach ensures we only iterate when a list was actually created.

commit log: cd2faf2

@@ -1,6 +1,7 @@
## 2.13.0

- Fix type check and cast in SubscriptionStream's cancelOnError wrapper
- Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed.
Copy link
Member

@lrhn lrhn Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to a

## 2.14.0-wip

entry, and update the pubspec.yaml version to match.
The 2.13.0 version has been released, so this fix won't be in it.
Can probably be a 2.13.1-wip "bugfix". @natebosch WDYT?

Copy link
Contributor Author

@suojae suojae Apr 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to 2.13.1-wip, Let me know if there are any other changes needed. Thanks! ☺️

commit log: 032a24b

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's looking good. It needs a second reviewer, so we can have a second opinion.

@suojae suojae force-pushed the fix-streamgroup-close-372 branch from afd5e4f to 032a24b Compare April 5, 2025 18:05
@lrhn lrhn requested a review from natebosch April 8, 2025 09:02
@lrhn
Copy link
Member

lrhn commented Apr 9, 2025

Pubspec version and changelog version do not agree. Both should be 2.13.1-wip.

@suojae
Copy link
Contributor Author

suojae commented Apr 10, 2025

Pubspec version and changelog version do not agree. Both should be 2.13.1-wip.

Fix version mismatch - update pubspec to 2.13.1-wip to match changelog. Thanks!

commit log: ef32af5

@lrhn lrhn merged commit e32c7b9 into dart-lang:main Apr 10, 2025
14 checks passed
@lrhn
Copy link
Member

lrhn commented Apr 10, 2025

Landed. Thank you for all the work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants