Bug in Merge for USE_FAIR_AND_CHEAPER_MERGE #2134
Comments
Thanks for the offer of help. Presumably you're building your own version of Ix? As far as I can tell, that … I have to admit I've not looked at this part of the code at all so far, and I'm not familiar with the problems this was meant to address. We're trying to bring in a practice of quantifying the effects of performance-related changes, as we did in the changes to fix #2005, so if we were going to do this, I'd want to begin by adding (or finding existing) benchmarks that characterise the performance of the relevant code under the anticipated workloads, and then run those benchmarks before and after the change. Is that something you'd be prepared to do as part of opening up a PR? If not, do you have specific scenarios in mind that this change would address?
Hi,
I don't really want to speak for the original authors here, of course, but I think it boils down to two things:

Fairness

In the current implementation, the merged observable would pretty much only ever yield items from its left source.

Efficiency

…

It's probably worth considering two things, though:

…
With that in mind, happy to hear your thoughts. If there's still interest in this, I can try coming up with a PR and benchmarks.
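To make the fairness point concrete, here is a minimal sketch of the idea in Python asyncio rather than C#. It is an analogy, not Ix.NET's implementation, and `fair_merge`, `_step` and `_DONE` are hypothetical names. It keeps exactly one pending "move next" per source, races them (`asyncio.wait` with `FIRST_COMPLETED` plays the role of `WhenAny`), yields whichever finishes first, and only then asks that same source for its next item, so an always-ready first source cannot starve the others.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel: the source is exhausted


async def _step(it: AsyncIterator[T]):
    """One 'move next' on a source, mapping exhaustion to _DONE."""
    try:
        return await it.__anext__()
    except StopAsyncIteration:
        return _DONE


async def fair_merge(*sources: AsyncIterable[T]) -> AsyncIterator[T]:
    """Yield items from whichever source produces next (hypothetical helper)."""
    iters = [s.__aiter__() for s in sources]
    # Exactly one in-flight move-next per live source.
    pending = {asyncio.create_task(_step(it)): it for it in iters}
    try:
        while pending:
            done, _ = await asyncio.wait(set(pending), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                it = pending.pop(task)
                value = task.result()  # each result is read exactly once
                if value is _DONE:
                    continue  # source finished; do not re-arm it
                # Re-arm only the source that just produced an item.
                pending[asyncio.create_task(_step(it))] = it
                yield value
    finally:
        # Early exit or failure: stop any move-next that is still running.
        for task in pending:
            task.cancel()
```

Note that asyncio futures, unlike ValueTasks, support adding and removing completion callbacks, so calling `asyncio.wait` repeatedly on the same still-pending task is safe here; that is precisely the property a C# version cannot take for granted.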
I started looking into this a little further, and I now think there may be an additional problem. The code includes this line:

var whenAny = TaskExt.WhenAny(moveNextTasks);

which is a …
If I've understood his argument correctly, there are two critical issues here:
Now it does look like … The fact that you can't unregister a completion callback doesn't need to be a problem, because this design for … So the argument Stephen Toub presents for why you can't have a …

However, this is the kind of code that's extremely easy to get wrong. The early shutdown logic (in reactive/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs, lines 137 to 138 at commit 4f341fc) …

Although it is not literally using … That seems like it's probably a bug, although I've been unable to work out from the documentation whether the rule against "awaiting it twice" means you're not allowed to call either …

… does seem to say that adding a second continuation is going to be bad. I've not yet found the exact part of the documentation that specifically forbids multiple callbacks in particular (rather than multiple awaits), but few people understand this in as much depth as @stephentoub, so if he thinks it's something you shouldn't do, I'd say we shouldn't do it. So that implies that the …

In your latest reply, you said you're happy to help:
I think you may be the first person to raise this since that experimental code was added over 5 years ago, so I'd have to say that the level of interest is probably low. Then again, there are multiple …

So on that basis, if you are happy to create a PR with benchmarks as the first step, that would be awesome. Equally, if you decide, given that you're apparently the first person to ask about this in over half a decade, that this might not be worth your time, that would be wholly understandable! If you do decide to proceed, be aware that right now there are a load of problems building Ix.NET on .NET SDK 8.0. PR #2135 addresses these, but as I write this it is still open. We're hoping to merge it in the next day or two, but if you're looking at doing any dev work at all on Ix.NET, I'd recommend waiting until that's done.
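On the continuation question raised earlier in this comment: one way to sidestep it entirely is to register exactly one completion callback per in-flight "move next", at the moment it is started, and have that callback post to a shared queue that the merge loop awaits. The Python asyncio sketch below illustrates that design under those assumptions. `queue_merge` is a hypothetical name, the `asyncio.Queue` hand-off stands in for whatever a C# version would use (for example, a channel), and nothing here reflects how Ix.NET's Merge is actually written.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel: the source is exhausted


async def _step(it):
    """One 'move next' on a source, mapping exhaustion to _DONE."""
    try:
        return await it.__anext__()
    except StopAsyncIteration:
        return _DONE


async def queue_merge(*sources: AsyncIterable[T]) -> AsyncIterator[T]:
    """Hypothetical merge that never attaches a second callback to a pending call."""
    iters = [s.__aiter__() for s in sources]
    ready: asyncio.Queue = asyncio.Queue()  # completed move-nexts, in completion order
    in_flight = set()

    def start(it):
        task = asyncio.create_task(_step(it))
        in_flight.add(task)
        # The one and only continuation this move-next ever gets: it is
        # registered once, here, and never needs to be removed.
        task.add_done_callback(lambda t: ready.put_nowait((it, t)))

    for it in iters:
        start(it)
    live = len(iters)
    try:
        while live:
            it, task = await ready.get()
            in_flight.discard(task)
            value = task.result()  # read exactly once, by the merge loop only
            if value is _DONE:
                live -= 1
                continue
            start(it)
            yield value
    finally:
        # Early exit or failure: stop the move-nexts that are still running.
        for task in in_flight:
            task.cancel()
```

Because no callback is ever added to a pending operation a second time, the inability to unregister callbacks no longer matters in this design.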
Thanks for having a look at this. I absolutely agree with all of what you said, including the ambiguities of "don't await twice" and what exactly that boils down to in terms of awaiters, and the apparent dodginess of the current implementation adding multiple continuations. The sketch I currently have in mind is to ensure the Merge operator only ever awaits the WhenAny object, including in its finally block, and never one of the MoveNext ValueTasks directly. I believe the finally block just needs to ensure that no MoveNext is in flight anymore, so awaiting WhenAny until all tasks are done should be enough (though a bit of extra logic might be needed if we want to maintain the current deterministic order of DisposeAsync calls). Definitely agree this would ultimately need a very thorough review!
That probably works in my favor, because it'll likely take me a little while to get around to this :)
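The finally-block idea above (wait until no MoveNext is in flight, then dispose in a fixed order) can be sketched in Python asyncio as an analogy. It is an illustration under assumptions, not a proposed patch: `merge_with_cleanup` is a hypothetical name, `asyncio.wait` stands in for `WhenAny`, and `aclose()` stands in for `DisposeAsync`. The cleanup waits for outstanding calls to complete without reading their results a second time. Note the trade-off raised later in this thread: if a source never completes its pending call, this wait never finishes.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel: the source is exhausted


async def _step(it):
    """One 'move next' on a source, mapping exhaustion to _DONE."""
    try:
        return await it.__anext__()
    except StopAsyncIteration:
        return _DONE


async def merge_with_cleanup(*sources: AsyncIterable[T]) -> AsyncIterator[T]:
    """Hypothetical merge whose cleanup drains in-flight calls, then disposes in order."""
    iters = [s.__aiter__() for s in sources]
    pending = {asyncio.create_task(_step(it)): it for it in iters}
    try:
        while pending:
            # The only place move-next results are waited on during iteration.
            done, _ = await asyncio.wait(set(pending), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                it = pending.pop(task)
                value = task.result()  # a failing source raises here
                if value is not _DONE:
                    pending[asyncio.create_task(_step(it))] = it
                    yield value
    finally:
        # 1) Wait until no move-next is in flight. This only waits for
        #    completion; it does not consume any result a second time.
        if pending:
            await asyncio.wait(set(pending))
            for task in pending:
                if not task.cancelled():
                    # Mark any secondary failure as observed (so asyncio does
                    # not log it); only the first failure propagates.
                    task.exception()
        # 2) Dispose the sources in a fixed order: the order they were passed in.
        for it in iters:
            aclose = getattr(it, "aclose", None)
            if aclose is not None:
                await aclose()
```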
Hi, apologies for the delay here. I've been running some prod apps for a while now with a customized version of a Merge operator. It ended up looking a bit different from "just" the existing one adjusted for ValueTasks, so I thought I'd channel that feedback back first, before thinking about which, if any, of these extensions would be interesting to have (or whether merging async enumerables is inherently so "ambiguous" that a standard implementation maybe isn't even that helpful). Based on the fair merge strategy, here are the two major pain points I ran into:

CancellationToken behaviour
…

We'll inevitably hang as we try to break out of the iteration. Why? Because Merge() gets the enumerators from seq1 and seq2 and calls MoveNext() on both of them, and per contract it needs to make sure both calls are fully awaited before it can dispose the merged sequence. In this tiny example, you could awkwardly try to pass down a CancellationToken to the merged sequence and cancel it just before breaking out of the loop, but that still doesn't really help much. For example, what if seq1 throws an exception? Again, the merged sequence has to wait for the second MoveNext call to finish before it surfaces the exception to the consumer, so at this point we don't even have a good way of identifying when we "should" cancel the token from the outside. In the 4-line example above, this might be easy, or at least possible, to spot, but for more complex sequences it becomes a hellish source of potential (extremely hard to debug) bugs where things just grind to a halt. So personally, I've changed the implementation so that the Merge operator itself creates a linked CT, which it cancels internally A) when the merged sequence is being disposed, and/or B) when any of the source streams throws.

Enumeration speed

Let's say you merge 3 sequences. Say those represent streams over some fast-moving source with in-built backpressure handling.
…

What are you going to print to the console? Surely a lot of ~5-second-long timestamps, right? Alas, no: individual Poll streams would be moved forward every ~15 seconds instead of every ~5 seconds. The enumeration speed here decreases with the number of streams passed into the Merge function. Why's that? Because each iteration of the merged sequence calls MoveNext on at most one of the underlying streams. So one source stream is moved forward -> the caller waits 5 seconds -> the next stream is moved forward -> and so on. The caller pretty much loses control of the real enumeration speed entirely here. In my case, I've solved (or rather circumvented) this by letting Merge return a collection of completed items. So, on iterating, the merge operator drains and publishes the next element from ALL available source streams to the consumer, rather than exactly one. In the dummy example above, "update" would therefore typically be a collection of 3 TimeSpans, and moving the merged enumerator forward would typically move all three source streams forward, meaning you'd typically get TimeSpans of ~5 seconds from each.

Both of these things are potentially rather opinionated choices, so I don't know how well they'd work for a standard library.
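Both changes described above can be sketched together in Python asyncio. This is an analogy, not the commenter's C# code and not an Ix.NET API, and `batched_merge` is a hypothetical name. Task cancellation stands in for the linked CancellationToken: when a source fails or the consumer stops early, the in-flight calls on the other sources are cancelled instead of awaited. Each step yields a list of every item that has completed, and every source in that list is re-armed before the list is handed out, so a slow consumer no longer throttles each source to one item per N consumer steps.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel: the source is exhausted


async def _step(it):
    """One 'move next' on a source, mapping exhaustion to _DONE."""
    try:
        return await it.__anext__()
    except StopAsyncIteration:
        return _DONE


async def batched_merge(*sources: AsyncIterable[T]) -> AsyncIterator[List[T]]:
    """Yield lists of items (hypothetical helper, not an Ix.NET API)."""
    iters = [s.__aiter__() for s in sources]
    pending = {asyncio.create_task(_step(it)): it for it in iters}
    try:
        while pending:
            # Wait for at least one item, then take everything that has finished.
            done, _ = await asyncio.wait(set(pending), return_when=asyncio.FIRST_COMPLETED)
            batch = []
            for task in done:
                it = pending.pop(task)
                value = task.result()  # a failing source raises here
                if value is not _DONE:
                    batch.append(value)
                    # Re-arm every source in the batch before handing it out,
                    # so all of them make progress while the consumer is busy.
                    pending[asyncio.create_task(_step(it))] = it
            if batch:
                yield batch
    finally:
        # Failure or early exit: cancel in-flight move-nexts rather than wait
        # for them (asyncio cancellation plays the linked token's role here).
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.wait(set(pending))
            for task in pending:
                if not task.cancelled():
                    task.exception()  # observe, so asyncio does not log it
```

With three sources and a slow consumer, the first batch may hold one to three items, but later batches typically hold one item per live source.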
I think the Merge implementation when using USE_FAIR_AND_CHEAPER_MERGE may be incorrect.
Consider this (fairly bad) unit test; this does not throw an AggregateException with an inner "test"-Exception, but rather the following:
I think that is because this implementation may await a given ValueTask up to three times, and, worse, the later awaits may be on an already completed ValueTask, which to the best of my knowledge is not allowed and causes undefined behaviour (given that the backing state machine in the async case may be recycled for other ValueTasks after the first await).
As a simple example, consider the following chain of events. For simplicity, say the input argument to Merge is an array that contains only a single IAsyncEnumerable.

- MoveNext() is called, and the result is stored in moveNextTasks.
- … the IAsyncEnumerable now throws an exception.
- … IAsyncEnumerable … completed by throwing its exception.
- … the MoveNext ValueTask again. This is the very same ValueTask that was already awaited by the WhenAny construct; therefore, we now await an already completed ValueTask for the second time.

If there's any interest in doing so, I'd be happy to open up a PR and either attempt to fix this, or possibly remove this implementation altogether, depending on whether there's any interest in switching on USE_FAIR_AND_CHEAPER_MERGE for NuGet releases in the foreseeable future?
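The "await at most once" rule this issue relies on has a close analogue in Python, where a coroutine object is also a single-use awaitable. The sketch below (Python, purely illustrative; `move_next` and `await_twice` are hypothetical names) awaits the same faulted awaitable twice, mirroring the chain of events above: the first await surfaces the real error, and the second raises `RuntimeError` ("cannot reuse already awaited coroutine"). Python fails loudly here, whereas, as described in the issue, a recycled ValueTask offers no such guarantee.

```python
import asyncio


async def move_next():
    """Stand-in for a source's MoveNext that completes by throwing."""
    raise ValueError("test")


async def await_twice():
    pending = move_next()  # a single-use awaitable, like a stored ValueTask
    observed = []
    for _ in range(2):
        try:
            await pending
        except Exception as exc:
            # Record what each await saw: the real error, then a reuse error.
            observed.append(type(exc).__name__)
    return observed
```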