
Support streams of objects that need cleanup #52221

Open
AsturaPhoenix opened this issue Apr 29, 2023 · 5 comments
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-async type-enhancement A request for a change that isn't a bug

Comments

@AsturaPhoenix
Contributor

AsturaPhoenix commented Apr 29, 2023

Streams are great for primitives and simple data, but they can be hostile to objects that need cleanup. Parts of the stream contract allow events to be buffered and discarded, which can leave a gap in the chain of custody and let objects that need cleanup leak. Such objects could be served with a callback scheme or a custom controller/sink/stream trinity, but it is unfortunate to have to reimplement so much and give up async*/await for syntax.

Example use case:

import 'dart:async';

class MyAllocatedResource {
  ...
  Future<void> close() async {
    ...
  }
}

/// Consumers must close received objects.
Stream<MyAllocatedResource> multiOpen(...) async* {
  for (...) {
    yield MyAllocatedResource(...); // I'm likely to leak. Protect me.
  }
}

/// ADDENDUM: This is oversimplified; see #issuecomment-1530060075
Stream<...> myConsumer(...) async* {
  await for (final resource in multiOpen(...)) {
    try {
      ...
      yield ...; // I can be cancelled too.
    } finally {
      unawaited(resource.close());
    }
  }
}

In cases where the event is discarded, the resource leaks.
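Independently of any async* quirks, buffering alone is enough to drop events deterministically: a default (asynchronous) StreamController holds events until the listener can take them, and cancelling the subscription discards whatever is still buffered. A minimal sketch, assuming a hypothetical `CountedResource` that counts open instances (`CountedResource` and `leakedAfterBreak` are illustrative names, not from this issue):

```dart
import 'dart:async';

/// Hypothetical resource that tracks how many instances are still open.
class CountedResource {
  static int open = 0;

  CountedResource() {
    open++;
  }

  void close() {
    open--;
  }
}

/// Buffers three resources, consumes only the first one, and returns how
/// many resources were never closed.
Future<int> leakedAfterBreak() async {
  CountedResource.open = 0;
  final controller = StreamController<CountedResource>();
  // No listener yet, so all three events are buffered by the controller.
  for (var i = 0; i < 3; i++) {
    controller.add(CountedResource());
  }
  await for (final resource in controller.stream) {
    // ... use the resource ...
    resource.close();
    break; // Cancels the subscription; the two still-buffered events are dropped.
  }
  return CountedResource.open;
}

Future<void> main() async {
  print('leaked: ${await leakedAfterBreak()}'); // leaked: 2
}
```

Only the first resource reaches the loop body; the other two are thrown away with the buffer on cancel, and nothing ever gets a chance to close them.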

This can be worked around without abandoning the use of Stream, but it would be nice for this use case to be first-class. Workarounds are also complicated by bugs/inconsistencies in the async* implementation.

My current workaround looks like this. I may try moving the allocation into onListen to avoid the yield* pitfall described below, but differences in error handling (try/catch vs. forwarding to the Stream) may hinder that further.

Two issues in particular complicate workarounds:

Related use cases:

Possibly related:

Versions/platforms:

  • Dart SDK Version: 3.0.0-204.0.dev (dev) / DartPad stable, master
  • Platforms: Windows (unit testing), web (Chrome), Android
@AsturaPhoenix
Contributor Author

AsturaPhoenix commented Apr 29, 2023

A possibly related wishlist item: it would help if StreamController.add could return a Future that completes when the event is delivered or dropped. Plenty of details would need to be hammered out (e.g., something like orCancel, and what to do about broadcast streams).
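To make the wish concrete, here is a rough sketch of what such feedback could look like for single-subscription streams only. `TrackedController` is a hypothetical class, not a dart:async API; it wraps a synchronous StreamController so that delivery can be observed directly, and it deliberately ignores broadcast streams and anything like orCancel:

```dart
import 'dart:async';
import 'dart:collection';

/// HYPOTHETICAL (not part of dart:async): a single-subscription controller
/// whose `add` completes with true once the listener's onData has run, or
/// with false if the event was dropped because the subscription was cancelled.
class TrackedController<T> {
  TrackedController() {
    _out = StreamController<T>(
      sync: true,
      // Flush a microtask later rather than from inside onListen/onResume,
      // where a sync controller may buffer instead of delivering.
      onListen: () => scheduleMicrotask(_flush),
      onResume: () => scheduleMicrotask(_flush),
      onCancel: _dropAll,
    );
  }

  late final StreamController<T> _out;
  final _queue = Queue<(T, Completer<bool>)>();
  bool _cancelled = false;
  bool _flushing = false;

  Stream<T> get stream => _out.stream;

  Future<bool> add(T event) {
    final delivered = Completer<bool>();
    if (_cancelled) {
      delivered.complete(false);
    } else {
      _queue.add((event, delivered));
      _flush();
    }
    return delivered.future;
  }

  void _flush() {
    if (_flushing) return; // Re-entrant add from onData: outer loop handles it.
    _flushing = true;
    try {
      while (_queue.isNotEmpty && _out.hasListener && !_out.isPaused) {
        final (event, delivered) = _queue.removeFirst();
        _out.add(event); // Sync controller: onData runs before this returns.
        delivered.complete(true);
      }
    } finally {
      _flushing = false;
    }
  }

  void _dropAll() {
    _cancelled = true;
    while (_queue.isNotEmpty) {
      _queue.removeFirst().$2.complete(false);
    }
  }
}

Future<void> main() async {
  final controller = TrackedController<int>();
  final received = <int>[];
  late final StreamSubscription<int> sub;
  sub = controller.stream.listen((v) {
    received.add(v);
    if (received.length == 2) sub.cancel();
  });
  final results =
      await Future.wait([for (var i = 1; i <= 4; i++) controller.add(i)]);
  print('$received $results'); // [1, 2] [true, true, false, false]
}
```

Events queued while the listener is paused wait in `_queue` and are only reported once they are actually handed to onData, so a `false` result is the producer's cue to clean the object up itself.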

@lrhn
Member

lrhn commented Apr 29, 2023

I can see that async* (with its various differences between implementations) is not very useful or predictable.

You generally cannot know whether a stream event has been received. There is an asynchronous gap between the event being sent and it being received, and the receiver can always cancel in that gap.

The one thing that I think can be made completely certain is a synchronous single-subscription stream controller.
It delivers the event immediately when you call add if the listener is not paused, and you can know whether it's paused before you send the event.
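A minimal sketch of that property, assuming a hypothetical `Chunk` resource: with `sync: true`, checking `hasListener` and `isPaused` immediately before `add` tells the producer whether the event will reach onData, so it can skip the allocation entirely when it won't. `Chunk`, `tryDeliver`, and `demo` are illustrative names, and the sketch assumes `tryDeliver` is never called from inside one of the subscription's own callbacks, where the event would be buffered instead.

```dart
import 'dart:async';

/// Hypothetical resource that tracks how many instances are still open.
class Chunk {
  static int open = 0;

  Chunk() {
    open++;
  }

  void close() {
    open--;
  }
}

/// Allocates and sends a chunk only if it will be delivered right away.
/// Assumes a sync single-subscription controller and a call site outside
/// any of the subscription's callbacks.
bool tryDeliver(StreamController<Chunk> controller) {
  if (controller.isClosed || !controller.hasListener || controller.isPaused) {
    return false; // Nothing allocated, so nothing can leak.
  }
  controller.add(Chunk()); // Delivered to onData before add returns.
  return true;
}

/// Offers five chunks; the listener keeps two and then cancels.
/// Returns (delivered, stillOpen) after the listener closes what it got.
Future<(int, int)> demo() async {
  Chunk.open = 0;
  final controller = StreamController<Chunk>(sync: true);
  final received = <Chunk>[];
  late final StreamSubscription<Chunk> sub;
  sub = controller.stream.listen((chunk) {
    received.add(chunk);
    if (received.length == 2) sub.cancel();
  });
  var delivered = 0;
  for (var i = 0; i < 5; i++) {
    await Future<void>.delayed(Duration.zero); // Call from outside callbacks.
    if (tryDeliver(controller)) delivered++;
  }
  for (final chunk in received) {
    chunk.close();
  }
  return (delivered, Chunk.open);
}

Future<void> main() async {
  final (delivered, stillOpen) = await demo();
  print('delivered: $delivered, still open: $stillOpen'); // delivered: 2, still open: 0
}
```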

I'd probably build an abstraction on top of that, with the ability to give feedback on whether an event is delivered or not, rather than add extra overhead to the normal stream controller.
Something like:

import "dart:async";
import "dart:collection";

/// Stream which allows registering events, to ensure delivery.
///
/// Creates a stream by calling [source] with a `register` function.
/// Events sent by the stream can be registered using the `register` function,
/// and if the stream is cancelled before the event has been delivered,
/// the `undelivered` function is called for each undelivered registered event.
///
/// Not all events need to be registered, but registered events *must* be sent
/// in the same order they are registered.
/// The `register` function returns the event again, to allow writing, e.g.,
/// `yield register(event);`.
///
/// Example usage:
/// ```dart
/// /// Allocates chunks of memory every second as long as anyone wants them.
/// Stream<Allocation> allocations() => ensureDelivery((register) async* {
///   while (true) {
///     await Future.delayed(const Duration(seconds: 1));
///     yield register(Memory.allocateChunk()); // Must not be forgotten
///   }
/// }, undelivered: Memory.freeChunk);
/// ```
/// If a listener cancels the allocations stream after a chunk has been
/// allocated, but before it has been delivered, then the chunk is passed
/// to `Memory.freeChunk` when the stream has been cancelled.
Stream<T> ensureDelivery<T>(Stream<T> Function(T Function(T) register) source,
    {required void Function(T) undelivered}) {
  final c = StreamController<T>(sync: true);
  c.onListen = () {
    final sentEvents = Queue<T>();
    final subscription = source((T event) {
      sentEvents.add(event);
      return event;
    }).listen(null);
    c
      ..onPause = subscription.pause
      ..onResume = subscription.resume
      ..onCancel = () => subscription.cancel().whenComplete(() {
            while (sentEvents.isNotEmpty) {
              undelivered(sentEvents.removeFirst());
            }
          });

    subscription
      ..onData((T value) {
        if (sentEvents.isNotEmpty && identical(value, sentEvents.first)) {
          sentEvents.removeFirst();
        }
        c.add(value);
      })
      ..onError(c.addError)
      ..onDone(c.close);
  };
  return c.stream;
}
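A usage sketch of `ensureDelivery` as written above (copied without its doc comment so the block runs on its own), with a hypothetical `Handle` resource that is not from the issue. The consumer takes one handle and breaks; since `await for` waits for the cancellation future, and the `onCancel` above frees leftovers only after the source's own cancel completes, every allocated handle should be accounted for once the loop exits:

```dart
import 'dart:async';
import 'dart:collection';

// ensureDelivery from the comment above, unchanged apart from the doc comment.
Stream<T> ensureDelivery<T>(Stream<T> Function(T Function(T) register) source,
    {required void Function(T) undelivered}) {
  final c = StreamController<T>(sync: true);
  c.onListen = () {
    final sentEvents = Queue<T>();
    final subscription = source((T event) {
      sentEvents.add(event);
      return event;
    }).listen(null);
    c
      ..onPause = subscription.pause
      ..onResume = subscription.resume
      ..onCancel = () => subscription.cancel().whenComplete(() {
            while (sentEvents.isNotEmpty) {
              undelivered(sentEvents.removeFirst());
            }
          });
    subscription
      ..onData((T value) {
        if (sentEvents.isNotEmpty && identical(value, sentEvents.first)) {
          sentEvents.removeFirst();
        }
        c.add(value);
      })
      ..onError(c.addError)
      ..onDone(c.close);
  };
  return c.stream;
}

/// Hypothetical resource that tracks how many instances are still open.
class Handle {
  static int open = 0;

  Handle() {
    open++;
  }

  void close() {
    open--;
  }
}

/// A producer that would allocate up to 100 handles if nobody stopped it.
Stream<Handle> handles() => ensureDelivery<Handle>((register) async* {
      for (var i = 0; i < 100; i++) {
        yield register(Handle()); // Registered before it can be dropped.
      }
    }, undelivered: (handle) => handle.close());

/// Takes one handle, then cancels. Returns the number of handles left open.
Future<int> takeOneAndCancel() async {
  Handle.open = 0;
  await for (final handle in handles()) {
    handle.close();
    break; // Waits for cancellation, which frees registered leftovers.
  }
  return Handle.open;
}

Future<void> main() async {
  print('still open: ${await takeOneAndCancel()}');
}
```

Whether the source gets as far as allocating a second handle before the pause takes effect varies by platform, but any such handle is still in `sentEvents` when the cancel completes, so it is passed to `undelivered` rather than lost.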

@lrhn lrhn added area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-async type-enhancement A request for a change that isn't a bug labels Apr 29, 2023
@AsturaPhoenix
Contributor Author

I think I oversimplified my reduced example so that it no longer clearly shows the leak. As written, it probably cannot leak on the VM (though it does leak on dart2js due to #48749), as long as the consumer behaves like await for and pauses the stream while handling the event. However, it is very easy to violate this requirement; here's an example that does leak on the VM as well.

import 'package:async/async.dart';

class MyAllocatedResource {
  static int unclosedInstances = 0;

  MyAllocatedResource() {
    ++unclosedInstances;
  }

  void close() {
    --unclosedInstances;
  }
}

Stream<MyAllocatedResource> multiOpen() async* {
  yield MyAllocatedResource();
}

void main() async {
  await for (final resource in StreamGroup.merge([
    multiOpen(),
    multiOpen(),
  ])) {
    try {
      break;
    } finally {
      resource.close();
    }
  }

  print('unclosedInstances: ${MyAllocatedResource.unclosedInstances}'); // 1
}

Here an event is dropped because both async generators reach their yield before the StreamGroup is paused.

@AsturaPhoenix
Contributor Author

AsturaPhoenix commented May 1, 2023

Also, in case it helps: on Discord, user @abitofevrything graciously surveyed the current behavior and its deviations from the spec on the VM and dart2js:

https://gist.github.com/abitofevrything/56e5d0b7b5c3bc25462bca5f334c4cb2

@lrhn
Member

lrhn commented May 3, 2023

Even with a correctly implemented async* function, it's always possible to end up with a computed value that won't be delivered. The yield e; statement will evaluate e first, before the yield checks whether the stream is cancelled. If it was cancelled, that event won't be delivered. The only way to know that an event was delivered is that code after the yield is executed. (If implemented correctly!)

That's inherent in the design of async*: it only synchronizes with the stream subscription at the yield statements. Code between those runs oblivious to whether anyone is waiting for another event.
It's only when the event is completely evaluated and ready to send that the yield will check whether there is a receiver.

That's not easily changed. I wouldn't even know where to start.

It is a place where async* functions are inferior to code written directly using a StreamController, which can get callbacks immediately when someone calls pause or cancel on the subscription. The only way for an async* function to get information is to do a yield. (A yield* won't work, because there is always a delay after it, so the information you get from it running can be stale before you can use it.)

So maybe we should allow async* functions to register such callbacks, somehow? Or get access to the underlying stream controller, or at least something which exposes enough of it, while still being safe. (Not an easy thing to design properly.)

Just for the record, as specified (I hope; that was the intent), a yield doesn't have to suspend the async* function body if it can deliver the event synchronously. Doing so means it must synchronously call the onData callback of the current invocation's associated stream subscription, and then check right afterwards whether the stream is paused or cancelled. If it is neither, it's OK to continue executing the async* function body synchronously.

If the subscription is paused after the synchronous event delivery returns, then the async* body should suspend at the yield and wait for a resume or cancel. It can then continue at (or probably at some time after) the resume or cancel, with a cancel behaving like return and a resume just continuing after the yield.

The function can also deliver the event asynchronously, and then it must suspend the async* function body until the event has been delivered, which is ... tricky to know if using a plain StreamController. Good thing it doesn't. (I'd recommend just delivering synchronously.)
