
Writing operators


Introduction

Writing operators for IAsyncEnumerable is both easier and harder than working with IObservables or Reactive Streams Publishers. It is easier for intermediate operators such as Map and Filter, where async/await and the compiler-generated state machines line up pretty well. It is harder for operators working with multiple sources or with time, because an async method can only await one thing at a time, and often it is not clear what should be awaited first or how many awaitable primitives need to be introduced.

In this wiki, I'll explain how to write both the easy and the more complicated kinds of operators.


IAsyncEnumerable primer

Despite the popularity of Rx.NET for push-based declarative data processing and the success of the non-blocking, backpressure-enabled RxJava libraries, there has always been a buzz that convenient backpressure might be achievable via coroutines, continuations, async/await or fibers.

One option is to make IEnumerable/IEnumerator support async/await and let the compiler work out the resumption state machine. We only have to await the next item and the concurrency is taken care of.

There were several proposals about such an async enumerable, and the latest one that seems to be winning is defined as follows.

The proposal introduces 3 new interfaces backing the concept of async sequences as well as language/compiler changes. We don't care about the latter as we don't have any such changes ready, but we can work with the interfaces right away as the necessary async/await support is already part of the released C# language.

The new interfaces and their methods were named distinctively with an Async suffix so that they can be implemented on a class that already supports the synchronous IEnumerable/IEnumerator without conflict.

IAsyncDisposable

The IAsyncDisposable is the analogue of the IDisposable interface, but instead of a blocking or fire-and-forget disposal, the DisposeAsync has to be called and awaited to make sure resources associated with an async sequence are all released before continuing.

public interface IAsyncDisposable 
{
    ValueTask DisposeAsync();
}

Note that the design uses ValueTask instead of Task as there are likely enough cases where the method can complete synchronously, and thus the execution can avoid creating task objects and continuation handlers altogether.
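For illustration only (the EmptyResource type below is made up for this example), an implementation that has nothing asynchronous to release can hand back an already completed ValueTask and finish synchronously, without allocating a Task:

public sealed class EmptyResource : IAsyncDisposable
{
    public ValueTask DisposeAsync()
    {
        // nothing to release: a default ValueTask is already completed,
        // so no Task object or continuation handler is created
        return new ValueTask();
    }
}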

IAsyncEnumerator

The IAsyncEnumerator is analogous to IEnumerator: it allows moving to the next item, awaiting that move to complete and then reading the actual value if there is one.

public interface IAsyncEnumerator<out T> : IAsyncDisposable 
{
     ValueTask<bool> MoveNextAsync();

     T Current { get; }
}

The IAsyncEnumerator extends the IAsyncDisposable and the consumer is required to await DisposeAsync whenever it is done with the async enumerator.

There are a couple rules to be honored, which will be quite important later when coordination is required:

  • MoveNextAsync must be called non-concurrently and non-overlappingly. This means that one has to wait for the method's returned task to signal true before calling it again. When used with async/await, this is guaranteed by the compiler-generated state machine. Otherwise, calls to this method must be serialized externally.
  • Current is only valid if the task evaluates to true after a MoveNextAsync. It is invalid to read it before the first MoveNextAsync call, after a false task or after the call to DisposeAsync.
  • DisposeAsync cannot be called concurrently with MoveNextAsync, only after the pending MoveNextAsync task has completed with true, false or a failure. In addition, it has to be awaited and is expected to return only when the whole chain above it has been disposed as well, in a cascading fashion.
  • The methods should not throw exceptions themselves but they can return a failed/failing task.

Unfortunately, DisposeAsync does not serve as an anytime cancellation the way IDisposable.Dispose() does in Rx.NET, and since it must not run concurrently with MoveNextAsync, we have to serialize their execution externally whenever the consumption is not a plain async/await loop (which serializes them for us).

IAsyncEnumerable

Finally the IAsyncEnumerable is the equivalent of IEnumerable with a single method to get the IAsyncEnumerator. This, in theory, allows lazy and deferred async sequences which can be run as many times as necessary and even concurrently with each other. Only interacting with the IAsyncEnumerator has to be serialized via async/await or external means.

public interface IAsyncEnumerable<out T> 
{
    IAsyncEnumerator<T> GetAsyncEnumerator();    
}

This method has to return the async enumerator synchronously and not throw any exception.

Unfortunately, this is the only level where eager cancellation support can be injected, but unlike the Rx.NET scheme, it can't be made known to the downstream consumers within the flow. One has to keep injecting CancellationTokens into such operators.

There is an additional problem with such injections: one can't inject different CancellationTokens into different realizations of GetAsyncEnumerator(). The token has to live on the IAsyncEnumerable level, and cancelling it will cancel all running async enumerators. Usually one would like to correlate cancellation with individual running async enumerators (something IObservable.Subscribe provides quite clearly) so that only some runs can be cancelled.

The solution is most likely to defer the creation of the entire IAsyncEnumerable itself so that a dedicated cancellation can be set up at the point where an async enumerator is requested by some consumer.
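As a rough sketch of that idea (the names StartRun and SomeCancellableSource are made up here; SomeCancellableSource stands for any IAsyncEnumerable<int> whose constructor accepts the token it should observe), the enumerable is only assembled once we know which run the token is dedicated to:

public static (IAsyncEnumerator<int> enumerator, CancellationTokenSource cts) StartRun()
{
    // a fresh token source dedicated to this single run
    var cts = new CancellationTokenSource();

    // build the IAsyncEnumerable only now, around this run's token
    var source = new SomeCancellableSource(cts.Token);

    return (source.GetAsyncEnumerator(), cts);
}

Cancelling the returned cts then affects only this particular async enumerator.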

Writing async sources

Async sources are IAsyncEnumerables that act as generators or as bridges to non-async sequences. Perhaps the simplest generator is the Range operator that generates numbers on demand.

Synchronous generator

At this moment, there is no compiler support for writing an IAsyncEnumerable in the style of an iterator method: there is no yield return or yield break support, therefore we'll have to write a bit of boilerplate:

public sealed class Range : IAsyncEnumerable<int>
{
    readonly int start;
    readonly int end;

    public Range(int start, int end)
    {
        this.start = start;
        this.end = end;
    }

    public IAsyncEnumerator<int> GetAsyncEnumerator()
    {
        // TODO implement
    }
}

The outer IAsyncEnumerable usually hosts the readonly parameters of the sequence, which then have to be passed to the async enumerator it creates:

    public IAsyncEnumerator<int> GetAsyncEnumerator()
    {
        return new RangeEnumerator(start, end);
    }

    internal sealed class RangeEnumerator : IAsyncEnumerator<int>
    {
        readonly int end;

        int index;

        internal RangeEnumerator(int start, int end) 
        {
             this.end = end;
             this.index = start;
        }  

        // TODO implement methods and properties
    }
}

So far, nothing complicated. We need a mutable index field to track where the range is currently at when the consumer requests the next value.

The first thing to implement is the Current property:

        public int Current { get; private set; }

Here, we run into a very common theme with async sequence state, hence the writable property: we have to store the value to be read by the consumer somewhere, while the internal state may already be prepared for retrieving the next value on the following call to MoveNextAsync. For our Range operator, this should become apparent from the MoveNextAsync implementation:

        public ValueTask<bool> MoveNextAsync()
        {
             if (index == end) 
             {
                 return new ValueTask<bool>(false);
             }
             Current = index++;
             return new ValueTask<bool>(true);
        }

When we determine we haven't reached the end yet, we take the current index and save it into Current, then move it forward by one so that the next invocation of MoveNextAsync can test it against the end as well. If we simply returned index as the Current's getter, we'd have the wrong number available!

(Remark: in this particular case, one could simply have index = start - 1 and then have index++ at the beginning of MoveNextAsync, but watch out for the integer underflow. Generally, we will likely not work with numbers or objects where the previous value is so easily made available.)
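For completeness, that alternative shape would look like the following sketch, assuming the constructor sets this.index = start - 1:

        public ValueTask<bool> MoveNextAsync()
        {
             // the constructor set index = start - 1, so advance first;
             // beware: start == int.MinValue would underflow there
             index++;
             if (index == end) 
             {
                 return new ValueTask<bool>(false);
             }
             Current = index;
             return new ValueTask<bool>(true);
        }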

Interestingly, we don't need the async keyword on the method because we don't do anything that goes asynchronous: we decide what the next result would be and return a ValueTask structure avoiding allocation. We could have written the method as follows:

        public async ValueTask<bool> MoveNextAsync()
        {
             if (index == end) 
             {
                 return false;
             }
             Current = index++;
             return true;
        }

so that the resulting boolean value is automatically turned into the right task structure/object. Unfortunately, marking a method async and not doing any await nets an annoying warning currently. Using the explicit return of ValueTasks keeps the code warning free (without disabling the warning of course).

Finally, we have to implement the DisposeAsync method, although we don't have any resources or things to do upon disposal:

        public ValueTask DisposeAsync()
        {
            Current = default;
            return new ValueTask();
        }

Usually, when working with objects instead of ints, one would make sure their references are not kept beyond the lifecycle of the async sequence. This means, for example, clearing the last Current value and/or links in other class fields.
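For instance (a sketch only; the bufferedItem field is hypothetical and not part of the Range example), an enumerator working with reference types might clear its fields like this:

        public ValueTask DisposeAsync()
        {
            // drop references so the GC can reclaim them once the consumer is done
            Current = null;
            bufferedItem = null;
            return new ValueTask();
        }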

Consuming the Range operator is then generally done this way:

public static async Task Main(string[] args)
{
    var enumerator = new Range(1, 6).GetAsyncEnumerator();
    try 
    {
         while (await enumerator.MoveNextAsync())
         {
             Console.WriteLine(enumerator.Current);
         }
         Console.WriteLine("Done!");
    }
    finally
    {
        await enumerator.DisposeAsync();
    }
}

(Remark: there are plans to support less verbose foreach async and using async expressions where one can focus on the loop body and the rest is generated automatically.)
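The eventual shape could be something like the following sketch; the exact syntax was still being designed at the time of writing:

public static async Task Main(string[] args)
{
    // the compiler would generate the MoveNextAsync/Current/DisposeAsync
    // plumbing shown in the manual loop above
    await foreach (var item in new Range(1, 6))
    {
        Console.WriteLine(item);
    }
    Console.WriteLine("Done!");
}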

Asynchronous generator

If we have to interface with some external source which supports async/await, we often have quite an easy job: we can write an async-marked MoveNextAsync method and simply await values right there.

For example, let's write a source that generates items after some time delay:

   internal sealed class Interval : IAsyncEnumerator<long>
   {
       readonly TimeSpan delay;

       internal Interval(TimeSpan delay)
       {
           this.delay = delay;
       }

       long next;

       public long Current { get; private set; }

       public async ValueTask<bool> MoveNextAsync()
       {
           await Task.Delay(delay);
           Current = next++;
           return true;
       }

       public ValueTask DisposeAsync()
       {
           return new ValueTask();
       }
   }

(Remark: from now on, the enclosing IAsyncEnumerable is omitted if it is trivial to write based on what parameters the enumerator uses.)
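For this Interval, for reference, the omitted enclosing type would be little more than the following sketch (IntervalSource is just a name picked here):

   public sealed class IntervalSource : IAsyncEnumerable<long>
   {
       readonly TimeSpan delay;

       public IntervalSource(TimeSpan delay)
       {
           this.delay = delay;
       }

       public IAsyncEnumerator<long> GetAsyncEnumerator()
       {
           // every consumer gets its own enumerator with its own counter
           return new Interval(delay);
       }
   }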

Anyone who has worked with Task.Delay will notice that we didn't add a CancellationToken parameter. The reason is that it would make no sense in this style of async sequence design. One might think of adding a CancellationTokenSource field and cancelling it in DisposeAsync, but the specification forbids calling DisposeAsync while MoveNextAsync is in progress. The only option for a consumer is to stop calling MoveNextAsync after the previous call has completed.

We'll see later on that this non-concurrent call requirement of MoveNextAsync and DisposeAsync makes things convoluted and may even elongate the lifecycle of async sequences in an undesirable manner.

Push-pull adaptation

So far, the example sources were of the pull kind: the consumer indicated when it wanted the next item and the async source generated it right there, on demand. Basically, we had a request-1 backpressure situation.

However, some sources (and intermediate operators) have to run in a push fashion: the value to be handed to the consumer of the IAsyncEnumerator may be produced before the consumer has called MoveNextAsync. Sometimes more than one value has to wait in this manner.

This is the classic consumer-producer problem, which is typically solved with blocking mechanisms: blocking queues or wait-notify pairs. Unfortunately, we are in the async/non-blocking world, so these won't be good enough for us (and we don't have a runtime that magically saves and restores the call stack upon blocking, as Project Loom aims for).

Therefore, we need to work with a non-blocking, async-friendly continuation mechanism built from atomics (Interlocked) and manual task completions (TaskCompletionSource).
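To give a feel for what such a mechanism looks like (the Resume class below is only an illustrative sketch, not the actual helper developed later), one side awaits a TaskCompletionSource that the other side completes, and the atomics make sure the source is created and reset exactly once per round:

internal sealed class Resume
{
    TaskCompletionSource<bool> tcs;

    // called by the side that has to wait for the other party to make progress
    public async ValueTask WaitAsync()
    {
        await GetOrCreate(ref tcs).Task;
        // reset the field so the next round starts with a fresh source
        Interlocked.Exchange(ref tcs, null);
    }

    // called by the side that enables progress; safe to call before WaitAsync
    public void Signal()
    {
        GetOrCreate(ref tcs).TrySetResult(true);
    }

    static TaskCompletionSource<bool> GetOrCreate(ref TaskCompletionSource<bool> field)
    {
        for (; ; )
        {
            var current = Volatile.Read(ref field);
            if (current != null)
            {
                return current;
            }
            var fresh = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            if (Interlocked.CompareExchange(ref field, fresh, null) == null)
            {
                return fresh;
            }
        }
    }
}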

Currently, there are 3 kinds of such rendezvous situations to consider:

  1. "Ping-pong": where the producer and the consumer would wait for the other to be ready.
  2. "Unbounded-in" producer: where the producer runs with its own pace and the consumer will get items at its own pace (may be designed as lossy).
  3. "Prefetched" producer: the producer is allowed to generate up to a certain number of items and then pauses until the consumer has taken some/all items, after which the producer is resumed.

Practically, the "Ping-pong" case is equivalent to the "Prefetched" case where the number is 1. In backpressure terms, "Ping-pong" is request(1) and "Prefetched" is request(N) followed by request(M) after M items have been consumed.

Tooling-wise, the simplest to implement is the "Ping-pong" case, but first, we need that async/non-blocking continuation scheme.

async Resume

Bridge to IObservable

So far, the implementations were relatively simple. Keep awaiting if necessary, save the value and return a true task.
