Warn on using generator as void function #42717

Open
ride4sun opened this issue Jul 16, 2020 · 15 comments
Labels
area-devexp For issues related to the analysis server, IDE support, linter, `dart fix`, and diagnostic messages. devexp-warning Issues with the analyzer's Warning codes P4 type-enhancement A request for a change that isn't a bug

Comments

@ride4sun

I am using a stream to read out location data in a bloc. I have a start and a stop event. In the stop method, I cancel the stream subscription. When I use listen on a stream to yield the state, the inside of the callback, where the yield statement is, never gets called.
What really bugs me is that I don't get any error message, warning, or compiler error.

    _locationSubscription = location.onLocationChanged.listen(
      (location) async* {
        if (location.isNotNull) {
          yield LocationState.sendData(location: updateLocation(location));
        }
      },
    );

When I replace the listen with await for, I don't see any way
to stop this from yielding events, because the subscription handle is gone.
Any ideas? Any explanations?

   Stream<LocationState> _start() async* {

    await Location()
        .changeSettings(accuracy: LocationAccuracy.high, interval: 1);

    await for (LocationData location in location.onLocationChanged) {
      if (location.isNotNull) {
        yield LocationState.sendData(location: updateLocation(location));
      }
    }

    //send one initial update to change state
    yield LocationState.sendData(
        location: updateLocation(await Location().getLocation()));
  }

This tracker is for issues related to:

  • Dart core libraries ("dart:async", "dart:io", etc.)

  • Dart SDK Version 2.8.4

  • Windows, MacOSX, and Linux

@lrhn lrhn added the type-question A question about expected behavior or functionality label Jul 16, 2020
@lrhn
Member

lrhn commented Jul 16, 2020

To stop an await for iteration, you need to break out of the loop.
You can't do that from the outside. The most you can do is to set a flag, and on the next event, the loop can check that flag and break out. It doesn't cancel it immediately.
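
The flag approach described above could look roughly like the following. This is only a minimal sketch: `stopRequested` and `relay` are hypothetical names, and a periodic stream stands in for the location stream from the question.

```dart
import 'dart:async';

// Hypothetical flag, set from outside the loop to request a stop.
bool stopRequested = false;

Stream<int> relay(Stream<int> source) async* {
  await for (final value in source) {
    // The flag is only checked when an event arrives, so the loop
    // stops at the next event, not immediately.
    if (stopRequested) break;
    yield value;
  }
}

Future<void> main() async {
  final ticks =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);
  final received = <int>[];
  final done = relay(ticks).forEach(received.add);

  await Future<void>.delayed(const Duration(milliseconds: 35));
  stopRequested = true; // Takes effect on the next tick.
  await done;
  print(received);
}
```

Breaking out of the loop cancels the subscription on `source`, so the generator finishes and the stream it returned closes.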

It's probably possible to create some Stream wrapper which allows all subscriptions created from that stream to be cancelled from somewhere else, and would then send a done event to the listener, but I'm not aware of such a class having been created yet.

I'd probably just use non-async code for this, keep the listen and use a StreamController for the resulting stream.

This code:

   _locationSubscription = location.onLocationChanged.listen(
      (location) async* {
        if (location.isNotNull) {
          yield LocationState.sendData(location: updateLocation(location));
        }
      },
    );

does not do what you expect. Each call to the inner function creates a new stream. Nobody listens to that stream (the onData callback to listen expects a void function, not one which returns a stream).
The yield of an async* function only works if it's directly inside the async* function. It cannot be put into a nested function.
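
As a rough illustration of the non-async alternative mentioned above (keep the listen, and use a StreamController for the resulting stream), here is a minimal sketch. `LocationRelay`, `start`, `stop`, and `states` are hypothetical names, and a periodic stream of integers stands in for `location.onLocationChanged`.

```dart
import 'dart:async';

/// Hypothetical stand-in for the bloc in the question.
class LocationRelay {
  final _states = StreamController<String>();
  StreamSubscription<int>? _subscription;

  /// The resulting stream of states.
  Stream<String> get states => _states.stream;

  void start(Stream<int> source) {
    // A plain (non-generator) callback: each event is forwarded with
    // add(), so nothing depends on a nested yield.
    _subscription = source.listen((value) {
      _states.add('location $value');
    });
  }

  Future<void> stop() async {
    // The subscription handle is still available, so it can be
    // cancelled without waiting for another event.
    await _subscription?.cancel();
    _subscription = null;
    await _states.close();
  }
}

Future<void> main() async {
  final relay = LocationRelay();
  relay.states.listen(print);
  relay.start(
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i));
  await Future<void>.delayed(const Duration(milliseconds: 55));
  await relay.stop();
}
```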

@ride4sun
Author

Ok, is it possible to get it working the way I expected? Right now it fails without any hint. Maybe Dart or the compiler/analyzer can catch that? Using listen on a stream is a pretty common use case...

@lrhn
Member

lrhn commented Jul 18, 2020

We won't make the yield inside a nested function do a yield in the outer function.

As for the "no warning", maybe the analyzer could issue a warning if you use a sync* or async* generator function in a void-returning function context. That would suggest that you are doing something useless since a generator function doesn't really run until the returned iterator/stream is iterated/listened to, and something expecting a void function won't do that.
I'll keep this issue as a feature request for this warning from the analyzer.
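
A small sketch of the situation such a warning would flag, using a fixed stream in place of the location stream (the variable `bodyRan` is only there for illustration):

```dart
import 'dart:async';

Future<void> main() async {
  var bodyRan = false;

  // listen expects a void callback. Each call to this async* closure
  // only creates a new stream; its body does not start until that
  // stream is listened to, and nobody ever listens to it.
  await Stream<int>.fromIterable([1, 2, 3]).listen((n) async* {
    bodyRan = true;
    yield n;
  }).asFuture<void>();

  print(bodyRan); // false
}
```

The code compiles and runs without any diagnostic, which is the silent failure reported at the top of this issue.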

If you just want your code to keep working, I'd simply not switch to await for. If you need to cancel the subscription from the side, that's a use-case for using an explicit subscription object. If you can't write the code with .forEach, but have to use listen, then it probably also won't work with await for.

(You should use forEach over listen when you can, and then consider using await for if possible. Using listen should not be as common a use-case for Stream as it actually is because people use that instead of forEach).

Or you could write up a cancelable stream abstraction, like:

import "dart:async";


/// A stream which can be interrupted at any point.
///
/// Any subscription on [stream] can be cancelled at any
/// time using [interrupt].
abstract class InterruptibleStream<T> {
  /// Creates an interruptible stream which forwards to [source]
  ///
  /// Listening on the interruptible [stream] provides the same
  /// events as listening on [source] until either [source]
  /// is done or the [stream] is [interrupt]ed.
  factory InterruptibleStream(Stream<T> source) =>
      _InterruptibleStream<T>(() => source);
  
  /// Creates an interruptible stream which forwards to `source()`
  ///
  /// Listening on the interruptible [stream] creates a new
  /// stream by calling [source], then provides the same
  /// events as listening on that stream until either it
  /// is done or the [stream] is [interrupt]ed.
  factory InterruptibleStream.multi(Stream<T> source()) =>
      _InterruptibleStream<T>(source);

  /// The interruptible stream itself.
  ///
  /// All subscriptions on this stream can be interrupted using
  /// [interrupt].
  Stream<T> get stream;

  /// Interrupt all current subscriptions on [stream].
  ///
  /// An interrupted subscription cancels its underlying
  /// subscription, then emits a done event.
  ///
  /// If [error] is supplied, it is emitted as an error
  /// event after the cancel has completed, before
  /// emitting a done event.
  void interrupt([Object? error, StackTrace? stack]);
}

class _InterruptibleStream<T> implements InterruptibleStream<T> {
  final Stream<T> stream;

  /// Set of currently active subscriptions that [interrupt] should end.
  final Set<_InterruptibleSubscription<T>> _subscriptions;

  _InterruptibleStream(Stream<T> source()) : this._(source, {});

  _InterruptibleStream._(Stream<T> source(), this._subscriptions)
      : stream = Stream<T>.multi((StreamController<T> c) {
          _subscriptions.add(_InterruptibleSubscription<T>(
              _subscriptions, c, source().listen(null)));
        });

  void interrupt([Object? error, StackTrace? stack]) {
    for (var subscription in _subscriptions.toList()) {
      if (_subscriptions.remove(subscription)) {
        subscription.interrupt(error, stack);
      }
    }
  }
}

class _InterruptibleSubscription<T> {
  final StreamController<T> controller;
  final StreamSubscription<T> subscription;
  final Set<_InterruptibleSubscription<T>> owner;

  _InterruptibleSubscription(this.owner, this.controller, this.subscription) {
    controller.onPause = subscription.pause;
    controller.onResume = subscription.resume;
    controller.onCancel = _onCancel;
    subscription.onData(controller.add);
    subscription.onError(controller.addError);
    subscription.onDone(_onDone);
  }

  FutureOr<void> _onCancel() {
    owner.remove(this); // Can no longer be interrupted.
    return subscription.cancel();
  }

  void _onDone() {
    owner.remove(this); // Can no longer be interrupted.
    controller.close();
  }

  void interrupt(Object? error, StackTrace? stack) {
    var cancelResult = subscription.cancel();
    void end([_]) {
      if (error != null) controller.addError(error, stack);
      controller.close();
    }

    if (cancelResult is Future<void>) {
      cancelResult.then<void>(end, onError: (e, s) {
        controller.addError(e, s);
        end();
      });
    } else {
      scheduleMicrotask(end);
    }
  }
}

(The code as written requires a dev-release and null safety experiment for now).

@lrhn lrhn added legacy-area-analyzer Use area-devexp instead. type-enhancement A request for a change that isn't a bug and removed type-question A question about expected behavior or functionality labels Jul 18, 2020
@srawlins srawlins added the P4 label Jan 23, 2021
@petoknm

petoknm commented Apr 23, 2021

I was expecting this (cancelling a subscription from a stream using await-for in an async generator) to just work. Is this something that will be fixed in future versions of Dart?

@lrhn
Member

lrhn commented Apr 26, 2021

Not sure what "just work" means here. Cancelling the subscription of a stream being generated by an async* function running an await-for will cancel at the next yield. If there is no yield inside the await for, it will run to completion, just as a non-await for loop. There are no plans to change this.

@idkq

idkq commented Jun 16, 2021

Simply use break;

await for (final event in stream) {
  if (event.length == 0) {
    yield Object();

    // Breaks the await for
    break;
  }
  //...
}

@unicomp21

if a break is used, it looks like something prevents the enclosing task from exiting. think this might be a bug.

however, if using a StreamIterator, instead of "await for", the "break" works as expected, and i don't get into trouble w/ the outer enclosing task exiting

as a general rule, i "think" "await for" must always be allowed to run to completion, otherwise bad things happen.
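
For reference, the StreamIterator variant mentioned above could look something like this. It is a minimal sketch with a hypothetical `takeUntilNegative` helper, and is not a claim about what the failing code looked like.

```dart
import 'dart:async';

/// Hypothetical helper: collects values until the first negative one.
Future<List<int>> takeUntilNegative(Stream<int> source) async {
  final iterator = StreamIterator<int>(source);
  final seen = <int>[];
  try {
    while (await iterator.moveNext()) {
      final value = iterator.current;
      if (value < 0) break; // Leave the loop early.
      seen.add(value);
    }
  } finally {
    // Cancels the underlying subscription if the loop exited early.
    await iterator.cancel();
  }
  return seen;
}

Future<void> main() async {
  final result =
      await takeUntilNegative(Stream.fromIterable([1, 2, -1, 3]));
  print(result); // [1, 2]
}
```

Here the cancel is explicit, whereas an await for loop cancels its subscription implicitly when the loop is exited.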

@lrhn
Member

lrhn commented Oct 11, 2021

There might be a bug somewhere, but that's impossible to say without seeing the actual code which fails.

Breaking an await for is perfectly valid and the recommended way to stop listening to a stream in async code.
(We are aware of exiting at a yield not always happening soon enough, #44616.)

@idraper

idraper commented Jan 19, 2022

I also ran into this problem (in a slightly more specific domain) and figured I'd share my work-around. I have a function which re-maps a never-ending stream (and cannot use .map since it cannot be built lazily). Then, if the listener closes their subscription, the stream doesn't close since it listens to (using await for) a never-ending stream. An example of where this could happen is with await getStream().first; internally the stream would never close. I came up with the following based on:

The most you can do is to set a flag, and on the next event, the loop can check that flag and break out. It doesn't cancel it immediately.

As long as it is not a broadcast stream, you can check to see if a controller has any subscribers, and if it doesn't you can break. For example:

Stream<S> getStream<S>() {
  final rtn = StreamController<S>(); // cannot be .broadcast()

  () async {
    await for (final val in <some never-ending stream, eg Stream.periodic>) {
      if (!rtn.hasListener) break;

      // add implementation here
      rtn.add(val);
    }
    rtn.close();
  }();

  return rtn.stream;
}

This wouldn't work for broadcast streams, since you could potentially cancel all listeners and then add a new one later. It also doesn't close immediately, but at least exits cleanly eventually (assuming another event is raised). This has the advantage (for my specific use case) that the client doesn't need to explicitly set a flag to clean up.

@wangkaibule

The stream returned from an async* function is always single-subscription (reference), which means we cannot reattach to the stream once we have cancelled the previous subscription on it.
So why would we need a dangling await for stream listener that probably iterates forever without any output?

@srawlins srawlins added the devexp-warning Issues with the analyzer's Warning codes label Jul 26, 2024
@hoc081098

hoc081098 commented Aug 28, 2024

Hello,

When using await for with an infinite Stream and a yield inside the loop, the finally block is not called when we cancel the StreamSubscription (or use take(1) for the same purpose):

import 'dart:async';

import 'package:rxdart/rxdart.dart';

void main() async {
  late StreamSubscription<void> sub;
  sub = getStream().listen((v) {
    print('Cancelling...');
    sub.cancel().then((_) => print('Cancelled...'));
  });
  
  await Future<void>.delayed(const Duration(seconds: 1));
  print('---');
  
  getStream().take(1).forEach(print);
}

Stream<String> getStream() async* {
  final s = Stream.value("Hello").concatWith([Rx.never()]);
  // or:  final s = BehaviorSubject.seeded('Hello');
  try {
    await for (final innerValue in s) {
      yield innerValue;
    }
  } finally {
    print('Done');
  }
}

@lrhn
Member

lrhn commented Aug 28, 2024

@hoc081098 this sounds like an unrelated problem, not fitting this issue which is now an analyzer enhancement request. Do file a separate new issue instead. (Also, do try to reproduce without using Rx streams, otherwise the bug may be in those, and it should be filed in the Rx-stream repo if it is. Although I'm guessing the bug is a known issue with the current async* implementation.)

@lrhn lrhn changed the title How can I cancel a 'await for' in a flutter / dart bloc ? Warn on using generator as void function Aug 28, 2024
@hoc081098

@hoc081098 this sounds like an unrelated problem, not fitting this issue which is now an analyzer enhancement request. Do file a separate new issue instead. (Also, do try to reproduce without using Rx streams, otherwise the bug may be in those, and it should be filed in the Rx-stream repo if it is. Although I'm guessing the bug is a known issue with the current async* implementation.)

I can reproduce it without rxdart lib 🙏

import 'dart:async';

void main() async {
  late StreamSubscription<void> sub;
  sub = getStream().listen((v) {
    print('Cancelling...');
    sub.cancel().then((_) => print('Cancelled...'));
  });
  
  await Future<void>.delayed(const Duration(seconds: 1));
  print('---');
  
  getStream().take(1).forEach(print);
}

Stream<String> getStream() async* {
  final controller = StreamController<String>();
  controller.onListen = () {
    controller.add('Hello');
  };
  
  try {
    await for (final innerValue in controller.stream) {
      yield innerValue;
    }
  } finally {
    print('Done');
  }
}

Console:

Cancelling...
---
Hello

@lrhn
Member

lrhn commented Aug 28, 2024

Cool. Do file as a new issue, it still isn't relevant to this issue. 😉
(Or maybe add it to #49621 or one of the issues linked from there, since it looks like the same issue if it only affects the dev-compiler.)

@hoc081098

Cool. Do file as a new issue, it still isn't relevant to this issue. 😉 (Or maybe add it to #49621 or one of the issues linked from there, since it looks like the same issue if it only affects the dev-compiler.)

I've just opened #56619

@bwilkerson bwilkerson added area-devexp For issues related to the analysis server, IDE support, linter, `dart fix`, and diagnostic messages. and removed legacy-area-analyzer Use area-devexp instead. labels Feb 28, 2025

10 participants