Writing operators
Writing operators for IAsyncEnumerable is both easier and harder than working with IObservables or Reactive Streams Publishers. It is easier for intermediate operators such as Map and Filter, where async/await and the compiler-generated state machines line up pretty well. It is harder for operators working with multiple sources or with time, because one can only await one thing in an async method at a time, and often there is no clear thing to await first, nor an obvious number of awaitable primitives to introduce.
In this wiki, I'll explain how to write both the easy and the complicated operators.
Despite the popularity of Rx.NET for push-based declarative data processing and the success of the non-blocking backpressure-enabled RxJava libraries, there is always the buzz that maybe it is possible to have convenient backpressure via coroutines, continuations, async-await or fibers.
One option is to make IEnumerable/IEnumerator support async/await and let the compiler work out the resumption state machine. We only have to await the next item and the concurrency is taken care of.
There were several proposals about such an async enumerable, and the latest one that seems to be winning is defined as follows.
The proposal introduces 3 new interfaces backing the concept of async sequences as well as language/compiler changes. We don't care about the latter as we don't have any such changes ready, but we can work with the interfaces right away as the necessary async/await support is already part of the released C# language.
The new interfaces and their methods were named distinctively with an Async word so that they can be implemented on a class which already supports the synchronous IEnumerable/IEnumerator without conflict.
The IAsyncDisposable is the analogue of the IDisposable interface, but instead of a blocking or fire-and-forget disposal, DisposeAsync has to be called and awaited to make sure resources associated with an async sequence are all released before continuing.
public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}
Note that the design uses ValueTask instead of Task, as there are likely enough cases where the method can run synchronously and thus the execution could avoid creating task objects and continuation handlers altogether.
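To make the synchronous fast path concrete, here is a small sketch (CheckSlowlyAsync and HasNext are made-up placeholders, not part of the proposal): when the answer is already at hand, the ValueTask completes without allocating a Task object, otherwise it simply wraps a real Task.
static async Task<bool> CheckSlowlyAsync()
{
    // the asynchronous fallback path
    await Task.Delay(100);
    return true;
}
static ValueTask<bool> HasNext(bool cachedAnswerAvailable, bool cachedAnswer)
{
    if (cachedAnswerAvailable)
    {
        // synchronous path: no Task object is allocated
        return new ValueTask<bool>(cachedAnswer);
    }
    // asynchronous path: wraps an actual Task<bool>
    return new ValueTask<bool>(CheckSlowlyAsync());
}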
The IAsyncEnumerator is analogous with IEnumerator: it allows moving to the next item, awaiting that to happen and then getting the actual value if there is any.
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
ValueTask<bool> MoveNextAsync();
T Current { get; }
}
The IAsyncEnumerator extends IAsyncDisposable and the consumer is required to await DisposeAsync whenever it is done with the async enumerator.
There are a couple of rules to be honored, which will become quite important later when coordination is required:
- MoveNextAsync must be called non-concurrently and non-overlappingly. This means that one has to wait for the method's returned task to signal true before calling it again. When used with async/await, this is guaranteed by the compiler-generated state machine. Otherwise, calls to this method must be serialized externally.
- Current is only valid if the task evaluates to true after a MoveNextAsync. It is invalid to read it before the first MoveNextAsync call, after a false task or after the call to DisposeAsync.
- DisposeAsync cannot be called concurrently with MoveNextAsync but only when the next task evaluates to true/false/failure. In addition, it has to be awaited and is expected to return only when the whole chain above it has been disposed as well, in a cascading fashion.
- The methods should not throw exceptions themselves but they can return a failed/failing task.
Unfortunately, DisposeAsync does not serve as an anytime cancellation like IDisposable.Dispose() in Rx.NET, and since it must not run concurrently with MoveNextAsync, we have to serialize their execution externally whenever we need more than what a plain async/await loop can guarantee.
Finally, the IAsyncEnumerable is the equivalent of IEnumerable with a single method to get the IAsyncEnumerator. This, in theory, allows lazy and deferred async sequences which can be run as many times as necessary and even concurrently with each other. Only interacting with the IAsyncEnumerator has to be serialized via async/await or external means.
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator();
}
This method has to return the async enumerator synchronously and not throw any exception.
Unfortunately, this is the only level where eager cancellation support can be injected, but unlike the Rx.NET scheme, it can't be made known to the downstream consumers within the flow. One has to keep injecting CancellationTokens into such operators.
There is an additional problem with such injections. One can't inject different CancellationTokens into different realizations of GetAsyncEnumerator(): they have to live on the IAsyncEnumerable level, and cancelling a token will cancel all running async enumerators. Usually one would like to correlate cancellation with individual running async enumerators (which one gets from IObservable.Subscribe quite clearly) so that only some runs can be cancelled.
The solution is most likely to defer the creation of the entire IAsyncEnumerable itself so cancellation can be dedicated to the point when an async enumerator is requested by some consumer.
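A rough sketch of this deferral idea (the Defer wrapper below is a made-up helper, not part of the proposal): the sequence, and any CancellationTokenSource its factory creates, only comes into existence when a consumer asks for an enumerator, so cancelling that token affects this one run only.
public sealed class Defer<T> : IAsyncEnumerable<T>
{
    readonly Func<IAsyncEnumerable<T>> factory;
    public Defer(Func<IAsyncEnumerable<T>> factory)
    {
        this.factory = factory;
    }
    public IAsyncEnumerator<T> GetAsyncEnumerator()
    {
        // build a fresh sequence per consumer run; a token source created inside
        // the factory therefore belongs to this particular run only
        return factory().GetAsyncEnumerator();
    }
}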
Async sources are IAsyncEnumerables which are some kind of generators or bridges to non-async sequences. Perhaps the simplest generator is the Range operator that generates numbers on demand.
At this moment, there is no compiler support for writing an IAsyncEnumerable in the style of an IEnumerator: there is no yield return or yield break support; therefore, we'll have to write a bit of boilerplate:
public sealed class Range : IAsyncEnumerable<int>
{
readonly int start;
readonly int end;
public Range(int start, int end)
{
this.start = start;
this.end = end;
}
public IAsyncEnumerator<int> GetAsyncEnumerator()
{
// TODO implement
}
}
The outer IAsyncEnumerable usually hosts the readonly parameters of the sequence, which then usually have to be passed to the async enumerator created:
public IAsyncEnumerator<int> GetAsyncEnumerator()
{
return new RangeEnumerator(start, end);
}
internal sealed class RangeEnumerator : IAsyncEnumerator<int>
{
readonly int end;
int index;
internal RangeEnumerator(int start, int end)
{
this.end = end;
this.index = start;
}
// TODO implement methods and properties
}
So far, nothing complicated. We need a mutable index field to track where the range currently is when the consumer requests the next value.
The first thing to implement is the Current property:
public int Current { get; private set; }
Here we run into a very common theme with async sequence state, hence the writable property: we have to store the value to be read by the consumer somewhere while the state is prepared for retrieving the next value upon a call to MoveNextAsync. For our range operator, this should become apparent from the MoveNextAsync implementation:
public ValueTask<bool> MoveNextAsync()
{
    if (index == end)
    {
        return new ValueTask<bool>(false);
    }
    Current = index++;
    return new ValueTask<bool>(true);
}
When we determine we haven't reached the end yet, we take the current index and save it into Current, then move it forward by one so that the next invocation of MoveNextAsync can test it against the end as well. If we simply returned index in Current's getter, we'd have the wrong number available!
(Remark: in this particular case, one could simply have index = start - 1 and then have index++ at the beginning of MoveNextAsync, but watch out for the integer underflow. Generally, we will likely not work with numbers or objects where the previous value is so easily made available.)
Interestingly, we don't need the async keyword on the method because we don't do anything that goes asynchronous: we decide what the next result would be and return a ValueTask structure, avoiding allocation. We could have written the method as follows:
public async ValueTask<bool> MoveNextAsync()
{
if (index == end)
{
return false;
}
Current = index++;
return true;
}
so that the resulting boolean value is automatically turned into the right task structure/object. Unfortunately, marking a method async and not doing any await currently nets an annoying compiler warning (CS1998). Using the explicit return of ValueTasks keeps the code warning free (without disabling the warning, of course).
Finally, we have to implement the DisposeAsync method, although we don't have any resources or things to do upon disposal:
public ValueTask DisposeAsync()
{
Current = default;
return new ValueTask();
}
Usually, when working with objects instead of ints, one would make sure their references are not kept beyond the lifecycle of the async sequence. This means, for example, clearing the last Current value and/or links in other class fields.
Consuming the Range operator at the end is generally done this way:
public static async Task Main(string[] args)
{
var enumerator = new Range(1, 6).GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
Console.WriteLine(enumerator.Current);
}
Console.WriteLine("Done!");
}
finally
{
await enumerator.DisposeAsync();
}
}
(Remark: there are plans to support less verbose foreach async and using async expressions where one can focus on the loop body and the rest is generated automatically.)
If we have to interface with some external source which supports async/await, we often have quite an easy job, as we can write an async-marked MoveNextAsync method and simply await values right there.
For example, let's write a source that generates items after some time delay:
internal sealed class Interval : IAsyncEnumerator<long>
{
readonly TimeSpan delay;
internal Interval(TimeSpan delay)
{
this.delay = delay;
}
long next;
public long Current { get; private set; }
public async ValueTask<bool> MoveNextAsync()
{
await Task.Delay(delay);
Current = next++;
return true;
}
public ValueTask DisposeAsync()
{
return new ValueTask();
}
}
(Remark: from now on, the enclosing IAsyncEnumerable is omitted if it is trivial to write based on what parameters the enumerator uses.)
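To show once what such a trivial enclosing type looks like, a possible IAsyncEnumerable wrapper for the Interval enumerator above could be written as follows (the wrapper name is made up):
public sealed class IntervalEnumerable : IAsyncEnumerable<long>
{
    readonly TimeSpan delay;
    public IntervalEnumerable(TimeSpan delay)
    {
        this.delay = delay;
    }
    public IAsyncEnumerator<long> GetAsyncEnumerator()
    {
        // each consumer gets its own enumerator and thus its own counter state
        return new Interval(delay);
    }
}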
If one has worked with Task.Delay before, it becomes apparent that we didn't add a CancellationToken parameter. The reason is that it makes no sense in this style of async sequence design. One would think to add a CancellationTokenSource field and have it Cancel()-ed in DisposeAsync, but the specification forbids calling DisposeAsync while MoveNextAsync is in progress. The only option for an unaware consumer is to stop calling MoveNextAsync after the previous call has completed.
We'll see later on that this non-concurrent call requirement of MoveNextAsync and DisposeAsync makes things convoluted and may even elongate the lifecycle of async sequences in an undesirable manner.
So far, the example sources were of the pull kind: the consumer indicated when it wanted the next item and the async source generated it right there, on demand. Basically, we had a request-1 backpressure situation.
However, some sources (and intermediate operators) are required to run in a form of push: the value to be given to the consumer of the IAsyncEnumerator may happen before the consumer has called MoveNextAsync. Sometimes more than one value has to wait in this manner.
This is the traditional consumer-producer problem, which is usually solved with blocking mechanisms: blocking queues or wait-notify pairs. Unfortunately, we are in the async/non-blocking world, so these won't be good enough for us (and we don't have a runtime that magically saves and restores the call stack upon blocking like Project Loom aims for).
Therefore, we need to work with non-blocking and async-friendly continuation mechanisms by using atomics (Interlocked) and manual task completions (TaskCompletionSource).
Currently, there are 3 kinds of such rendezvous situations to consider:
- "Ping-pong": where the producer and the consumer would wait for the other to be ready.
- "Unbounded-in" producer: where the producer runs with its own pace and the consumer will get items at its own pace (may be designed as lossy).
- "Prefetched" producer: the producer is allowed to generate up to a certain number of items and then pauses until the consumer has taken some/all items, after which the producer is resumed.
Practically, the "Ping-pong" case is equivalent to the "Prefetched" case where the number is 1. In backpressure terms, "Ping-pong" is request(1) and "Prefetched" is request(N) followed by request(M) after M items have been consumed.
Tooling-wise, the simplest to implement is the "Ping-pong" case, but first, we need that async/non-blocking continuation scheme.
When a producer and a consumer meet exactly once, the solution is quite straightforward:
readonly TaskCompletionSource<T> resume = new TaskCompletionSource<T>();
public void SendValue(T value)
{
resume.TrySetResult(value);
}
public async void ProcessValue() {
var value = await resume.Task;
// work with value
}
Unfortunately, this doesn't work when the exchange must happen more than once. For one, completing a TaskCompletionSource makes it completed forever, thus when the next value is to be set, TrySetResult won't overwrite the old one. Worse, the consumer side awaiting the TaskCompletionSource.Task will immediately succeed and will keep getting that very first item over and over.
Therefore, we have to create a fresh TaskCompletionSource once the previous one was successfully awaited. However, there is a further complication: who should create that fresh instance, the consumer or the producer? Remember, they may run concurrently with each other, so either one may reach its respective line (resume.TrySetResult vs. await resume.Task) first, and they have to work with the same TaskCompletionSource instance.
The solution has two parts: first make sure only one party can create a TaskCompletionSource, then make sure only one party can renew it.
The first part can be done via atomics (a so-called Compare-and-Swap loop):
public static TaskCompletionSource<T> GetOrCreate<T>(ref TaskCompletionSource<T> field)
{
for (;;)
{
var current = Volatile.Read(ref field);
if (current != null)
{
return current;
}
var fresh = new TaskCompletionSource<T>();
if (Interlocked.CompareExchange(ref field, fresh, null) == null) {
return fresh;
}
}
}
If both the producer and the consumer call GetOrCreate at the same time, this loop will make sure only one of them creates a new TaskCompletionSource and both receive the same instance.
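As a quick sanity-check sketch (assuming the GetOrCreate method above is in scope; the Check method is just for illustration), two concurrent callers racing on the same field always end up observing the same instance:
public static async Task Check()
{
    TaskCompletionSource<bool> field = null;
    // both tasks race on the same field via the CAS loop
    var t1 = Task.Run(() => GetOrCreate(ref field));
    var t2 = Task.Run(() => GetOrCreate(ref field));
    Console.WriteLine(ReferenceEquals(await t1, await t2)); // always True
}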
It is clear that once called, the method above will leave a non-null TaskCompletionSource in the target field. However, once spent (TrySetResult), it has to be removed (the field nulled out), but by whom? In addition, what will prevent the producer from calling TrySetResult again too early?
The solution: have notifications sent in both directions and make each party responsible for clearing the other party's TaskCompletionSource (i.e., setting the field to null). Therefore, we need two TaskCompletionSource fields:
TaskCompletionSource<bool> consumed;
TaskCompletionSource<bool> valueReady;
T value;
(Remark: generally, the state changes may be more involved than sending a single value of T via TaskCompletionSource, hence the use of the basic <bool> type and a dedicated value field.)
Now the exchange can be organized as follows:
public async ValueTask Send(T value)
{
await GetOrCreate(ref consumed).Task;
consumed = null;
this.value = value;
GetOrCreate(ref valueReady).TrySetResult(true);
}
public async ValueTask<T> Receive()
{
GetOrCreate(ref consumed).TrySetResult(true);
await GetOrCreate(ref valueReady).Task;
valueReady = null;
return value;
}
The first rendezvous point is on the consumed TaskCompletionSource. The sender will wait for this to become completed. Once awaited, which can only happen if the consumer called TrySetResult, it clears the consumed field, sets the value, signals valueReady and returns. If the producer wanted to re-enter Send, it would start waiting on the now fresh consumed TaskCompletionSource. We won't get any accidental fall-throughs because it was the prior call that nulled out the field and not the consumer side.
On the consumer side, the receiver signals the consumed TaskCompletionSource, unblocking a sender, then starts waiting for the valueReady signal (set by the sender). Once awaited, the consumer clears the valueReady field and returns the value field. If the consumer is to receive again, it won't accidentally fall through because valueReady was cleared by the consumer in the previous call.
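If we imagine the two fields, the value field and the Send/Receive pair above gathered into a small helper class, called ExchangeCell<T> here purely for illustration, a couple of rounds of the ping-pong look like this:
public static async Task Demo()
{
    var cell = new ExchangeCell<string>();
    var producer = Task.Run(async () =>
    {
        await cell.Send("a");
        await cell.Send("b");
    });
    var consumer = Task.Run(async () =>
    {
        Console.WriteLine(await cell.Receive()); // prints "a"
        Console.WriteLine(await cell.Receive()); // prints "b"
    });
    await Task.WhenAll(producer, consumer);
}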
One property of this setup is that the sender has to wait for the consumer the first time it wants to send an item. Sometimes this is not desirable, and the sender should be allowed to set the first item without awaiting. For such a setup, we have to start out with an already completed TaskCompletionSource in consumed and have Receive perform the signalling of consumed as its last action:
TaskCompletionSource<bool> consumed;
// in the constructor
consumed = new TaskCompletionSource<bool>();
consumed.TrySetResult(true);
public async ValueTask Send(T value)
{
await GetOrCreate(ref consumed).Task;
consumed = null;
this.value = value;
GetOrCreate(ref valueReady).TrySetResult(true);
}
public async ValueTask<T> Receive()
{
await GetOrCreate(ref valueReady).Task;
valueReady = null;
var v = value;
GetOrCreate(ref consumed).TrySetResult(true);
return v;
}
In this setup, both parties start out waiting for their respective resumption signal, but given that consumed starts out set, Send will be able to set the first value and unblock Receive. Then the notification exchange is the same "Ping-pong" between the two.
Now that the first primitive is established, we can implement our first non-trivial operator: an imperative asynchronous generator. In essence, it is a push-like operation where new items are sent to a consumer:
public async ValueTask Generate()
{
await Send(1);
await Send(3);
await Send(6);
// sequence ends when the task ends or crashes
}
The core value-returning logic could, of course, be implemented directly in MoveNextAsync, but then it loses the imperative look:
int state;
public int Current { get; private set; }
public async ValueTask<bool> MoveNextAsync()
{
if (state == 0) {
state = 1;
Current = 1;
return true;
}
if (state == 1) {
state = 2;
Current = 3;
return true;
}
if (state == 2) {
state = 3;
Current = 6;
return true;
}
return false;
}
Instead, we have two options: an abstract IAsyncEnumerator with the Send mechanism, or creating an IAsyncEnumerable which can work with an async function.
public static IAsyncEnumerable<T> Generate<T>(Func<IAsyncEmitter<T>, Task> generator);
Generate(async emitter => {
await emitter.Send(1);
await emitter.Send(3);
await emitter.Send(6);
// task ends
});
Here, in order to communicate with the actual consumer of the IAsyncEnumerator, we need a mediator interface:
public interface IAsyncEmitter<in T>
{
ValueTask Send(T item);
}
This ensures that the generator function will be invoked for each GetAsyncEnumerator call and thus each of them will have their personal generator logic running. In this setup, the producer side is the Send method and the consumer side is the MoveNextAsync method.
sealed class GenerateEnumerator : IAsyncEnumerator<T>, IAsyncEmitter<T>
{
TaskCompletionSource<bool> consumed;
TaskCompletionSource<bool> valueReady;
T value;
public T Current { get; private set; }
internal GenerateEnumerator()
{
this.consumed = new TaskCompletionSource<bool>();
this.consumed.TrySetResult(true);
}
public async ValueTask<bool> MoveNextAsync()
{
await GetOrCreate(ref valueReady).Task;
valueReady = null;
Current = value;
GetOrCreate(ref consumed).TrySetResult(true);
return true;
}
public async ValueTask Send(T item)
{
await GetOrCreate(ref consumed).Task;
consumed = null;
value = item;
GetOrCreate(ref valueReady).TrySetResult(true);
}
public ValueTask DisposeAsync()
{
return new ValueTask();
}
}
This time, the GetAsyncEnumerator has an extra job to perform by calling the generator function:
readonly Func<IAsyncEmitter<T>, Task> generator;
// ...
public IAsyncEnumerator<T> GetAsyncEnumerator()
{
var enumerator = new GenerateEnumerator();
generator(enumerator);
return enumerator;
}
When run, this async source works but then never completes, even though the user-provided task has ended. We need to detect when the task ends and then signal to MoveNextAsync that there won't be further items.
Unfortunately, we can't use await generator(enumerator) to wait for the user-provided task to end because it would prevent us from returning the enumerator; thus the consumer would never get hold of it, never call MoveNextAsync, and the whole setup would livelock. Besides, GetAsyncEnumerator is a non-async method, thus await is not allowed in it.
This is the point where we have to use the more involved task-continuation mechanism of C#: Task.ContinueWith(). When the task ends, we continue with a lambda that notifies MoveNextAsync about the end of items. Luckily, we can reuse valueReady by sending true for items (as done in Send) and false when the sequence has stopped.
For these, we have to modify both GetAsyncEnumerator and MoveNextAsync a bit:
public IAsyncEnumerator<T> GetAsyncEnumerator()
{
    var enumerator = new GenerateEnumerator();
    // when the generator task terminates, signal the terminal state to the consumer
    generator(enumerator)
        .ContinueWith(t => enumerator.Stop(t));
    return enumerator;
}
public async ValueTask<bool> MoveNextAsync()
{
var hasValue = await GetOrCreate(ref valueReady).Task;
valueReady = null;
if (hasValue)
{
Current = value;
GetOrCreate(ref consumed).TrySetResult(true);
return true;
}
return false;
}
internal async void Stop(Task t)
{
await GetOrCreate(ref consumed).Task;
consumed = null;
if (t.IsFaulted)
{
GetOrCreate(ref valueReady).TrySetException(t.Exception);
}
else
{
GetOrCreate(ref valueReady).TrySetResult(false);
}
}
The code in Stop will signal via valueReady: false for normal completion and TrySetException when the user-provided task crashed. We still have to wait for consumed so that the last sent value is properly consumed before signaling the terminal state.
Now we can detect when the task ends, but what about the case when the consumer of the IAsyncEnumerable wants to stop after some items and calls DisposeAsync? Unfortunately, the user-provided task currently has no way of knowing it should stop. We have to communicate this intent to it, for example, by providing a flag on the IAsyncEmitter:
interface IAsyncEmitter<in T>
{
ValueTask Send(T item);
bool IsCancellationRequested { get; }
}
A well-behaved generator task would then check this flag:
Generate(async emitter =>
{
    if (emitter.IsCancellationRequested) return;
    await emitter.Send(1);
    if (emitter.IsCancellationRequested) return;
    await emitter.Send(3);
    // etc.
});
Unfortunately, this is not enough. Since the consumer and the user-provided task may run concurrently, what if the task misses the check and gets stuck in Send because the other side will never ever set consumed again?
To solve this issue, we have to introduce checks in Send as well, and make sure DisposeAsync sets consumed to unblock a pending Send, which then quits too.
sealed class GenerateEnumerator: IAsyncEnumerator<T>, IAsyncEmitter<T>
{
volatile bool cancelled;
public bool IsCancellationRequested => cancelled; // volatile property...
public ValueTask DisposeAsync()
{
cancelled = true;
GetOrCreate(ref consumed).TrySetResult(true);
return new ValueTask();
}
public async ValueTask Send(T item)
{
if (cancelled) return;
await GetOrCreate(ref consumed).Task;
consumed = null;
if (cancelled) return;
value = item;
GetOrCreate(ref valueReady).TrySetResult(true);
}
}
(Remark: since DisposeAsync may only be called after MoveNextAsync has completed, there is no risk that the setting of consumed gets lost, as we don't wait for valueReady this time.)
So far we have covered sending an item asynchronously, signaling termination and recognizing cancellation. However, we have omitted one part of the requirements of the IAsyncEnumerator API: DisposeAsync should wait until all resources are released. In our case, we don't know if the user-provided task has resources, or if it actually responds to IsCancellationRequested, thus we have to wait for its task to end before letting DisposeAsync continue.
To resolve this requirement, we can introduce a plain TaskCompletionSource to let IAsyncEnumerator.DisposeAsync wait for the termination as well. Unfortunately, we can't reuse valueReady because an in-progress Send may trigger it before the task ends. We have to signal the termination ourselves in Stop:
TaskCompletionSource<bool> taskDone;
internal async void Stop(Task t)
{
await GetOrCreate(ref consumed).Task;
consumed = null;
if (t.IsFaulted)
{
GetOrCreate(ref valueReady).TrySetException(t.Exception);
}
else
{
GetOrCreate(ref valueReady).TrySetResult(false);
}
GetOrCreate(ref taskDone).TrySetResult(true);
}
public ValueTask DisposeAsync()
{
cancelled = true;
GetOrCreate(ref consumed).TrySetResult(true);
return new ValueTask(GetOrCreate(ref taskDone).Task);
}
Now our generator is feature complete.
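To see the finished pieces working together, here is a small consumption sketch (it uses the Generate factory and IAsyncEmitter as defined above; the surrounding Demo method is just for illustration):
public static async Task Demo()
{
    var source = Generate<int>(async emitter =>
    {
        await emitter.Send(1);
        await emitter.Send(3);
        await emitter.Send(6);
        // the sequence completes when this task ends
    });
    var enumerator = source.GetAsyncEnumerator();
    try
    {
        while (await enumerator.MoveNextAsync())
        {
            Console.WriteLine(enumerator.Current); // 1, 3, 6
        }
    }
    finally
    {
        // resolves only after the generator task itself has finished
        await enumerator.DisposeAsync();
    }
}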
So far, the implementations were relatively simple. Keep awaiting if necessary, save the value and return a true task.