
_AsyncStarStreamController does not propagate cancelation in some cases #41693


Open
mjeffryes opened this issue Apr 28, 2020 · 2 comments
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends.

Comments

@mjeffryes

We recently discovered a bug in our Flutter app that boiled down to a difference in cancellation behavior between an async* function and its equivalent filter + map expression.

Our code uses stream.firstWhere to wait for a desired value from a filtered and transformed stream. When the filter and transformation are implemented with the where and map methods on Stream, this works as expected. If we replace these methods with an async* function, the cancellation from firstWhere does not propagate upstream, causing firstWhere to block indefinitely. (It appears that new values that make it through the filter may trigger the cancellation, but in our production environment such events can be rare.)

My apologies if this is a known issue or expected behavior, but I did some searching and could not find an open issue or documentation that explains the difference, so here I am.

Sample code to reproduce:

import 'dart:async';

// mock source stream
Stream<int> _source() {
  final sc = StreamController<int>();
  sc.onListen = () {
    // In production this claims resources and may produce multiple values
    sc.add(3);
  };
  sc.onCancel = () {
    print("onCancel called");
    sc.close();
    // In production this frees the resources claimed above
  };

  return sc.stream;
}

// bad
Stream<int> asyncStar() async* { 
  await for (var a in _source()) {
    if (a % 3 == 0) {
      yield a + 1;
    }
  }
}

// good
Stream<int> filterMap() {
  return _source().where((a) => a % 3 == 0).map((a) => a + 1);
}

Future<void> theTest(Stream<int> s) async {
  final out = await s.firstWhere((i) => i % 2 == 0);
  print("SUCCESS: $out");
}

void main() async {
  print("--------------------");
  print("filterMap:");
  await theTest(filterMap());
  print("--------------------");
  print("asyncStar:");
  await theTest(asyncStar()); // hangs
  print("--------------------"); // never get here
}

Example of running the script above:

▶ dart --version
Dart VM version: 2.7.1 (Thu Jan 23 13:02:26 2020 +0100) on "macos_x64"
▶ dart repro.dart
--------------------
filterMap:
onCancel called
SUCCESS: 4
--------------------
asyncStar:

@mraleph added the area-vm label Apr 29, 2020

@mraleph
Member

mraleph commented Apr 29, 2020

@lrhn Lasse, could you make a call on what the expected behaviour is here and whether this is a VM bug?

@lrhn
Member

lrhn commented Apr 10, 2021

Just found this issue. It's probably still a problem in the VM (#34775).
I think it's a symptom of the implementation of async* not following the spec.

The async* function yields a value (here 4, computed from the source event 3), then continues running without waiting to see if it's been cancelled. It should deliver the value synchronously, then check for pause/cancel immediately on returning.

Instead it just goes on running and won't check that again until reaching the next yield. Since _source() only emits one event and hasn't been cancelled, the await for (var a in _source()) loop stalls and never gets to the next yield.

And so the cancel from firstWhere gets stuck waiting for subscription.cancel() on the asyncStar() stream's subscription to complete. It never does, because the function is stuck, and the cancel of an async* function won't complete until the function body terminates. (That's to allow it to run finally blocks, but it also allows it to get stuck on any await or await for which doesn't complete.)
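
For readers who need cancellation to reach the source immediately, one possible approach is to do the filtering and mapping through an explicit StreamController that forwards cancel to the upstream subscription. The following is only a minimal sketch, not something proposed in this thread: filterMapManual and mockSource are hypothetical names introduced for illustration, and the code assumes a null-safe Dart SDK (2.12 or later) because it uses late.

```dart
import 'dart:async';

// Hypothetical mock modelled on _source() from the report: emits one value
// and logs when its subscription is cancelled.
Stream<int> mockSource() {
  final sc = StreamController<int>();
  sc.onListen = () => sc.add(3);
  sc.onCancel = () {
    print('onCancel called');
    sc.close();
  };
  return sc.stream;
}

// Hypothetical helper: same filter and transformation as asyncStar(), but
// the downstream cancel is forwarded to the source subscription directly
// instead of waiting for an async* body to reach its next yield.
Stream<int> filterMapManual(Stream<int> source) {
  late StreamSubscription<int> sub;
  late StreamController<int> controller;
  controller = StreamController<int>(
    onListen: () {
      sub = source.listen(
        (a) {
          if (a % 3 == 0) controller.add(a + 1);
        },
        onError: controller.addError,
        onDone: controller.close,
      );
    },
    onPause: () => sub.pause(),
    onResume: () => sub.resume(),
    // Cancelling the returned stream cancels the source subscription.
    onCancel: () => sub.cancel(),
  );
  return controller.stream;
}

Future<void> main() async {
  final out = await filterMapManual(mockSource()).firstWhere((i) => i.isEven);
  print('result: $out');
}
```

The design choice here is that no user code runs between a delivered event and the cancel check, so there is no await for loop that can stall while a cancel is pending. The trade-off is that any cleanup a finally block would have done in the async* version has to move into onCancel.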
