Incorrect semantics for async* #48695

Open
mkustermann opened this issue Mar 29, 2022 · 17 comments
Labels
area-vm Use area-vm for VM related issues, including code coverage, and the AOT and JIT backends. type-bug Incorrect behavior (everything from a crash to more subtle misbehavior)

Comments

@mkustermann
Member

See the following example:

import 'dart:async';

main() async {
  print('1');
  await for (final value in produce()) {
    print('5');
  }
  print('8');
}

Stream<dynamic> produce() async* {
  print('2');
  await for (String response in produceInner()) {
    print('4');
    yield response;
  }
  print('7');
}

Stream produceInner() async* {
  print('3');
  yield 'foo';
  print('6');
}

One would expect the numbers 1 2 3 4 5 6 7 8 be printed in order.

Dart2js does seem to have these semantics, but running in the Dart VM prints 1 2 3 4 6 5 7 8 instead.

This happens because, after produceInner has yielded its value, it does not actually suspend; it continues running (after a small delay, because the continuation is enqueued in the microtask queue). This logic is in our async_patch.dart:

class _AsyncStreamController<T> ... {
  ...

  // Generated code for `yield <x>` calls this `add(<x>)` method
  bool add(T event) {
    ...
    controller.add(event);
    // ^^^ Forward yielded value to listeners of the stream returned by `async*`. This happens asynchronously
    scheduleGenerator(); 
    // ^^^ Immediately schedule the `async*` function to continue. Why???
    ...
  }

  void scheduleGenerator() {
    if (controller.isPaused || ... ) {
      // ^^^ We are not paused yet: An `await for (...)` / `StreamIterator.moveNext()` hasn't
      // received the value yet, so it hasn't issued the pause signal either.
      return;
    }
    ...
    scheduleMicrotask(runBody);
  }

  void runBody() {
    ...
    asyncStarBody(null, null);
  }
}
mkustermann added the area-vm and type-bug labels Mar 29, 2022
@mkustermann
Member Author

/cc @lrhn Could you confirm that dart2js semantics (i.e. 1-8) is expected here?

@lrhn
Member

lrhn commented Mar 29, 2022

Yes, this is issue #34775 again. It's the root of a number of other issues.

@mkustermann
Member Author

@alexmarkov Since you're re-working some async/async*/sync* machinery, maybe you can look into fixing the semantics here as well?

@lrhn
Member

lrhn commented Mar 29, 2022

The intended behavior should be achievable by using a sync controller, delivering the event, and then checking whether the controller is paused or cancelled. If paused, wait until unpaused or cancelled. If cancelled, act like return;. Then go back to the async* function body. If neither paused nor cancelled, the body may continue synchronously.

If await for is also implemented correctly, it should be possible for an async* function's stream in an await for with a synchronous body to run continuously, efficiently and correctly without ever pausing.
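
A minimal hand-written sketch of that approach, for illustration only. It is not the VM's generated code: emulateAsyncStar, wake, body and runDemo are hypothetical names, and the body is started in a microtask purely so that no event is added while listen is still running.

```dart
import 'dart:async';

/// Hypothetical hand-written stand-in for
/// `Stream<T> f() async* { for (final v in values) yield v; }`.
Stream<T> emulateAsyncStar<T>(Iterable<T> values, List<String> log) {
  final controller = StreamController<T>(sync: true);
  Completer<void>? resumed;

  // Wakes the body when the listener resumes or cancels.
  void wake() {
    final c = resumed;
    resumed = null;
    c?.complete();
  }

  Future<void> body() async {
    for (final value in values) {
      log.add('yield $value');
      controller.add(value); // sync controller: delivered during this call
      if (controller.isPaused) {
        // The listener paused during delivery: wait for resume or cancel.
        await (resumed = Completer<void>()).future;
      }
      if (!controller.hasListener) {
        log.add('cancelled');
        return; // cancelled: act like `return;`
      }
    }
    await controller.close();
  }

  // Assumption of this sketch: start the body in a microtask after listen.
  controller.onListen = () => scheduleMicrotask(body);
  controller.onResume = wake;
  controller.onCancel = wake;
  return controller.stream;
}

Future<List<String>> runDemo() async {
  final log = <String>[];
  await for (final v in emulateAsyncStar([1, 2, 3], log)) {
    log.add('got $v');
    if (v == 2) break;
  }
  // Give the body a chance to observe the cancellation before reading the log.
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await runDemo());
}
```

In this sketch each value should be logged as consumed before the next one is yielded, and the value 3 should never be yielded because the loop exits at 2.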

@mkustermann
Member Author

We have the same issue also for yield* <stream>:

import 'dart:async';

main() async {
  await for (final x in produce()) {
    if (x == 1) break;
  }
}

Stream produce() async* {
  yield* produceInner();
  print('should never print this');
}

Stream produceInner() async* {
  yield 1;
  yield 2;
}

@mkustermann
Member Author

There's an interesting question about what should happen if an async* function is suspended at an await and, before that await returns, the consumer cancels the subscription.

Test case:

import 'dart:async';

main() async {
  final asyncValue = Completer();
  final ss = produce().listen((value) {
    asyncValue.complete(value);
  });
  print('Got ${await asyncValue.future}');
  await ss.cancel();
  print('Main done');
}

Stream produce() async* {
  yield 1;
  await Future.delayed(const Duration(seconds: 1));
  print('Should not be printed!');
}

Right now all our implementations would print Should not be printed!.

@lrhn Should the semantics here be similar to the yields, that once the await returns and there's no longer a subscriber we should run finally blocks and exit?

@alexmarkov
Contributor

@mkustermann Yes, I can definitely take a look at this bug after I'm done with the new async/async*/sync* implementation in the VM.

@mkustermann @lrhn Could you please clarify what exactly is broken: await for or async*? Also, could you explain what particular place in the spec is violated and why. Thanks!

@mkustermann
Member Author

@mkustermann Yes, I can definitely take a look at this bug after I'm done with the new async/async*/sync* implementation in the VM.

I have now made a prototype in cl/239421 to prove that this would also fix flutter/flutter#100441

copybara-service bot pushed a commit that referenced this issue Mar 30, 2022
…a field is not a Future

Issue flutter/flutter#100441
Issue #48695

TEST=vm/dart{,_2}/flutter_regress_100441_test

Change-Id: Ifdd331dc3b0a2b825a938132808b4021c0af5f71
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/239311
Reviewed-by: Slava Egorov
Reviewed-by: Alexander Markov
Commit-Queue: Martin Kustermann
@lrhn
Member

lrhn commented Mar 30, 2022

@mkustermann
An await has no interaction with the stream nature of an async* function. The pause/cancel state is only checked at yield/yield*, after every yielded event.
An await just awaits a future.

We thought about this when designing async/await, and it was too fragile to insert (effectively) returns in unpredictable and uncontrollable positions. The classical example was:

Stream<int> foo() async* {
  var resource = await allocateResource();
  try {
    while (something) yield other;
  } finally {
    resource.dispose();
  }
}

If the await allocateResource() can act like a return, then there is no safe way to handle resource management for it.
So, an await must always either throw or provide a value.
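
A small runnable sketch of that pattern, under the assumption that cancellation takes effect at a yield and that the await for loop waits for the generator's finally block when it exits. Resource, allocateResource, counter and demo are hypothetical names used only for illustration.

```dart
import 'dart:async';

/// Hypothetical resource that records whether it was disposed.
class Resource {
  bool disposed = false;
  void dispose() => disposed = true;
}

/// Hypothetical async allocator; records every resource in [pool].
Future<Resource> allocateResource(List<Resource> pool) async {
  final resource = Resource();
  pool.add(resource);
  return resource;
}

Stream<int> counter(List<Resource> pool) async* {
  // The await either throws or provides a value; it never acts like `return`.
  final resource = await allocateResource(pool);
  try {
    var i = 0;
    while (true) {
      yield i++; // cancellation is observed here, at the yield
    }
  } finally {
    resource.dispose();
  }
}

Future<bool> demo() async {
  final pool = <Resource>[];
  await for (final n in counter(pool)) {
    if (n == 2) break; // exiting the loop cancels the subscription
  }
  return pool.single.disposed;
}

Future<void> main() async {
  print('disposed: ${await demo()}');
}
```

Because the only exit points are the yield statements inside the try block, every allocated resource is covered by the finally block.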

@mkustermann
Member Author

@lrhn That makes sense - just wanted to make sure.

@lrhn
Member

lrhn commented Mar 31, 2022

I don't think the VM's await for is broken (not according to my quick tests). It responds immediately to an event, and doesn't pause unnecessarily.

The yield and yield* should check for pause and cancel immediately after delivering each event. "After delivering" most likely means using a synchronous controller and checking after calling add. Basically, yield e would desugar to:

  Completer<void>? $pauseCompleter;  // somewhere earlier
  $c.onResume = $c.onCancel = () { var c = $pauseCompleter; $pauseCompleter = null; c?.complete(); };
  // ...

  var $v = e; // maybe handle throwing, if necessary
  $c.add($v);
  if ($c.isPaused) {
    await ($pauseCompleter = Completer<void>()).future;
  }
  if (!$c.hasListener) return;

The spec is https://github.com/dart-lang/language/blob/master/specification/dartLangSpec.tex#L18540

I actually think there is a bug in that, in that it doesn't check for pause after delivering the event. Getting an event is the precise point where a callback would pause, to stop more events from arriving. It's precisely where an await for will pause.
@eernstg

It does check before sending the event, but that's not enough (and not really necessary if it checks afterwards).

The yield* spec (https://github.com/dart-lang/language/blob/master/specification/dartLangSpec.tex#L18710) has the same problem, it should check for pause after delivering event.

@mkustermann
Member Author

The yield and yield* should check for pause and cancel immediately after delivering each event.

👍 That's what my PoC in cl/239421 should do.

@eernstg
Member

eernstg commented Mar 31, 2022

@lrhn wrote:

I actually think there is a bug in that, in that it doesn't check for pause after delivering the event

The specification of yield as well as yield* indicates that add is invoked after pause/resume, and it seems natural that we'd check for cancellation before adding anything to the stream. But you think we should change it such that add occurs earlier?

@lrhn
Member

lrhn commented Apr 1, 2022

The goal is to allow a ping-pong between an async* function and an await for loop, so that if the loop doesn't do anything async, then the stream isn't paused, and if the loop does do anything async, then the loop is paused at the yield, before continuing. And in particular, if the loop exits, the stream is cancelled at that yield too, before any unnecessary computation is started. Because the loop immediately pauses the stream at an await, an exit happening after that event is still going to cancel before any further async* code runs.

That's the optimal interaction between async* and await for, which is why we want it.
It should give the most predictable and useful behavior, and would even allow such a generator+loop to run with no asynchronous gaps. (Which might actually be slightly dangerous if it starves other async code, but I'm unperturbed by that - if the code really does nothing asynchronously on either side, the author has effectively written an overly complicated for loop, and we don't worry about those either).

I'm sure I wrote this before. Somewhere, but apparently not into the spec.
Ah, there it was: dart-lang/language#121 (with feature spec: https://github.com/dart-lang/language/blob/master/accepted/future-releases/async-star-behavior/feature-specification.md, which is apparently even accepted, but never implemented.)

(It's safe to do an add on a stream controller before checking for pause or cancel, because adding to a paused stream controller will just buffer the event until the subscription resumes, and adding to a cancelled stream controller will discard the event. That's effectively the same as delaying the add if paused, or omitting the add if cancelled. It's important to check after adding because that's where the event handler has had time to give feedback.
The specification is not written in terms of a StreamController, so it can't say that. It just says that it "emits the event" if the stream isn't currently paused or cancelled, and waits until resume if the stream is paused, but the implementation can just call add and let the controller handle all that - ignoring the event if cancelled, delaying it if paused. The controller won't resume until all buffered events have been delivered, which is precisely what we want! The implementation then needs to do a check after the add, because that's what allows proper push-back from the listener.)
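
The controller behavior described above can be checked with a small sketch. It uses only dart:async; the demo function and the log strings are illustrative names, not part of any SDK implementation.

```dart
import 'dart:async';

Future<List<String>> demo() async {
  final log = <String>[];
  final controller = StreamController<int>(sync: true);
  final sub = controller.stream.listen((v) => log.add('got $v'));

  sub.pause();
  controller.add(1); // paused: the event is buffered, not delivered
  log.add('after add 1');
  sub.resume();
  // Let the buffered event be delivered after the resume.
  await Future<void>.delayed(Duration.zero);

  await sub.cancel();
  controller.add(2); // cancelled: the event is discarded
  log.add('after add 2');
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await demo());
}
```

The first event should only reach the listener after the resume, and the second should never reach it, which is why an unconditional add followed by a pause/cancel check behaves the same as checking first.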

copybara-service bot pushed a commit that referenced this issue Apr 6, 2022
…active subscription (not paused/cancelled)

This fixes an issue where VM would run the async* generator after a
`yield` / `yield*` even though the subscription may be paused or
cancelled.

Furthermore this fixes an issue where `StackTrace.current` used
in async* generator crashes VM and/or produces truncated stack
trace.

This fixes the following existing tests that were failing before:

  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t08
  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t09
  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t10
  * language/async_star/async_star_cancel_test
  * language/async_star/pause_test

Issue flutter/flutter#100441
Issue #48695
Issue #34775

TEST=vm/dart{,_2}/causal_stacks/flutter_regress_100441_test

Change-Id: I73f7d0b70937a3e3766b992740fa6fe6e6d57cec
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/239421
Reviewed-by: Lasse Nielsen
Commit-Queue: Martin Kustermann
copybara-service bot pushed a commit that referenced this issue Apr 7, 2022
…there's active subscription (not paused/cancelled)"

This reverts commit 837ee17.

Reason for revert: Please see flutter/flutter#101514

Original change's description:
> [vm] Fix some async* semantics issues: Only run generator if there's active subscription (not paused/cancelled)
>
> This fixes an issue where VM would run the async* generator after a
> `yield` / `yield*` even though the subscription may be paused or
> cancelled.
>
> Furthermore this fixes an issue where `StackTrace.current` used
> in async* generator crashes VM and/or produces truncated stack
> trace.
>
> This fixes the following existing tests that were failing before:
>
>   * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t08
>   * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t09
>   * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t10
>   * language/async_star/async_star_cancel_test
>   * language/async_star/pause_test
>
> Issue flutter/flutter#100441
> Issue #48695
> Issue #34775
>
> TEST=vm/dart{,_2}/causal_stacks/flutter_regress_100441_test
>
> Change-Id: I73f7d0b70937a3e3766b992740fa6fe6e6d57cec
> Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/239421
> Reviewed-by: Lasse Nielsen
> Commit-Queue: Martin Kustermann

# Not skipping CQ checks because original CL landed > 1 day ago.

Change-Id: Ic3d9c0508310a33a2aaee67860c0bb2ec83bab4a
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/240506
Reviewed-by: Siva Annamalai
Reviewed-by: Ben Konyi
Commit-Queue: Siva Annamalai
copybara-service bot pushed a commit that referenced this issue Apr 26, 2022
…there's active subscription (not paused/cancelled)"

This fixes an issue where VM would run the async* generator after a
`yield` / `yield*` even though the subscription may be paused or
cancelled.

Furthermore this fixes an issue where `StackTrace.current` used
in async* generator crashes VM and/or produces truncated stack
trace.

This fixes the following existing tests that were failing before:

  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t08
  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t09
  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t10
  * language/async_star/async_star_cancel_test
  * language/async_star/pause_test

New in reland: Allow the generator to cause cancelling its own consumer.
This addresses the issue of original revert
  -> see flutter/flutter#101514

Issue flutter/flutter#100441
Issue #48695
Issue #34775

TEST=vm/dart{,_2}/causal_stacks/flutter_regress_100441_test

Change-Id: I091b7159d59ea15fc31162b4b6b17260d523d7cb
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/242400
Reviewed-by: Lasse Nielsen
Commit-Queue: Martin Kustermann
@alexmarkov
Contributor

@mkustermann Is this bug fully fixed in efdffab? Is there anything left for me to look at?

@mkustermann
Member Author

@mkustermann Is this bug fully fixed in efdffab? Is there anything left for me to look at?

The particular semantics issue I was mainly concerned about (generator continues running even if listener cancelled) should be fixed, yes.

Though reading @lrhn's description, there may be other things left to do, which would change semantics a little and increase performance. Mainly I'm thinking about this comment from @lrhn:

The goal is to allow a ping-pong between an async* function and an await for loop ...

Right now every yield <value> will deliver <value> to the consumer synchronously. If the consumer does not cancel, the generator currently doesn't continue immediately; it does so after a microtask hop. See code:

  @pragma("vm:entry-point", "call")
  bool add(T event) {
    ....
    controller.add(event);
    if (!controller.hasListener) ...

    scheduleGenerator();
    isSuspendedAtYield = true;
    return false;
  }

=> As can be seen here, under normal circumstances we always call scheduleGenerator(), which enqueues the generator in the microtask queue.
=> This means that async* with individual yield <value>s will be inherently inefficient.

@lrhn Did your comment above mean semantics would allow avoiding this microtask hop for every yield <value>?

@lrhn
Member

lrhn commented May 4, 2022

Yes. The semantics are written to allow (but not require) having no async gap between the yield event being delivered, and the generator continuing. If the event is delivered synchronously (also allowed and not required), the generator can run completely synchronously - unless the event handler pauses immediately during the event delivery, then it has to wait at that yield.

My only worry is that such synchronous ping-pong can potentially lead to starvation of other code, if the code is written to assume async gaps.


4 participants