Skip to content

yield* desugaring doesn't handle errors correctly #52464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Tracked by #55025
osa1 opened this issue May 22, 2023 · 6 comments
Closed
Tracked by #55025

yield* desugaring doesn't handle errors correctly #52464

osa1 opened this issue May 22, 2023 · 6 comments
Assignees
Labels
area-dart2wasm Issues for the dart2wasm compiler.

Comments

@osa1
Copy link
Member

osa1 commented May 22, 2023

This is covered in test language/async_star/async_star_test. Minimal repro:

import "dart:async";

Stream<int> testtt() async* {
  yield* Stream.error("error");
  yield 1;
}

void main() async {
  await testtt().handleError(print).forEach(print);
}

This should print "error" and then "1", but currently it prints "error" and then crashes with uncaught exception undefined:0: [object WebAssembly.Exception]. https://dart-review.googlesource.com/c/sdk/+/304501 somewhat improves this (it no longer crashes), but it still doesn't print "1".

With the linked CL, desugared testtt looks like this:

Stream<int> testtt() {
  StreamController<Object?> controller = StreamController<Object?>();

  Future<void> body() async {
    bool async_temporary_2;
    Completer<bool> completer = Completer<bool>();
    controller.add(completer);
    try {
      await completer.future;

      // Desugared `yield* ...`
      {
        StreamIterator<int> forIterator =
            StreamIterator<int>(Stream<int>.error("error"));

        bool jumpSentinel = false;

        try {
          for (;;) {
            async_temporary_2 = await forIterator.moveNext();
            if (jumpSentinel = async_temporary_2) {
              int awaitForVar = forIterator.current;
              controller.add(awaitForVar);
              completer = Completer<bool>();
              controller.add(completer);
              await completer.future;
            } else {
              break;
            }
          }
        } finally {
          if (jumpSentinel) {
            await forIterator.cancel();
          }
        }
      }

      // Desugared `yield 1`
      {
          controller.add(1);
          completer = Completer<bool>();
          controller.add(completer);
          await completer.future;
      }
    } catch (_6) {
      controller.addError(_6);
    } finally {
      controller.close();
    }
  }
  bool isFirst = true;
  bool isEven = false;
  controller.add(null);
  return controller.stream.asyncMap<Object?>((Object? value) {
    if (isFirst) {
      body();
      return null;
    }
    if (value is Completer<bool>) value.complete(true);
    return value;
  }).where((Object? value) {
    if (isFirst) {
      isFirst = false;
      return false;
    }
    bool keep = isEven;
    isEven = !isEven;
    return keep;
  }).cast<int>();
}

I think we need a catch somewhere in the desugared await* ... to call controller.addError(...).

@osa1 osa1 added the area-dart2wasm Issues for the dart2wasm compiler. label May 22, 2023
@osa1 osa1 self-assigned this May 23, 2023
@osa1
Copy link
Member Author

osa1 commented Feb 27, 2024

I think we need a catch somewhere in the desugared await* ... to call controller.addError(...).

I'm testing this in https://dart-review.googlesource.com/c/sdk/+/354224.

@lrhn
Copy link
Member

lrhn commented Feb 27, 2024

Catching an error is not enough. You need to forward the event and continue streaming.

The desugaring should not use StreamIterator at all. Even with a catch, that will break iteration at the first error.

A better approach may be

   await controller.addAll(Stream<int>.error("error")); // cancelOnError: false is default.
   if (!controller.hasListener) return;

Consider this example:

// Emits: value 1, error StateError, value 2
Stream<int> embeddedError() => Stream.multi((c) {
  c.add(1);
  c.addError(StateError("banana"));
  c.add(2);
  c.close();
});

// Should emit: value -1, value 1, error StateERror, value 2, value -1
Stream<int> wrap(Stream<int> source) async* {
  yield -1;
  yield* source;
  yield -1;
}

// Should print: "value -1", "value 1", "error Bad state: banana", "value 2", "value -1", "done"
void main() {
  wrap(embeddedError()).listen((v) {
    print("value $v");
  }, onError: (e, s) {
    print("error $e");
  }, onDone: () {
    print("done");
  });
}

It should print all six lines.

The desugaring I usually recommend for an async* function with body body is:

