Writing operators

David Karnok edited this page Oct 27, 2018 · 22 revisions

Introduction

Writing operators for IAsyncEnumerable is both easier and harder than working with IObservables or Reactive Streams Publishers. It is easier for intermediate operators such as Map and Filter, where async/await and compiler-generated state machines line up pretty well. It is harder for operators working with multiple sources or time, because one can only await one thing at a time in an async method and it is often unclear what to await first or how many awaitable primitives to introduce.

In this wiki, I'll explain how to write such easy and complicated operators.

IAsyncEnumerable primer

Despite the popularity of Rx.NET for push-based declarative data processing and the success of the non-blocking backpressure-enabled RxJava libraries, there is always the buzz that maybe it is possible to have convenient backpressure via coroutines, continuations, async-await or fibers.

One option is to make IEnumerable/IEnumerator support async/await and let the compiler work out the resumption state machine. We only have to await the next item and the concurrency is taken care of.

There were several proposals about such an async enumerable, and the latest one that seems to be winning is defined as follows.

The proposal introduces 3 new interfaces backing the concept of async sequences as well as language/compiler changes. We don't care about the latter as we don't have any such changes ready, but we can work with the interfaces right away as the necessary async/await support is already part of the released C# language.

The new interfaces and their methods were named distinctively with an Async word so that they can be implemented on a class which already supported the synchronous IEnumerable/IEnumerator without conflict.

IAsyncDisposable

The IAsyncDisposable is the analogue of the IDisposable interface, but instead of a blocking or fire-and-forget disposal, the DisposeAsync has to be called and awaited to make sure resources associated with an async sequence are all released before continuing.

public interface IAsyncDisposable 
{
    ValueTask DisposeAsync();
}

The design uses ValueTask instead of Task as it can reduce the allocations in asynchronous completion cases. (As pointed out by a commenter in #7)

IAsyncEnumerator

The IAsyncEnumerator is the analogue of IEnumerator: it allows moving to the next item, awaiting that move, and then getting the actual value if there is one.

public interface IAsyncEnumerator<out T> : IAsyncDisposable 
{
     ValueTask<bool> MoveNextAsync();

     T Current { get; }
}

The IAsyncEnumerator extends the IAsyncDisposable and the consumer is required to await DisposeAsync whenever it is done with the async enumerator.

There are a couple rules to be honored, which will be quite important later when coordination is required:

  • MoveNextAsync must be called non-concurrently and non-overlappingly. This means that one has to wait for the method's returned task to signal true before calling it again. When used with async/await, this is guaranteed by the compiler-generated state machine. Otherwise, calls to this method must be serialized externally.
  • Current is only valid if the task evaluates to true after a MoveNextAsync. It is invalid to read it before the first MoveNextAsync call, after a false task or after the call to DisposeAsync.
  • DisposeAsync cannot be called concurrently with MoveNextAsync but only when the next task evaluates to true/false/failure. In addition, it has to be awaited and is expected to return only when the whole chain above it has been disposed as well in a cascading fashion.
  • The methods should not throw exceptions themselves but they can return a failed/failing task.

Unfortunately, DisposeAsync does not serve as an anytime cancellation like IDisposable.Dispose() in Rx.NET, and since it must not run concurrently with MoveNextAsync, we have to serialize their execution externally whenever we have something that a plain async/await loop cannot handle.

IAsyncEnumerable

Finally the IAsyncEnumerable is the equivalent of IEnumerable with a single method to get the IAsyncEnumerator. This, in theory, allows lazy and deferred async sequences which can be run as many times as necessary and even concurrently with each other. Only interacting with the IAsyncEnumerator has to be serialized via async/await or external means.

public interface IAsyncEnumerable<out T> 
{
    IAsyncEnumerator<T> GetAsyncEnumerator();    
}

This method has to return the async enumerator synchronously and not throw any exception.

Unfortunately, this is the only level where eager cancellation support can be injected, but unlike the Rx.NET scheme, it can't be made known to the downstream consumers within the flow. One has to keep injecting CancellationTokens into such operators.

There is an additional problem with such injections. One can't inject different CancellationTokens into different realizations of GetAsyncEnumerator()s: they have to live on the IAsyncEnumerable level and cancelling a token will cancel for all running async enumerators. Usually one would like to correlate cancellation with running async enumerators (which one gets from IObservable.Subscribe quite clearly) so only some runs could be cancelled.

The solution is most likely to defer the creation of the entire IAsyncEnumerable itself so cancellation can be dedicated at the point when an async enumerator is requested by some consumer.

Writing async sources

Async sources are IAsyncEnumerables that act as generators or as bridges to non-async sequences. Perhaps the simplest generator is the Range operator, which generates numbers on demand.

Synchronous generator

At this moment, there is no compiler support for writing an IAsyncEnumerable in the style of an IEnumerator: there is no yield return or yield break support. Therefore, we'll have to write a bit of boilerplate:

public sealed class Range : IAsyncEnumerable<int>
{
    readonly int start;
    readonly int end;

    public Range(int start, int end)
    {
        this.start = start;
        this.end = end;
    }

    public IAsyncEnumerator<int> GetAsyncEnumerator()
    {
        // TODO implement
    }
}

The outer IAsyncEnumerable usually hosts the readonly parameters of the sequence, which then usually have to be passed to the async enumerator it creates:

    public IAsyncEnumerator<int> GetAsyncEnumerator()
    {
        return new RangeEnumerator(start, end);
    }

    internal sealed class RangeEnumerator : IAsyncEnumerator<int>
    {
        readonly int end;

        int index;

        internal RangeEnumerator(int start, int end) 
        {
             this.end = end;
             this.index = start;
        }  

        // TODO implement methods and properties
    }
}

So far, nothing complicated. We need a mutable index field to track where the range is currently at when the consumer requests the next value.

The first thing to implement is the Current property:

        public int Current { get; private set; }

Here, we run into a very common theme with async sequence state, hence the writable property: we have to store the value to be read by the consumer somewhere while the state could be prepared for retrieving the next value upon a call to MoveNextAsync. In respect of our range operator, this should become apparent from the MoveNextAsync implementation:

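        public ValueTask<bool> MoveNextAsync()
        {
             if (index == end) 
             {
                 // the range is exhausted: complete synchronously with false
                 return new ValueTask<bool>(false);
             }
             // publish the current index, then advance it for the next call
             Current = index++;
             return new ValueTask<bool>(true);
        }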

When we determine we haven't reached the end yet, we take the current index and save it into Current, then move it forward by one so that the next invocation of MoveNextAsync can test it against the end as well. If we simply returned index as the Current's getter, we'd have the wrong number available!

(Remark: in this particular case, one could simply have index = start - 1 and then have index++ at the beginning of MoveNextAsync, but watch out for the integer underflow. Generally, we will likely not work with numbers or objects where the previous value is so easily made available.)

Interestingly, we don't need the async keyword on the method because we don't do anything that goes asynchronous: we decide what the next result would be and return a ValueTask structure avoiding allocation. We could have written the method as follows:

        public async ValueTask<bool> MoveNextAsync()
        {
             if (index == end) 
             {
                 return false;
             }
             Current = index++;
             return true;
        }

so that the resulting boolean value is automatically turned into the right task structure/object. Unfortunately, marking a method async and not doing any await nets an annoying warning currently. Using the explicit return of ValueTasks keeps the code warning free (without disabling the warning of course).
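As a small illustration of the non-async style, the sketch below shows that a ValueTask<bool> built directly from a value is already completed when it is returned, so a consumer awaiting it continues synchronously. The class and method names (SyncValueTaskDemo, HasNext) are made up for this example and are not part of any library.

```csharp
using System.Threading.Tasks;

public static class SyncValueTaskDemo
{
    // Hypothetical helper mirroring the non-async MoveNextAsync style:
    // the result is wrapped directly, no async state machine is involved.
    public static ValueTask<bool> HasNext(int index, int end)
    {
        return new ValueTask<bool>(index != end);
    }
}
```

Calling SyncValueTaskDemo.HasNext(1, 6) gives back a task whose IsCompletedSuccessfully is already true, and whose Result can be read right away.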

Finally, we have to implement the DisposeAsync method, although we don't have any resources or things to do upon disposal:

        public ValueTask DisposeAsync()
        {
            Current = default;
            return new ValueTask();
        }

Usually, when working with objects instead of ints, one would make sure their references are not kept beyond the lifecycle of the async sequence. This means, for example, clearing the last Current value and/or links in other class fields.

Consuming the Range operator at the end is generally done this way:

public static async Task Main(string[] args)
{
    var enumerator = new Range(1, 6).GetAsyncEnumerator();
    try 
    {
         while (await enumerator.MoveNextAsync())
         {
             Console.WriteLine(enumerator.Current);
         }
         Console.WriteLine("Done!");
    }
    finally
    {
        await enumerator.DisposeAsync();
    }
}

(Remark: there are plans to support a less verbose foreach async and using async expressions where one can focus on the loop body and the rest is generated automatically.)
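For readers on a newer toolchain, here is a sketch of the same operator written against the interfaces as they later shipped in System.Collections.Generic with .NET Core 3.0 and C# 8. Two things differ from the draft described on this page: GetAsyncEnumerator takes an optional CancellationToken (ignored in this sketch), and the loop sugar is spelled await foreach. The names AsyncRange and RangeDemo are made up for this example.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class AsyncRange : IAsyncEnumerable<int>
{
    readonly int start;
    readonly int end;

    public AsyncRange(int start, int end)
    {
        this.start = start;
        this.end = end;
    }

    // The token is part of the shipped interface; this sketch does not use it.
    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        return new RangeEnumerator(start, end);
    }

    sealed class RangeEnumerator : IAsyncEnumerator<int>
    {
        readonly int end;
        int index;

        internal RangeEnumerator(int start, int end)
        {
            this.end = end;
            this.index = start;
        }

        public int Current { get; private set; }

        public ValueTask<bool> MoveNextAsync()
        {
            if (index == end)
            {
                return new ValueTask<bool>(false);
            }
            Current = index++;
            return new ValueTask<bool>(true);
        }

        public ValueTask DisposeAsync()
        {
            Current = default;
            return default;
        }
    }
}

public static class RangeDemo
{
    // Collects the whole sequence; await foreach also awaits DisposeAsync for us.
    public static async Task<List<int>> Collect(int start, int end)
    {
        var items = new List<int>();
        await foreach (var item in new AsyncRange(start, end))
        {
            items.Add(item);
        }
        return items;
    }
}
```

Since the end is exclusive, as in the Range shown earlier, RangeDemo.Collect(1, 6) yields the numbers 1 through 5.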

Asynchronous generator

If we have to interface with some external source which supports async/await, the job is often quite easy, as we can write a MoveNextAsync method marked async and simply await values right there.

For example, let's write a source that generates items after some time delay:

   internal sealed class Interval : IAsyncEnumerator<long>
   {
       readonly TimeSpan delay;

       internal Interval(TimeSpan delay)
       {
           this.delay = delay;
       }

       long next;

       public long Current { get; private set; }

       public async ValueTask<bool> MoveNextAsync()
       {
           await Task.Delay(delay);
           Current = next++;
           return true;
       }

       public ValueTask DisposeAsync()
       {
           return new ValueTask();
       }
   }

(Remark: from now on, the enclosing IAsyncEnumerable is omitted if it is trivial to write based on what parameters the enumerator uses.)

If you have worked with Task.Delay before, you may have noticed that we didn't pass a CancellationToken to it. The reason is that it makes no sense in this style of async sequence design. One would think to add a CancellationTokenSource field and cancel it in DisposeAsync, but the specification forbids calling DisposeAsync while MoveNextAsync is in progress. The only way out for an unaware consumer is to stop calling MoveNextAsync after the previous call has completed.

We'll see later on that this non-concurrent call requirement of MoveNextAsync and DisposeAsync makes things convoluted and may even elongate the lifecycle of async sequences in an undesirable manner.

Push-pull adaptation

So far, the example sources were of the pull kind: the consumer indicated when it wanted the next item and the async source generated it right there, on demand. Basically, we had a request-1 backpressure situation.

However, some sources (and intermediate operators) are required to run in a form of push: the value to be given to the consumer of the IAsyncEnumerator may happen before the consumer has called MoveNextAsync. Sometimes more than one value has to wait in this manner.

This is the traditional producer-consumer problem, which is usually solved with blocking mechanisms: blocking queues or wait-notify pairs. Unfortunately, we are in the async/non-blocking world, so these won't be good enough for us (and we don't have a runtime that magically saves and restores the call stack upon blocking like Project Loom aims for).

Therefore, we need a non-blocking and async-friendly continuation mechanism built from atomics (Interlocked) and manual task completions (TaskCompletionSource).

Currently, there are 3 kinds of such rendezvous situations to consider:

  1. "Ping-pong": where the producer and the consumer would wait for the other to be ready.
  2. "Unbounded-in" producer: where the producer runs with its own pace and the consumer will get items at its own pace (may be designed as lossy).
  3. "Prefetched" producer: the producer is allowed to generate up to a certain number of items and then pauses until the consumer has taken some/all items, after which the producer is resumed.

Practically, the "Ping-pong" case is equivalent to the "Prefetched" case where the number is 1. In backpressure terms, "Ping-pong" is request(1) and "Prefetched" is request(N) with request(M) after M items have been consumed.

Tooling-wise, the simplest to implement is the "Ping-pong" case, but first, we need that async/non-blocking continuation scheme.

Async Resumption

If a producer and a consumer had to meet exactly once, the solution would be quite straightforward:

readonly TaskCompletionSource<T> resume = new TaskCompletionSource<T>();

public void SendValue(T value)
{
    resume.TrySetResult(value);
}

public async void ProcessValue() {
    var value = await resume.Task;
    // work with value
}

Unfortunately, this doesn't work when the exchange must happen more than once. For one, completing a TaskCompletionSource makes it completed forever, thus when the next value is to be set, TrySetResult won't overwrite the old one. Worse, the consumer side awaiting the TaskCompletionSource.Task will immediately succeed and will keep getting that very first item over and over.

Therefore, we have to create a fresh TaskCompletionSource once the previous one was successfully awaited. However, there is a further complication: who should create that fresh instance, the consumer or the producer? Remember, they may run concurrently with each other, so either one may reach its respective line first (resume.TrySetResult vs await resume.Task), and they have to work with the same TaskCompletionSource instance.
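The single-use nature of TaskCompletionSource can be checked with a few lines. This is only an illustrative sketch; the class name SingleUseDemo is made up for the example.

```csharp
using System.Threading.Tasks;

public static class SingleUseDemo
{
    public static async Task<(bool First, bool Second, int A, int B)> Run()
    {
        var resume = new TaskCompletionSource<int>();

        // The first completion wins, the second attempt is rejected.
        var first = resume.TrySetResult(1);
        var second = resume.TrySetResult(2);

        // Awaiting the same task repeatedly keeps yielding the first value.
        var a = await resume.Task;
        var b = await resume.Task;

        return (first, second, a, b);
    }
}
```

Here First is true, Second is false, and both A and B hold the value 1, which is exactly the "stuck on the first item" behavior described above.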

The solution has two parts: first make sure only one party can create a TaskCompletionSource, then make sure only one party can renew it.

The first part can be done via atomics (so called Compare-and-Swap loop).

public static TaskCompletionSource<T> GetOrCreate<T>(ref TaskCompletionSource<T> field)
{
    for (;;) 
    {
        var current = Volatile.Read(ref field);
        if (current != null)
        {
            return current;
        }
        var fresh = new TaskCompletionSource<T>();
        if (Interlocked.CompareExchange(ref field, fresh, null) == null) {
            return fresh;
        }
    }
}

If both producer and consumer call GetOrCreate at the same time, this loop will make sure only one of them can create a new TaskCompletionSource and both receive the same instance.
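To make that claim tangible, the following sketch lets many callers race on the same field and counts how many distinct instances they got back. GetOrCreate is repeated so the example is self-contained; the names ResumeHelper and SharedSlot are hypothetical and exist only for this demonstration.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ResumeHelper
{
    public static TaskCompletionSource<T> GetOrCreate<T>(ref TaskCompletionSource<T> field)
    {
        for (;;)
        {
            var current = Volatile.Read(ref field);
            if (current != null)
            {
                return current;
            }
            var fresh = new TaskCompletionSource<T>();
            if (Interlocked.CompareExchange(ref field, fresh, null) == null)
            {
                return fresh;
            }
        }
    }
}

public sealed class SharedSlot
{
    TaskCompletionSource<bool> field;

    // Races the given number of callers and reports how many distinct
    // TaskCompletionSource instances were handed out.
    public int CountDistinctInstances(int callers)
    {
        var results = new TaskCompletionSource<bool>[callers];
        Parallel.For(0, callers, i =>
        {
            results[i] = ResumeHelper.GetOrCreate(ref field);
        });
        return results.Distinct().Count();
    }
}
```

No matter how the callers interleave, new SharedSlot().CountDistinctInstances(64) reports a single instance, because a caller that loses the CompareExchange loops around and picks up the winner's object.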

It is clear that once called, the method above will have a non-null TaskCompletionSource in the target field. However, once spent (TrySetResult), it has to be removed (field nulled out), but by whom? In addition, what will prevent the producer from calling TrySetResult again too early?

The solution: have notifications sent in both directions and make each party responsible for clearing the TaskCompletionSource it awaits (i.e., setting that field to null after a successful await). Therefore, we need two TaskCompletionSource fields:

TaskCompletionSource<bool> consumed;

TaskCompletionSource<bool> valueReady;

T value;

(Remark: generally, the state changes may be more involved than sending a single value of T via TaskCompletionSource, hence the use of the basic <bool> typed and a dedicated value field.)

Now the exchange can be organized as follows:

public async ValueTask Send(T value) 
{
    await GetOrCreate(ref consumed).Task;
    consumed = null;

    this.value = value;

    GetOrCreate(ref valueReady).TrySetResult(true);
}

public async ValueTask<T> Receive() 
{
    GetOrCreate(ref consumed).TrySetResult(true);

    await GetOrCreate(ref valueReady).Task;
    valueReady = null;

    return value;
}

The first rendezvous point is the consumed TaskCompletionSource. The sender will wait for it to become completed. Once awaited, which can only happen if the consumer called TrySetResult, the sender clears the consumed field, sets the value, signals valueReady and returns. If the producer re-enters Send, it starts waiting on a fresh consumed TaskCompletionSource. We won't get any accidental fall-throughs because it was the prior Send call that nulled out the field and not the consumer side.

On the consumer side, the receiver signals the consumed TaskCompletionSource, unblocking a sender, then starts waiting for the valueReady signal (set by the sender). Once awaited, the consumer clears the valueReady field and returns the value field. If the consumer is to receive again, it won't accidentally fall-through because valueReady is cleared by the consumer in the previous call.
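To see the exchange run end to end, here is a sketch that wraps the two methods in a class and lets a producer task and a consumer loop trade three values. The class names PingPong and PingPongDemo are made up for this example, and GetOrCreate is repeated (specialized to bool) to keep the block self-contained.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class PingPong<T>
{
    TaskCompletionSource<bool> consumed;
    TaskCompletionSource<bool> valueReady;
    T value;

    static TaskCompletionSource<bool> GetOrCreate(ref TaskCompletionSource<bool> field)
    {
        for (;;)
        {
            var current = Volatile.Read(ref field);
            if (current != null) return current;
            var fresh = new TaskCompletionSource<bool>();
            if (Interlocked.CompareExchange(ref field, fresh, null) == null) return fresh;
        }
    }

    public async ValueTask Send(T item)
    {
        // wait until the consumer is ready, then reset the signal
        await GetOrCreate(ref consumed).Task;
        consumed = null;

        value = item;
        GetOrCreate(ref valueReady).TrySetResult(true);
    }

    public async ValueTask<T> Receive()
    {
        // tell the producer we are ready, then wait for its value
        GetOrCreate(ref consumed).TrySetResult(true);

        await GetOrCreate(ref valueReady).Task;
        valueReady = null;
        return value;
    }
}

public static class PingPongDemo
{
    public static async Task<List<int>> Run()
    {
        var channel = new PingPong<int>();

        // producer runs on the thread pool, concurrently with the consumer loop
        var producer = Task.Run(async () =>
        {
            for (var i = 1; i <= 3; i++)
            {
                await channel.Send(i);
            }
        });

        var received = new List<int>();
        for (var i = 0; i < 3; i++)
        {
            received.Add(await channel.Receive());
        }
        await producer;
        return received;
    }
}
```

The consumer receives 1, 2 and 3 in order. Note that the sketch assumes exactly one producer and one consumer; the scheme is not meant for multiple concurrent senders or receivers.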

One property of this setup is that the sender has to wait for the consumer the first time it wants to send an item. Sometimes this is not desirable, and the sender should be allowed to set the first item without awaiting. For such a setup, we have to start out with an already completed TaskCompletionSource in consumed and have Receive signal consumed as its last action:

TaskCompletionSource<bool> consumed;

// in the constructor
consumed = new TaskCompletionSource<bool>();
consumed.TrySetResult(true);

public async ValueTask Send(T value) 
{
    await GetOrCreate(ref consumed).Task;
    consumed = null;

    this.value = value;

    GetOrCreate(ref valueReady).TrySetResult(true);
}

public async ValueTask<T> Receive() 
{
    await GetOrCreate(ref valueReady).Task;
    valueReady = null;

    var v = value;

    GetOrCreate(ref consumed).TrySetResult(true);

    return v;
}

In this setup, both start out waiting for their respective resumption signal, but given that consumed starts out set, Send will be able to set the first value and unblock Receive. Then the notification exchange is the same "Ping-pong" between the two.

Imperative async generator

Now that the first primitive is established, we can now implement our first non-trivial operator: an imperative asynchronous generator. In essence, it is a push-like operation where new items are sent to a consumer:

public async ValueTask Generate() 
{
    await Send(1);
    await Send(3);
    await Send(6);
    // sequence ends when the task ends or crashes
}

Now the core value return logic can be implemented with MoveNextAsync, but it loses the imperative look:

int state;

public int Current { get; private set; }

public async ValueTask<bool> MoveNextAsync()
{
    if (state == 0) {
        state = 1;
        Current = 1;
        return true;
    }
    if (state == 1) {
        state = 2;
        Current = 3;
        return true;
    }        
    if (state == 2) {
        state = 3;
        Current = 6;
        return true;
    }        
    return false;
}

Instead, we have two options: an abstract IAsyncEnumerator with the Send mechanism, or an IAsyncEnumerable which can work with an async function.

public static IAsyncEnumerable<T> Generate<T>(Func<IAsyncEmitter<T>, Task> generator);

Generate(async emitter => {
    await emitter.Send(1);
    await emitter.Send(3);
    await emitter.Send(6);
    // task ends
});

Here in order to communicate with an actual consumer of the IAsyncEnumerator, we need a mediator interface:

public interface IAsyncEmitter<in T>
{
    ValueTask Send(T item);
}

This ensures that the generator function will be invoked for each GetAsyncEnumerator and thus each of them will have their personal generator logic running. In this setup, the producer side is the Send method and the consumer side is the MoveNextAsync method.

Basic implementation

sealed class GenerateEnumerator : IAsyncEnumerator<T>, IAsyncEmitter<T>
{
    TaskCompletionSource<bool> consumed;
    TaskCompletionSource<bool> valueReady;

    T value;

    public T Current { get; private set; }

    internal GenerateEnumerator() 
    {
        this.consumed = new TaskCompletionSource<bool>();
        this.consumed.TrySetResult(true);
    }

    public async ValueTask<bool> MoveNextAsync() 
    {
        await GetOrCreate(ref valueReady).Task;
        valueReady = null;

        Current = value;
        GetOrCreate(ref consumed).TrySetResult(true);
        return true;
    }

    public async ValueTask Send(T item)
    {
        await GetOrCreate(ref consumed).Task;
        consumed = null;

        value = item;

        GetOrCreate(ref valueReady).TrySetResult(true);
    }

    public ValueTask DisposeAsync() 
    {
        return new ValueTask();
    }
}

This time, the GetAsyncEnumerator has an extra job to perform by calling the generator function:

readonly Func<IAsyncEmitter<T>, Task> generator;

// ...

public IAsyncEnumerator<T> GetAsyncEnumerator()
{
    var enumerator = new GenerateEnumerator();
    generator(enumerator);
    return enumerator;
}

Detecting completion

When run, this async source works but then never completes, even though the user-provided task ended. We need to detect when the task ends and then signal the MoveNextAsync there won't be further items.

Unfortunately, we can't use await generator(enumerator) to wait for the user-provided task to end because it would prevent us from returning the enumerator, thus the consumer would never get a hold of it, call MoveNextAsync and the whole setup livelocks. Besides, GetAsyncEnumerator is a non-async method thus await is not allowed.

This is the point where we have to use the more involved task-continuation mechanisms of C#: Task.ContinueWith(). When the task ends, we continue with a lambda that notifies the MoveNextAsync about the end of items. Luckily, we can reuse valueReady by sending true for items (as done in Send) and send false when the sequence is stopped.

For these, we have to modify both GetAsyncEnumerator and MoveNextAsync a bit:

public IAsyncEnumerator<T> GetAsyncEnumerator() 
{
    var enumerator = new GenerateEnumerator();
    // don't await here: attach a continuation that signals the end of the sequence
    generator(enumerator)
        .ContinueWith(t => enumerator.Stop(t));
    return enumerator;
}

public async ValueTask<bool> MoveNextAsync()
{
    var hasValue = await GetOrCreate(ref valueReady).Task;
    valueReady = null;

    if (hasValue)
    {
        Current = value;
        GetOrCreate(ref consumed).TrySetResult(true);
        return true;
    }
    return false;
}

internal async void Stop(Task t)
{
    await GetOrCreate(ref consumed).Task;
    consumed = null;

    if (t.IsFaulted)
    {
        GetOrCreate(ref valueReady).TrySetException(t.Exception);
    }
    else
    {
        GetOrCreate(ref valueReady).TrySetResult(false);
    }
}

The code in Stop will signal via valueReady: false for normal completion and TrySetException when the user-provided task crashed. We still have to wait for consumed so that the last sent value is properly consumed before signaling the terminal state.

Detecting cancellation

Now we can detect when the task ends, but what about the case when the consumer of the IAsyncEnumerable wants to stop after some items and calls DisposeAsync? Unfortunately, the user-provided task currently has no way of knowing it should stop. We have to communicate this intent to it, for example, by providing a flag on the IAsyncEmitter:

interface IAsyncEmitter<in T>
{
    ValueTask Send(T item);
    
    bool IsCancellationRequested { get; }
}

A nice task would then check this flag:

Generate(async emitter =>
{
    if (emitter.IsCancellationRequested) return;

    await emitter.Send(1);

    if (emitter.IsCancellationRequested) return;

    await emitter.Send(3);

    // etc.
});

Unfortunately, this is not enough. Since the consumer and the user-provided task may run concurrently, what if the task misses the check and gets stuck in Send because the other side will never ever set consumed again?

To solve this issue, we have to introduce checks in Send as well as make sure DisposeAsync sets consumed to unblock a pending Send, which then quits too.

sealed class GenerateEnumerator: IAsyncEnumerator<T>, IAsyncEmitter<T>
{

    volatile bool cancelled;

    public bool IsCancellationRequested => cancelled; // volatile property...

    public ValueTask DisposeAsync()
    {
        cancelled = true;
        GetOrCreate(ref consumed).TrySetResult(true);
        return new ValueTask();
    }

    public async ValueTask Send(T item)
    {
        if (cancelled) return;

        await GetOrCreate(ref consumed).Task;
        consumed = null;

        if (cancelled) return;

        value = item;

        GetOrCreate(ref valueReady).TrySetResult(true);
    }
}

(Remark: since DisposeAsync has to be called after MoveNextAsync has completed, there is no risk of this extra signal on consumed getting lost, as we don't wait for valueReady this time.)

Waiting for DisposeAsync

So far we have covered sending an item asynchronously, signaling a termination and recognizing cancellations. However, we have omitted one part of the requirements of the IAsyncEnumerator API: DisposeAsync should wait until all resources are released. In our case, we don't know if the user-provided task has resources, or if it actually responds to IsCancellationRequested, thus we'd have to wait for its task to end before letting DisposeAsync continue.

To resolve this requirement, we can introduce a plain TaskCompletionSource to let the IAsyncEnumerator.DisposeAsync wait for the termination as well. Unfortunately, we can't reuse valueReady because an in-progress Send may trigger it before the task ends. We have to signal the termination ourselves in Stop:

TaskCompletionSource<bool> taskDone;

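internal async void Stop(Task t)
{
    // wait for the last value to be consumed, unless the consumer has
    // already disposed: in that case nobody will signal consumed again
    if (!cancelled)
    {
        await GetOrCreate(ref consumed).Task;
        consumed = null;
    }

    if (t.IsFaulted)
    {
        GetOrCreate(ref valueReady).TrySetException(t.Exception);
    }
    else
    {
        GetOrCreate(ref valueReady).TrySetResult(false);
    }
    // let a pending or future DisposeAsync complete
    GetOrCreate(ref taskDone).TrySetResult(true);
}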

public ValueTask DisposeAsync()
{
    cancelled = true;
    GetOrCreate(ref consumed).TrySetResult(true);
    return new ValueTask(GetOrCreate(ref taskDone).Task);
}

Now our generator is feature complete.

Unbounded-in async source

In certain situations, we can't suspend the value generator, either because it doesn't support some form of async/await or because it lacks non-blocking backpressure/resumption. Sometimes we don't really want to hold back the producer as we plan to hold onto all items anyway (caching and replaying), or we will sample/skip items based on external stimulus (debouncing, sampling).

This situation is similar to the previous "Ping-pong" scenario, but the Send won't wait for the consumed to be set, yet the consumer should be notified when new value(s) have become available.

Perhaps the easiest way to demonstrate such operators is an IObservable bridge to IAsyncEnumerable. As a reminder, IObservable is a form of push-based asynchrony where there is no non-blocking way of preventing the call to IObserver.OnNext. In case we don't want to lose items, we have to use a queue instead of the single value field so far.

We can combine IObserver and IAsyncEnumerator into a single class for this to begin with:

sealed class ObserverEnumerator : IAsyncEnumerator<T>, IObserver<T>, IDisposable
{
    // TODO implement
}

The IObservable protocol requires the consumer to call Dispose() on the IDisposable returned by Subscribe() once consumption has ended, so the enumerator has to hold on to it:

readonly IObservable<T> source;

public IAsyncEnumerator<T> GetAsyncEnumerator()
{
    var enumerator = new ObserverEnumerator();
    enumerator.upstream = source.Subscribe(enumerator);
    return enumerator;
}

sealed class ObserverEnumerator : IAsyncEnumerator<T>, IObserver<T>
{
    internal IDisposable upstream;

    // TODO implement rest
}

Since Subscribe() is synchronous, we can have the upstream field set before the consumer of the IAsyncEnumerator has chance to call DisposeAsync.

Now the IObserver will receive signals without slowing down, so we have to keep collecting those items and the terminal events. Since there can be only one exception or one normal completion, we don't have to queue up a union of value-exception-complete signals; the exception and the completion signal can have their own Exception error and bool done fields:

sealed class ObserverEnumerator : IAsyncEnumerator<T>, IObserver<T>, IDisposable
{
    // deferred cancellation ...

    // queue up signals

    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    Exception error;
    volatile bool done;

}

Note that we plan to read done in MoveNextAsync in an ordered fashion for the following invariants to hold:

  1. if done == true and queue.IsEmpty == true, the sequence is finished
  2. if done == true, it is safe to read error.

The ordering is important here because if we check queue first, then done, there is a small window where in between the two, more items could arrive and done set to true. The consumer side then would conclude the sequence has finished yet the queue still holds those last items.
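The read order can be isolated into a tiny synchronous sketch: sample done first, then try the queue, and only report completion if the queue was empty after done had already been observed. The type names SignalQueue and PollResult are made up for this illustration; the real operator does the same checks inside MoveNextAsync.

```csharp
using System.Collections.Concurrent;

public enum PollResult
{
    Item,      // an item was dequeued
    Empty,     // nothing available yet, more may come
    Finished   // done was set and the queue has been drained
}

public sealed class SignalQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    volatile bool done;

    // producer side
    public void Offer(T item) => queue.Enqueue(item);

    public void Complete() => done = true;

    // consumer side: read done BEFORE looking at the queue, so that items
    // enqueued just before Complete() can never be skipped
    public PollResult Poll(out T item)
    {
        var isDone = done;
        if (queue.TryDequeue(out item))
        {
            return PollResult.Item;
        }
        return isDone ? PollResult.Finished : PollResult.Empty;
    }
}
```

If the two reads were swapped, a producer could enqueue an item and set done between them, and the consumer would report Finished while the queue still held that item.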

Consuming the IObservable

We will need a way to notify the consumer side in MoveNextAsync, but unfortunately, the previous consumed - valueReady pair no longer works because there is no way to await the consumed signal without blocking the IObserver methods (they are not defined as async after all).

Interestingly, all we need is valueReady and a loop in MoveNextAsync that tries to extract the signals or goes to await on valueReady.

TaskCompletionSource<bool> valueReady;

public void OnNext(T item)
{ 
    queue.Enqueue(item);
    GetOrCreate(ref valueReady).TrySetResult(true);
}

public void OnError(Exception ex)
{ 
    error = ex;
    done = true;
    GetOrCreate(ref valueReady).TrySetResult(true);
}

public void OnCompleted()
{
    done = true;
    GetOrCreate(ref valueReady).TrySetResult(true);
}

Consuming signals

MoveNextAsync can be implemented in two ways: expecting to wait or expecting to work with signals. The choice affects where we await valueReady. Perhaps the more performant one is the latter, where we avoid the await if there are actually items ready and return immediately:

public T Current { get; private set; }

public async ValueTask<bool> MoveNextAsync()
{
    for (;;)
    {
        var isDone = done;
        var hasValue = queue.TryDequeue(out var value);

        if (isDone && !hasValue)
        {
            if (error != null) throw error;
            return false;
        }
        else if (hasValue)
        {
            Current = value;
            return true;
        }
       
        await GetOrCreate(ref valueReady).Task;
        Interlocked.Exchange(ref valueReady, null);
    }
}

In MoveNextAsync, we check the done flag and if the queue is empty. This makes sure items will be processed before completing or throwing the error from the IObservable. If both are true and error is not null, we throw it, which will crash the consumer's await MoveNextAsync() call, yielding the exception. Otherwise, there was no error and we return false to indicate the async sequence has ended.

If there was a value, we set Current and return true, indicating the value can now be consumed via IAsyncEnumerator.Current.

If the IObservable is neither done nor has available items, we have to wait for the valueReady signal. Similar to the "Ping-pong" case, once awaited, we have to clear valueReady but this time nulling out has to be atomic so that the GetOrCreate can properly see the reference change to null.
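The GetOrCreate helper used by both sides is not repeated in this part of the page. As a reminder, here is a minimal sketch of what it is assumed to do: return the TaskCompletionSource currently stored in the field, or atomically install a fresh one if the field is null. The class name ResumeHelper is made up for this sketch; only the method name comes from the text.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical holder class; only the name GetOrCreate is taken from the text.
static class ResumeHelper
{
    // Returns the TaskCompletionSource stored in 'field', or atomically installs
    // a new one if the field is currently null. Producer and consumer therefore
    // always end up with the same instance, whichever of them arrives first.
    internal static TaskCompletionSource<bool> GetOrCreate(
        ref TaskCompletionSource<bool> field)
    {
        for (;;)
        {
            var current = Volatile.Read(ref field);
            if (current != null)
            {
                return current;
            }
            var fresh = new TaskCompletionSource<bool>();
            // Only one thread can win the null -> fresh transition;
            // the loser loops and picks up the winner's instance.
            if (Interlocked.CompareExchange(ref field, fresh, null) == null)
            {
                return fresh;
            }
        }
    }
}
```

Calling TrySetResult on an already completed TaskCompletionSource simply returns false, so several OnNext calls arriving before the consumer wakes up collapse into a single wake-up. This is why MoveNextAsync loops and re-checks the queue after clearing the field.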

(Remark: The current library implementation uses a work-in-progress long field in addition to the valueReady field. It does work and avoids recreating the valueReady task too frequently and also avoids awaiting when it is known to have more things to do. In principle, this extended scheme may not be necessary after all.)

Dispose management

Lastly, we have to wire the dispose mechanism in:

public ValueTask DisposeAsync()
{
    upstream.Dispose();
    return new ValueTask();
}

Disposing an IObservable is synchronous and also fire-and-forget, thus there is nothing we could actually wait for in DisposeAsync, hence the immediate completion.

Writing intermediate operators

Given the sources so far, we'd like to have transformations, aka typical LINQ operators, and coordinated sequences on top of those. Let's begin with the usually simpler transformative operators.

Mapping async sequences

The most common operator is to turn items of a source sequence into other types or different values. It is usually called Select or Map in various libraries.

Writing this kind of operator is quite straightforward, but the boilerplate of defining the outer IAsyncEnumerable is still required:

public sealed class Map<T, R> : IAsyncEnumerable<R>
{
    readonly IAsyncEnumerable<T> source;

    readonly Func<T, R> mapper;

    // constructor for the readonly fields...

    public IAsyncEnumerator<R> GetAsyncEnumerator() 
    {
        return new MapEnumerator(
           source.GetAsyncEnumerator(),
           mapper
        );
    }

    internal sealed class MapEnumerator : IAsyncEnumerator<R>
    {
        readonly IAsyncEnumerator<T> source;

        readonly Func<T, R> mapper;

        // constructor for the readonly fields ...
    }
}

Intermediate operators take one or more source IAsyncEnumerables and the intermediate enumerators need to call GetAsyncEnumerator() to chain onto their upstream.

For the Map-like operators, the implementation of the IAsyncEnumerator methods is relatively boilerplate-free:

public R Current { get; private set; }

public async ValueTask<bool> MoveNextAsync()
{
    if (await source.MoveNextAsync())
    {
        Current = mapper(source.Current);
        return true;
    }
    return false;
}

public ValueTask DisposeAsync()
{
    Current = default;
    return source.DisposeAsync();
}

This is where async-await shines the most: we await the next item from the source, and if there is one, we map it via the function and save the result into Current. All the state machine and continuations are generated by the compiler for us. In addition, we don't need a try-catch around mapper because the state machine will fail the returned task with the exception for us.

In DisposeAsync, we can mostly delegate to the source's DisposeAsync; there is no need for async-await in this situation.
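To see the pieces working together, here is a compilable sketch of the whole operator plus a hand-driven consumer. It is written against the interfaces as they later shipped in .NET Core 3.0, where GetAsyncEnumerator takes an optional CancellationToken (the version discussed on this page has no such parameter). The MapDemo class and its Range source are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class Map<T, R> : IAsyncEnumerable<R>
{
    readonly IAsyncEnumerable<T> source;
    readonly Func<T, R> mapper;

    public Map(IAsyncEnumerable<T> source, Func<T, R> mapper)
    {
        this.source = source;
        this.mapper = mapper;
    }

    // The CancellationToken parameter is required by the shipped interface only.
    public IAsyncEnumerator<R> GetAsyncEnumerator(CancellationToken ct = default)
    {
        return new MapEnumerator(source.GetAsyncEnumerator(ct), mapper);
    }

    sealed class MapEnumerator : IAsyncEnumerator<R>
    {
        readonly IAsyncEnumerator<T> source;
        readonly Func<T, R> mapper;

        internal MapEnumerator(IAsyncEnumerator<T> source, Func<T, R> mapper)
        {
            this.source = source;
            this.mapper = mapper;
        }

        public R Current { get; private set; }

        public async ValueTask<bool> MoveNextAsync()
        {
            if (await source.MoveNextAsync())
            {
                Current = mapper(source.Current);
                return true;
            }
            return false;
        }

        public ValueTask DisposeAsync()
        {
            Current = default;
            return source.DisposeAsync();
        }
    }
}

public static class MapDemo
{
    // Hypothetical source: a C# 8 async iterator producing 1, 2, 3.
    static async IAsyncEnumerable<int> Range()
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Drives the enumerator by hand, the way a consumer without
    // language support for async sequences would.
    public static async Task<List<string>> RunAsync()
    {
        var result = new List<string>();
        var en = new Map<int, string>(Range(), x => "#" + (x * 10))
            .GetAsyncEnumerator();
        try
        {
            while (await en.MoveNextAsync())
            {
                result.Add(en.Current);
            }
        }
        finally
        {
            await en.DisposeAsync();
        }
        return result; // contains "#10", "#20", "#30"
    }
}
```

The try-finally in the consumer matters: DisposeAsync is only called once the pending MoveNextAsync has completed, which is the call discipline the rest of this page has to enforce by hand when ContinueWith is involved.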

Now imagine we'd like to do the mapping itself asynchronously. No problem as it needs only a small signature change to the mapper function and an extra await:

readonly Func<T, Task<R>> mapperAsync;

public async ValueTask<bool> MoveNextAsync()
{
    if (await source.MoveNextAsync())
    {
        Current = await mapperAsync(source.Current);
        return true;
    }
    return false;
}

Since the logical flow is sequential and we have to await one thing at a time, it makes the implementation easy to adjust.

Filtering async sequences

The next common thing is to not relay items given some circumstances. Such operators are called Where or Filter in libraries.

They take a predicate and call it for each incoming item to decide which ones to deliver. However, unlike Map, if the predicate says not to deliver, the implementor of MoveNextAsync still has to produce an item; returning false at this point would end the sequence, which is usually not desired yet. Therefore, the source has to be pulled over and over until a suitable item arrives:

readonly Func<T, bool> predicate;

public T Current => source.Current;

public ValueTask DisposeAsync() => source.DisposeAsync();

public async ValueTask<bool> MoveNextAsync()
{
    while (await source.MoveNextAsync())
    {
        if (predicate(source.Current))
        {
            return true;
        }
    }
    return false;
}

In this setup, both Current and DisposeAsync can be delegated back to the upstream source as we don't change value types here. The loop in MoveNextAsync keeps iterating if the predicate returned false until either there is a source item that passes, quitting the loop, or there are no more items left.

Turning this into an async predicate is straightforward again:

readonly Func<T, Task<bool>> predicate;

public async ValueTask<bool> MoveNextAsync()
{
    while (await source.MoveNextAsync())
    {
        if (await predicate(source.Current))
        {
            return true;
        }
    }
    return false;
}

Stopping a sequence in time

So far, the intermediate operators had to deal with one IAsyncEnumerable source and no other external factors. The more interesting operators though involve more than one such source and/or time.

For example, we may want to take items for a specified duration from an async source. This is often called Take in libraries with time as its argument. Task.Delay is a natural way of doing timed operations. However, we run into a problem: we can only await one thing at a time in an async method:

// Pseudocode: C# has no such "either/or" await construct
public async ValueTask<bool> MoveNextAsync()
{
    either 
    {
        await source.MoveNextAsync();
        return true; // ???
    }
    or
    {
        await Task.Delay(time);
        return false; // ???
    }
}

If we await the source first, the sequence could be live for too long. If we wait for the delay, we can't keep requesting items from the source.

This is a situation where we have to use ContinueWith on both source.MoveNextAsync and Task.Delay to let each run at its own pace. Unfortunately, this takes us away from the nice compiler-generated state machines and guaranteed non-reentrance.

One complication is that Task.Delay should end the sequence by making MoveNextAsync return false while source.MoveNextAsync may wish to signal a new item. Consequently, if Delay wins, the consumer may call source.DisposeAsync while source.MoveNextAsync is in progress, which is not allowed. Let's see how to resolve both problems.

Basic setup

We'll need our friend the valueReady TaskCompletionSource to wait for the signal that may come from the delay or the next available item from the source:

sealed class TakeEnumerator : IAsyncEnumerator<T>
{
    readonly IAsyncEnumerator<T> source;

    TaskCompletionSource<bool> valueReady;

    public T Current { get; private set; }

    public async ValueTask<bool> MoveNextAsync()
    {
         if (await GetOrCreate(ref valueReady).Task)
         {
              // true: the source has produced an item
              Current = source.Current;
              return true;
         }
         // false: the sequence has ended
         return false;
    }
}

For one, we need the source.MoveNextAsync to be executed, but we can't await it in our MoveNextAsync as it would prevent the signal from Delay to also unblock valueReady. Therefore, we need a method to handle the completion of source.MoveNextAsync and make it run in the first place.

public IAsyncEnumerator<T> GetAsyncEnumerator() 
{
    var enumerator = new TakeEnumerator(source.GetAsyncEnumerator());
    enumerator.source.MoveNextAsync()
       .AsTask()
       .ContinueWith(t => enumerator.SourceReady(t));
    return enumerator; 
}

internal void SourceReady(Task<bool> t)
{
    if (t.IsFaulted)
    {
        GetOrCreate(ref valueReady).TrySetException(t.Exception);
    }
    else
    {
        GetOrCreate(ref valueReady).TrySetResult(t.Result);
    }
}

This will produce only one item and then stall; to get further items, we have to keep calling source.MoveNextAsync.

Reentrance problem

Unfortunately, there is a small complication. MoveNextAsync may complete synchronously, thus the continuation may run on the current thread. Calling source.MoveNextAsync again from there would enter into a recursion and eventually overflow the stack.

To solve this problem, we have to introduce a so-called trampoline; a non-blocking construct that makes sure the recursion doesn't happen but the fact that source.MoveNextAsync has to be called again is remembered.

public IAsyncEnumerator<T> GetAsyncEnumerator() 
{
    var enumerator = new TakeEnumerator(source.GetAsyncEnumerator());
    enumerator.GetNext();
    return enumerator; 
}

internal sealed class TakeEnumerator : IAsyncEnumerator<T>
{
    // everything else

    int wip;

    public void GetNext() {
        if (Interlocked.Increment(ref wip) == 1)
        {
            do
            {
                 source.MoveNextAsync()
                    .AsTask()
                    .ContinueWith(t => SourceReady(t));
            }
            while (Interlocked.Decrement(ref wip) != 0);
        }
    }
}

Since the change in wip from zero to one is guaranteed to be atomic, there is only one thread at a time that can enter the trampoline, yet entry attempts are remembered in wip > 1 values. The do-while loop then decrements until all such attempts are fulfilled. This has the added benefit of working for mixed synchronous and asynchronous GetNext invocations; we don't really know if the ValueTask<bool> of MoveNextAsync will complete synchronously or asynchronously as it can change from invocation to invocation.
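The effect of the trampoline can be observed in isolation. The following sketch replaces the source with a Step method that synchronously asks for the next item, which is the worst case for recursion. The TrampolineDemo class and its counters are made up for this illustration; only the wip pattern is taken from the text.

```csharp
using System;
using System.Threading;

// Hypothetical demo class; only the wip counter pattern comes from the text.
public sealed class TrampolineDemo
{
    int wip;
    int depth;            // current nesting depth of Step()
    public int MaxDepth;  // deepest nesting observed
    public int Steps;     // how many times Step() ran
    readonly int total;

    public TrampolineDemo(int total)
    {
        this.total = total;
    }

    // Same shape as GetNext: only the caller that moves wip from 0 to 1 runs
    // the loop; any other caller just leaves a "run once more" mark in wip.
    public void GetNext()
    {
        if (Interlocked.Increment(ref wip) == 1)
        {
            do
            {
                Step();
            }
            while (Interlocked.Decrement(ref wip) != 0);
        }
    }

    // Stands in for a source.MoveNextAsync() whose continuation runs
    // synchronously and immediately asks for the next item.
    void Step()
    {
        depth++;
        MaxDepth = Math.Max(MaxDepth, depth);
        if (++Steps < total)
        {
            GetNext(); // reentrant call: returns at once, the outer loop picks it up
        }
        depth--;
    }
}
```

Running `new TrampolineDemo(100000).GetNext()` executes Step 100000 times while MaxDepth stays at 1: every reentrant GetNext call returns immediately and the outer do-while loop performs the work instead of a nested stack frame.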

Now that we can safely get the next value, we have to keep asking for the next value until valueReady ends up false.

public async ValueTask<bool> MoveNextAsync()
{
    if (await GetOrCreate(ref valueReady).Task)
    {
        valueReady = null;
        Current = source.Current;
        GetNext();
        return true;
    }
    return false;
}

Invoking GetNext will start fetching the next value from the source while the consumer is busy with the current one. Clearing valueReady is safe as the next ready signal can only come after GetNext and its MoveNextAsync() call completes.

Adding a time limit

The current setup will deliver all items now, but we still have to stop it after some time delay. Let's add a Task.Delay into the mix. Remember, we can't await it either as it would block out the other activities.

public IAsyncEnumerator<T> GetAsyncEnumerator() 
{
    var enumerator = new TakeEnumerator(source.GetAsyncEnumerator());
    enumerator.StartTime();
    enumerator.GetNext();
    return enumerator; 
}

// ...

readonly TimeSpan time;

internal void StartTime() 
{
    Task.Delay(time)
    .ContinueWith(t => OnTime());
}

void OnTime()
{
    GetOrCreate(ref valueReady).TrySetResult(false);
}

We call StartTime before GetNext in GetAsyncEnumerator so that the delay has a chance to start in case MoveNextAsync and thus SourceReady end up running synchronously.

This setup has two problems though:

  • The delay should be stopped if the source async sequence ends on its own or gets stopped via DisposeAsync.
  • The stop signal may get lost because two threads may now race for the TrySetResult call: the source trying to set it to true and the delay trying to set it to false. If the source wins, the delay's stop indication is dropped.

Cancelling the delay

Let's address the first problem by introducing a CancellationTokenSource which will provide the token for the cancellation as well as the means to trigger the cancellation from DisposeAsync.

readonly CancellationTokenSource stop;

internal void StartTime() 
{
    Task.Delay(time, stop.Token)
    .ContinueWith(t => OnTime(), stop.Token);
}

public ValueTask DisposeAsync()
{
    stop.Cancel();
    return source.DisposeAsync();
}

Unfortunately, this replaces one problem with another. The call to DisposeAsync may be the result of the delay firing and the consumer of the TakeEnumerator ending the sequence while a GetNext and thus the source.MoveNextAsync() is in progress. Since this is not allowed by the API specification, we have to add some mechanism to prevent such overlapping calls.

Here comes trampolining again, but we need a separate counter from the wip that tracks GetNext invocations: we don't want to run GetNext; on the contrary, we want to stop it and also make sure source.DisposeAsync gets called exactly once and only after a running source.MoveNextAsync() completes. Therefore, we introduce disposeWip; atomic changes to it indicate whether MoveNextAsync can progress or source.DisposeAsync should be called.

int disposeWip;

public ValueTask DisposeAsync()
{
    stop.Cancel();
    if (Interlocked.Increment(ref disposeWip) == 1)
    {
        return source.DisposeAsync();
    }
    return new ValueTask();
}

public void GetNext()
{
    if (Interlocked.Increment(ref wip) == 1)
    {
        do
        {
            if (Interlocked.Increment(ref disposeWip) == 1)
            {
                 source.MoveNextAsync()
                    .AsTask()
                    .ContinueWith(t => SourceReady(t));
            }
            else
            {
                break;
            }
        }
        while (Interlocked.Decrement(ref wip) != 0);
    }
}

internal void SourceReady(Task<bool> t)
{
    if (Interlocked.Decrement(ref disposeWip) != 0)
    {
        source.DisposeAsync();
    } 
    else
    if (t.IsFaulted)
    {
        GetOrCreate(ref valueReady).TrySetException(t.Exception);
    }
    else
    {
        GetOrCreate(ref valueReady).TrySetResult(t.Result);
    }
}

In DisposeAsync, the atomic state transition from zero to one indicates the source can be safely disposed. If disposeWip is set to one by GetNext, it prevents DisposeAsync from disposing as the transition would be from one to two. As a consequence, the disposeWip should be decremented once MoveNextAsync completes, thus SourceReady performs a decrement. If this decrement, however, doesn't reach zero, it means DisposeAsync was attempted during the execution of MoveNextAsync and now the source can be safely disposed.
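The counter transitions described above can be traced with a small model that contains nothing but the disposeWip logic. The DisposeGate class and its method names are made up for this sketch; in the real operator, the commented places are where source.MoveNextAsync() and source.DisposeAsync() would be called.

```csharp
using System.Threading;

// Hypothetical model of the disposeWip protocol, not the operator itself.
public sealed class DisposeGate
{
    int disposeWip;

    // Counts how often source.DisposeAsync() would have been called.
    public int DisposeCalls;

    // GetNext side: true if source.MoveNextAsync() may be started (0 -> 1).
    public bool TryBeginMoveNext()
    {
        return Interlocked.Increment(ref disposeWip) == 1;
    }

    // SourceReady side: called when source.MoveNextAsync() has completed.
    // Returns true if the outcome may be relayed to the consumer, false if
    // a dispose request arrived while the call was in flight.
    public bool EndMoveNext()
    {
        if (Interlocked.Decrement(ref disposeWip) != 0)
        {
            DisposeCalls++; // the deferred source.DisposeAsync()
            return false;
        }
        return true;
    }

    // DisposeAsync side: disposes right away only if nothing is in flight.
    public void Dispose()
    {
        if (Interlocked.Increment(ref disposeWip) == 1)
        {
            DisposeCalls++; // the immediate source.DisposeAsync()
        }
    }
}
```

With a call in flight (TryBeginMoveNext returned true), Dispose moves the counter from one to two and does nothing else; the following EndMoveNext sees a non-zero value after its decrement and performs the single deferred dispose. Without a call in flight, Dispose wins the zero-to-one transition and any later TryBeginMoveNext returns false.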

Awaiting disposition

We can't be happy yet because the API specification of DisposeAsync requires it to complete only when the upstream has also completed disposing. In the latest setup, if DisposeAsync does not itself call source.DisposeAsync, it has to wait for SourceReady's call to source.DisposeAsync to complete in order to complete itself.

Therefore, we have to introduce yet another ready signal: disposeTask as TaskCompletionSource and have it complete when source.DisposeAsync() completes via ContinueWith again. This case may not trigger after all, thus we can use GetOrCreate with disposeTask to create it on demand.

TaskCompletionSource<bool> disposeTask;

public ValueTask DisposeAsync()
{
    stop.Cancel(); // still needed to stop the delay, as before
    if (Interlocked.Increment(ref disposeWip) == 1)
    {
        return source.DisposeAsync();
    }
    return new ValueTask(GetOrCreate(ref disposeTask).Task);
}

internal void SourceReady(Task<bool> t)
{
    if (Interlocked.Decrement(ref disposeWip) != 0)
    {
        source.DisposeAsync()
            .AsTask()
            .ContinueWith(x => DisposeReady(x));
    } 
    else
    if (t.IsFaulted)
    {
        GetOrCreate(ref valueReady).TrySetException(t.Exception);
    }
    else
    {
        GetOrCreate(ref valueReady).TrySetResult(t.Result);
    }
}

void DisposeReady(Task t)
{
    if (t.IsFaulted)
    {
        GetOrCreate(ref disposeTask).TrySetException(t.Exception);
    }
    else
    {
        GetOrCreate(ref disposeTask).TrySetResult(true); // or false, doesn't matter
    }
}

Not losing the delay signal

To resolve the last issue with this operator, we have to remember the delay signal separately as well by introducing a volatile boolean field that can be checked before and/or after the await in MoveNextAsync:

volatile bool timerSignalled;

void OnTime()
{
    timerSignalled = true;
    GetOrCreate(ref valueReady).TrySetResult(false);
}

public async ValueTask<bool> MoveNextAsync()
{
    if (!timerSignalled 
           && await GetOrCreate(ref valueReady).Task 
           && !timerSignalled)
    {
        Interlocked.Exchange(ref valueReady, null);
        Current = source.Current;
        GetNext();
        return true;
    }
    return false;
}

The check of !timerSignalled after the await acts as a guard in case OnTime lost the TrySetResult race after all. Note that setting valueReady to null with atomics will make sure OnTime sees an up-to-date valueReady as well.

Disposing multiple sources

One would think the dispose trampolining and dispose resumption are complicated enough, but there is a follow-up situation where multiple source IAsyncEnumerators have to be disposed. For one, each source has to be protected by its own dedicated disposeWip counter. Then the main DisposeAsync has to be prepared to wait for all the other IAsyncEnumerator.DisposeAsync() calls to complete before completing itself.

Here, we can't use a shared disposeTask directly, but we must protect it until all sources have completed by using a thread-safe counting method (after all, DisposeAsyncs may run concurrently with each other).

Let's assume we have two source async enumerators. They have their own disposeWip1 and disposeWip2 respectively, but they can share a single DisposeReady:

int disposeWip1;

int disposeWip2;

int disposeWipBoth;

TaskCompletionSource<bool> disposeTask;

void DisposeReady(Task t)
{
    if (Interlocked.Increment(ref disposeWipBoth) == 2)
    {
         GetOrCreate(ref disposeTask).TrySetResult(true);
    }
}

If the number of sources is known, Interlocked.Increment works well. In contrast, if the number of sources changes over time (e.g., when FlatMap-ing a variable number of sources), one would use Interlocked.Decrement and check against zero:

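int disposeWipAll = 1;

// CopyOnWriteArrayList stands for some thread-safe list type here;
// it is not part of the .NET base class library
CopyOnWriteArrayList<IAsyncEnumerator<T>> sources;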

void AddSource(IAsyncEnumerator<T> source)
{
    Interlocked.Increment(ref disposeWipAll);
    sources.Add(source);
}

void RemoveSource(IAsyncEnumerator<T> source)
{
    sources.Remove(source);
    source.DisposeAsync()
       .AsTask()
       .ContinueWith(t => DisposeReady(t));
}

void MainDone()
{
    if (Interlocked.Decrement(ref disposeWipAll) == 0)
    {
         GetOrCreate(ref disposeTask).TrySetResult(true);
    }
}

void DisposeReady(Task t)
{
    if (Interlocked.Decrement(ref disposeWipAll) == 0)
    {
         GetOrCreate(ref disposeTask).TrySetResult(true);
    }
}

If there is a main source (such as with FlatMap), the disposeWipAll should start out with 1. As sources get added and removed, the disposeWipAll keeps track of the current count. If the main finishes, one should call MainDone to have that initial 1 removed and possibly signal to the disposeTask. Either MainDone gets called before all sources complete disposing, or the other way around. In all cases, once every source has declared completion, disposeTask will be marked completed as well.
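The ordering argument can be checked with a stripped-down model that keeps only the counter and the completion signal. The DisposeCounter class is made up for this sketch and leaves out the actual sources and the error handling.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model: only the disposeWipAll accounting, no real sources.
public sealed class DisposeCounter
{
    int disposeWipAll = 1; // the initial 1 stands for the main source

    readonly TaskCompletionSource<bool> disposeTask =
        new TaskCompletionSource<bool>();

    // Completes once the main source and every added source have reported in.
    public Task Completion => disposeTask.Task;

    // An inner source was added.
    public void AddSource() => Interlocked.Increment(ref disposeWipAll);

    // An inner source has finished its DisposeAsync().
    public void DisposeReady() => Release();

    // The main source is done; removes the initial 1.
    public void MainDone() => Release();

    void Release()
    {
        if (Interlocked.Decrement(ref disposeWipAll) == 0)
        {
            disposeTask.TrySetResult(true);
        }
    }
}
```

With two added sources the counter goes 1, 2, 3. Whether MainDone is called before, between, or after the two DisposeReady calls, only the third decrement reaches zero, so Completion is signalled exactly once and only by the last party to finish.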

Error aggregation

The API specification of DisposeAsync implies its task can fail. In the solo case, we relayed this Exception to the disposeTask. However, if there are multiple sources involved, each of their DisposeAsync calls may fail and it would be rude to lose those Exceptions. Therefore, we need to aggregate such exceptions in a thread-safe way before disposeTask is notified at the end.

There are many ways to do this, e.g., queueing up exceptions or using atomics. We'll see the latter approach as the pattern itself is more interesting.

First of all, let's define a method that atomically aggregates Exceptions into AggregateExceptions if there are more of them already:

void AddException(ref Exception field, Exception ex)
{
    var next = default(Exception);
    for (;;)
    {
        var current = Volatile.Read(ref field);
        if (current == null)
        {
            next = ex;
        }
        else
        if (current is AggregateException g)
        {
            var list = new List<Exception>(g.InnerExceptions);
            list.Add(ex);
            next = new AggregateException(list);
        }
        else
        {
            next = new AggregateException(current, ex);
        }
        if (Interlocked.CompareExchange(ref field, next, current) == current)
        {
            break;
        }
    }
}

There are 3 cases to handle:

  1. There was no previous Exception in field, thus there is no need for an AggregateException yet.
  2. There was a previous non-AggregateException, therefore, make a new AggregateException with the current and the new one.
  3. There was a previous AggregateException; get its inner exceptions into a list, add the new one and create a fresh AggregateException (as they are immutable).
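The three cases can be exercised one after the other by adding three exceptions to an initially empty field. The sketch below repeats the aggregation method in condensed form so that it compiles on its own; the ExceptionHelperDemo class and the exception types used are arbitrary choices for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

public static class ExceptionHelperDemo
{
    // Same algorithm as AddException in the text, condensed.
    static void AddException(ref Exception field, Exception ex)
    {
        for (;;)
        {
            var current = Volatile.Read(ref field);
            Exception next;
            if (current == null)
            {
                next = ex;
            }
            else if (current is AggregateException g)
            {
                var list = new List<Exception>(g.InnerExceptions) { ex };
                next = new AggregateException(list);
            }
            else
            {
                next = new AggregateException(current, ex);
            }
            // Retry if another thread changed the field in the meantime.
            if (Interlocked.CompareExchange(ref field, next, current) == current)
            {
                return;
            }
        }
    }

    public static Exception Run()
    {
        Exception errors = null;

        // Case 1: field was empty, the exception is stored as is.
        AddException(ref errors, new InvalidOperationException("first"));

        // Case 2: a plain exception was there, both get wrapped together.
        AddException(ref errors, new IOException("second"));

        // Case 3: an AggregateException was there, a new one with
        // three inner exceptions replaces it.
        AddException(ref errors, new TimeoutException("third"));

        return errors;
    }
}
```

One thing to keep in mind when feeding Task.Exception into this method: that property is itself an AggregateException, so the first failure stored via case 1 is already an aggregate and later additions are appended to its inner list.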

Now we can aggregate and later signal the exception if any:

Exception errors;

void MainDone()
{
    if (Interlocked.Decrement(ref disposeWipAll) == 0)
    {
        AllDisposeReady();
    }
}

void DisposeReady(Task t)
{
    if (t.IsFaulted)
    {
        AddException(ref errors, t.Exception);
    }
    if (Interlocked.Decrement(ref disposeWipAll) == 0)
    {
         AllDisposeReady();
    }
}

void AllDisposeReady()
{
    if (errors != null) 
    {
        GetOrCreate(ref disposeTask).TrySetException(errors);
    }
    else
    {
        GetOrCreate(ref disposeTask).TrySetResult(true);
    }
}

Prefetching an async sequence

Now that we can safely get the next source item outside of an async/await structure and also we can queue up items to be consumed by MoveNextAsync, let's see if we can combine the two and prefetch items from an async sequence.

Why would we do that? MoveNextAsync provides natural backpressure, but only for one item at a time and in a "Ping-pong" manner. Further items aren't generated until the consumer calls MoveNextAsync again, and generating an item may itself take some time. This may introduce unwanted latency we could avoid by letting the producer run as fast as possible for a limited number of items, and let the consumer catch up by itself.

This is the default premise in Reactive Streams and non-blocking backpressure. There is a request channel and consumers frequently signal request(N) to get more items generated. There is no such channel in async sequences but we can emulate the effect by doing some request accounting similar to how RxJava does it, albeit turned a bit inside out.

First we need the usual queue, done and error fields to hold onto the source IAsyncEnumerator's outcome and signal:

sealed class PrefetchEnumerator : IAsyncEnumerator<T>
{
    readonly ConcurrentQueue<T> queue;
    volatile bool done;
    Exception error;

    TaskCompletionSource<bool> valueReady;
}

Then, we need the trampolined, dispose-excluded interaction with the source IAsyncEnumerator:

readonly IAsyncEnumerator<T> source;

int wip;

int disposeWip;

TaskCompletionSource<bool> disposeTask;

And finally, the accounting for how many items to get from the source async sequence via repeated calls to MoveNextAsync():

readonly int limit;

int consumed;

int outstanding;

internal PrefetchEnumerator(IAsyncEnumerator<T> source, int prefetch)
{
    this.source = source;
    this.outstanding = prefetch;
    this.limit = prefetch - (prefetch >> 2); // = prefetch * 0.75
}

The initial outstanding request amount is the prefetch amount so that the operator can start immediately pulling source items. The limit is used for deciding when to and how many more items to request, keeping the source busy while the consumer still works with the previously requested items while also making sure there are no more than prefetch number of items in a queue. This prevents unbounded memory usage by the queue.
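The limit expression uses integer arithmetic, so the 75% figure is exact only for multiples of four. A tiny helper (PrefetchMath is a made-up name) makes it easy to check a few values:

```csharp
public static class PrefetchMath
{
    // Same expression as in the constructor: prefetch minus a quarter of it,
    // where the quarter is rounded down by the shift.
    public static int Limit(int prefetch) => prefetch - (prefetch >> 2);
}
```

For example, a prefetch of 128 gives a limit of 96 and 16 gives 12, while 10 gives 8 because 10 >> 2 is 2. For prefetch values below 4 the shift yields zero, so the limit equals the prefetch amount and replenishment happens only after the whole batch has been consumed.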

public IAsyncEnumerator<T> GetAsyncEnumerator()
{
    var enumerator = new PrefetchEnumerator(source.GetAsyncEnumerator(), prefetch);
    enumerator.GetNext();
    return enumerator;
}

// ...

void SourceReady(Task<bool> t)
{
     if (Interlocked.Decrement(ref disposeWip) != 0)
     {
         source.DisposeAsync()
            .AsTask()
            .ContinueWith(x => DisposeReady(x));
     }
     else if (t.IsFaulted)
     {
         error = t.Exception;
         done = true;
         GetOrCreate(ref valueReady).TrySetResult(true);
     }
     else if (t.Result)
     {
         queue.Enqueue(source.Current);
         GetOrCreate(ref valueReady).TrySetResult(true);
     
         if (Interlocked.Decrement(ref outstanding) != 0)
         {
             GetNext();
         }
     }
     else
     {
         done = true;
         GetOrCreate(ref valueReady).TrySetResult(true);
     }
}

Other than signaling the next item, error or completion, we decrement the outstanding request amount and stop requesting if it reaches zero. Reaching zero means the consumer has not yet consumed enough of the prefetched items to replenish the request amount.

Finally, let's see the way MoveNextAsync is implemented:

public async ValueTask<bool> MoveNextAsync()
{
    for (;;)
    {
        var isDone = done;
        var hasValue = queue.TryDequeue(out var value);

        if (isDone && !hasValue)
        {
            if (error != null) throw error;
            return false;
        }
        else if (hasValue)
        {
            Current = value;
            int c = consumed + 1;
            if (c == limit)
            {
                consumed = 0;
                if (Interlocked.Add(ref outstanding, c) == c)
                {
                     GetNext();
                }
            }
            else
            {
                consumed = c;
            }
            return true;
        }
        await GetOrCreate(ref valueReady).Task;
        Interlocked.Exchange(ref valueReady, null);
    }
}

Here we count the consumed items and, once the low-water mark specified by limit is reached, we request more by adding limit to the outstanding requests. This will hopefully keep SourceReady calling GetNext. If the outstanding count has been completely fulfilled (due to a slow consumer calling MoveNextAsync too infrequently), the change from zero to limit restarts the polling by triggering GetNext from within MoveNextAsync.
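The interplay of the two counters is easier to follow when it is separated from the queue and the tasks. The following model keeps only consumed and outstanding; the RequestAccounting class and its method names are made up for this sketch, and the return values say when GetNext would be called.

```csharp
using System.Threading;

// Hypothetical model of the request accounting, without queue or tasks.
public sealed class RequestAccounting
{
    readonly int limit;
    int consumed;     // touched by the consumer only
    int outstanding;  // shared between producer and consumer

    public RequestAccounting(int prefetch)
    {
        outstanding = prefetch;
        limit = prefetch - (prefetch >> 2);
    }

    // Producer side (SourceReady): an item was enqueued.
    // Returns true if GetNext() should be called for another item.
    public bool OnItemProduced()
    {
        return Interlocked.Decrement(ref outstanding) != 0;
    }

    // Consumer side (MoveNextAsync): an item was dequeued.
    // Returns true if the producer had stopped and GetNext() must restart it.
    public bool OnItemConsumed()
    {
        int c = consumed + 1;
        if (c == limit)
        {
            consumed = 0;
            // If the sum equals c, outstanding was zero: the producer is idle.
            return Interlocked.Add(ref outstanding, c) == c;
        }
        consumed = c;
        return false;
    }
}
```

With a prefetch of 4 (limit 3), a fast producer gets true, true, true, false from OnItemProduced and goes idle with four items queued. The consumer's third OnItemConsumed call then returns true and restarts the producer with three new requests. If the producer had delivered only three items by then, the same call would return false because outstanding was still 1 and the producer never stopped.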

Reactive Streams Interop

When interfacing with a Reactive Streams producer, the outstanding accounting is not necessary as it is done by the Publisher when Subscription.request(prefetch) and Subscription.request(limit) are performed:

public IAsyncEnumerator<T> GetAsyncEnumerator()
{
    var enumerator = new SubscriberEnumerator();
    source.Subscribe(enumerator);
    return enumerator;
}

internal sealed class SubscriberEnumerator : IAsyncEnumerator<T>, ISubscriber<T>
{
    ISubscription upstream;

    // ...

    public void OnSubscribe(ISubscription s)
    {
        upstream = s;
        s.Request(prefetch);
    }

    public void OnNext(T item)
    {
        queue.Enqueue(item);
        GetOrCreate(ref valueReady).TrySetResult(true);
    }

    public void OnError(Exception ex)
    {
        upstream = null;
        error = ex;
        done = true;
        GetOrCreate(ref valueReady).TrySetResult(true);
    }

    public void OnComplete()
    {
        upstream = null;
        done = true;
        GetOrCreate(ref valueReady).TrySetResult(true);
    }

    public ValueTask DisposeAsync()
    {
        upstream?.Cancel();
        return new ValueTask();
    }

    public async ValueTask<bool> MoveNextAsync()
    {
        for (;;)
        {
            var isDone = done;
            var hasValue = queue.TryDequeue(out var value);

            if (isDone && !hasValue)
            {
                if (error != null) throw error;
                return false;
            }
            else if (hasValue)
            {
                Current = value;
                int c = consumed + 1;
                if (c == limit)
                {
                    consumed = 0;
                    upstream.Request(c);
                }
                else
                {
                    consumed = c;
                }
                return true;
            }
            await GetOrCreate(ref valueReady).Task;
            Interlocked.Exchange(ref valueReady, null);
        }
    }
}

It's less complicated but looks quite similar to the prefetch case.

Conclusion

On one hand, writing certain operators requires less boilerplate than, for example, Rx.NET or RxJava. On the other hand, the coordination, the resumption, and the mutual exclusion of MoveNextAsync and DisposeAsync can get quite complicated. Still, it is possible to create most Rx operators with IAsyncEnumerables.

There are a few unknowns at the moment:

  1. An operator like Never may not be possible. Since its MoveNextAsync never completes, one can never call DisposeAsync, and thus there is no way to stop such a sequence.
  2. The Task/ValueTask have the ConfigureAwait method that lets one define whether the continuation should capture the current context or not, i.e., whether the code after the await would resume in the context that started waiting. It is unclear when these ConfigureAwait(false)s should be applied or if it should actually be the default.
  3. Turning ValueTasks into Tasks by AsTask may incur additional overhead when working with ContinueWith follow-ups. Since Task and ValueTask are not compatible as method parameters, one would need to write each handler twice, for example, SourceReady(Task<bool> t) and SourceReady(ValueTask<bool> t).
  4. Overall performance may not be nearly as good as with Reactive Streams after all. Each signal type almost certainly incurs two atomic operations when working with multiple sources and/or time.
  5. Will the specification remain as is? Certainly, DisposeAsync not being callable at any time is an issue sometimes, but I have no idea what the alternative should be if we still want to await it.
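For reference, this is what applying ConfigureAwait(false) from point 2 would look like on the Filter loop shown earlier. It is only an illustration of the syntax, not a recommendation either way; the ConfigureAwaitSketch class, the free-standing helper shape, and the Numbers source are made up, and the sketch uses the interfaces as they later shipped in .NET Core 3.0.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConfigureAwaitSketch
{
    // Hypothetical helper: the Filter loop with ConfigureAwait(false) applied,
    // so the continuation does not have to return to the captured context.
    public static async ValueTask<bool> MoveNextFiltered<T>(
        IAsyncEnumerator<T> source, Func<T, bool> predicate)
    {
        while (await source.MoveNextAsync().ConfigureAwait(false))
        {
            if (predicate(source.Current))
            {
                return true;
            }
        }
        return false;
    }

    // Hypothetical source for trying the helper out: produces 1 to 5.
    public static async IAsyncEnumerable<int> Numbers()
    {
        for (int i = 1; i <= 5; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
}
```

Every await inside an operator would need the same treatment, and operators built on ContinueWith instead of await would need an equivalent decision about which scheduler their continuations run on.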