Stream<T> name(args) { 
  var controller = StreamController<T>(sync: true);
  void _body() async {
    Completer<void>? _$paused;
    controller.onResume = controller.onCancel = () {
      _$paused?.complete(null);
      _$paused = null;
    };
    $body:
    try {
       // Insert *body* with the following changes:
       yield* e; // -> 
         await controller.addStream(e);
         if (!controller.hasListener) break $body;

        yield e; // ->
          controller.add(e);
          if (controller.isPaused) {
            await (_$paused = Completer()).future;
          }
          if (!controller.hasListener) break $body;

       return; -> 
         break $body;
       // end *body*.      
    } catch (e, s) {
       controller.addError(e, s);
    }
    controller.close();
  }
  controller.onListen = () {
    scheduleMicrotask(_body); // Async* bodies start asynchronously. 
    // Could also insert an `await null;` at the start of the compiled body instead
    // and use `controller.onListen = _body;`.
  };
  return controller.stream;
}

@osa1
Copy link
Member Author

osa1 commented Mar 4, 2024

I'm implementing the suggested desugaring above in https://dart-review.googlesource.com/c/sdk/+/355360.

@osa1
Copy link
Member Author

osa1 commented Mar 4, 2024

@lrhn I implemented the desugaring in https://dart-review.googlesource.com/c/sdk/+/355360, and I think it's missing a yield after controller.add. Consider this example:

import "dart:async";

Stream<int> s() async* {
  for (int i = 0; i < 2; i++) {
    print("$i 0");
    yield i;
    print("$i 3");
  }
}

void main() async {
  await for (int i in s()) {
    print("$i 1");
    await Future.microtask(() {});
    print("$i 2");
  }
}

This prints

0 0
0 1
0 2
0 3
1 0
1 1
1 2
1 3

If I desugar s according to the rules in your comment:

import "dart:async";

Stream<int> s() {
  StreamController<int> controller = StreamController<int>();

  var body = () async {
    Completer<void>? paused;

    dynamic onCancelCallback = () {
      if (paused != null) {
        paused!.complete(null);
      }

      paused = null;
    };

    controller.onResume = onCancelCallback;
    controller.onCancel = onCancelCallback;

    try {
      try {
        for (int i = 0; i < 2; i = i + 1) {
          print("$i 0");

          controller.add(i);

          if (controller.isPaused) {
            await (paused = Completer<void>());
          }

          if (!controller.hasListener) {
            return;
          }

          print("$i 3");
        }
      } catch (err, stack) {
        controller.addError(err, stack);
      }
    } finally {
      controller.close();
    }
  };

  controller.onListen = () {
    scheduleMicrotask(body);
  };

  return controller.stream;
}

void main() async {
  await for (int i in s()) {
    print("$i 1");
    await Future.microtask(() {});
    print("$i 2");
  }
}

This version prints

0 0
0 3
1 0
1 3
0 1
0 2
1 1
1 2

We need to yield after the line controller.add(i), but I'm not sure what's the best way to do that. I can do it with a await Future(() {}). Is there a better way?

@osa1
Copy link
Member Author

osa1 commented Mar 4, 2024

Hm, I think yielding after an element is not enough, we should pause the generator until the generated element is consumed, right?

The current desugaring yields a Completer<bool> after every element for this reason. The consumer then completes the completer after consuming the element. The transformation is shown here:

// #controller.add(i);
// #completer = Completer<bool>();
// #controller.add(#completer)
// await #completer.future;

@lrhn
Copy link
Member

lrhn commented Mar 4, 2024

Yes, the desugaring only works if the controller is sync. It needs to deliver the event, so that the await can cause the surrounding await for subscription to pause in time.

If the controller is not sync, then to wait for the event to be delivered, you'll need to wait for the event to be delivered. If the controller is not paused (and not cancelled), that'll very, very likely happen as a microtask.

  • If hasListener and not isPaused, delay for a microtask, as by await Future.microtask((){});.
  • If isPaused, make a resume-completer and wait for it's future's completion (and complete and clear it in onResume and onCancel if there is a completer).
  • If not hasListener, return.

That does assume that the microtask queue is a queue. It's possible for zones to intercept the microtask queue and make it, fx, a stack. They probably won't.
Another alternative is to use a modified stream controller which completes a future when it has delievered an event, then wait for that future. Something like _AsyncStarStreamController<T> $c = ...; ... await $c.add(value);.
(It's so much simpler to just use a sync controller, but that will change timing, which might make some badly written tests fail. Most likely those tests haven't been run on Wasm before, so it's not like you're changing existing behavior, just creating a good new one.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-dart2wasm Issues for the dart2wasm compiler.
Projects
None yet
Development

No branches or pull requests

2 participants