
API contract of Sink isn't well suited for thread-safe and Clone implementations #1312


Closed
Matthias247 opened this issue Nov 4, 2018 · 3 comments


@Matthias247
Contributor

The API contract of Sink requires callers to call poll_ready before sending an item via start_send.

This is fine for implementations that are only used from a single thread.
However, if an implementation is used from multiple threads, the API exposes a race condition: two calls to poll_ready from different threads might both return ready, but only one of the threads might then be able to send an item. The second call to start_send might return an error.
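
The interleaving described above can be replayed deterministically on a single thread. The following is a minimal sketch, assuming a hypothetical one-slot `SharedSlot` type whose clones all share the same state; it borrows the method names `poll_ready` and `start_send` but is not the futures `Sink` trait, and the boolean/`Result<(), ()>` return types are simplifications.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical one-slot sink; every clone shares the same slot.
/// This is an illustration only, not the futures `Sink` trait.
struct SharedSlot<T> {
    slot: Arc<Mutex<Option<T>>>,
}

impl<T> Clone for SharedSlot<T> {
    fn clone(&self) -> Self {
        SharedSlot { slot: Arc::clone(&self.slot) }
    }
}

impl<T> SharedSlot<T> {
    fn new() -> Self {
        SharedSlot { slot: Arc::new(Mutex::new(None)) }
    }

    /// Readiness check: true while the shared slot is empty.
    fn poll_ready(&self) -> bool {
        self.slot.lock().unwrap().is_none()
    }

    /// Takes the item by value, so it is lost if the slot is already full.
    fn start_send(&self, item: T) -> Result<(), ()> {
        let mut slot = self.slot.lock().unwrap();
        if slot.is_some() {
            return Err(()); // `item` is dropped here
        }
        *slot = Some(item);
        Ok(())
    }
}

/// Replays the problematic ordering: both handles check readiness
/// before either of them sends.
fn race_outcome() -> (bool, bool, bool, bool) {
    let a = SharedSlot::new();
    let b = a.clone();
    let ready_a = a.poll_ready();
    let ready_b = b.poll_ready();
    let sent_a = a.start_send("from a").is_ok();
    let sent_b = b.start_send("from b").is_ok();
    (ready_a, ready_b, sent_a, sent_b)
}

fn main() {
    let (ready_a, ready_b, sent_a, sent_b) = race_outcome();
    println!("ready: {} {}, sent: {} {}", ready_a, ready_b, sent_a, sent_b);
}
```

Both readiness checks succeed, yet the second `start_send` fails and its item is gone, which is the situation the rest of this issue is about.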

This could be worked around by resending the item in all cases until submitting the item succeeds. Something like this (pseudo-code):

while !submitted {
    await(sink.poll_ready()); // Not really, since poll_ready doesn't return a future
    if sink.start_send(item).is_ok() {
        submitted = true;
    }
}

However, this doesn't work, since start_send also consumes the item in the failure case.

I am concerned that many implementations of Sink might be affected by this, unless they

  • implement infinite buffering internally, or
  • if they are cloneable, increase the internal capacity per clone and track ready status per clone.

The first solution is not desirable, and the second might not be followed by implementations where the Sender is just a proxy object around a shared, reference-counted implementation.

I just tried to figure out what the mpsc channel in the repository does and whether it's affected by the described scenario. I think it might handle the Clone case correctly, because it stores some extra state in the Sender. I'm not fully sure about multithreaded usage. Issues might be avoided because a mutable reference is required. However, callers would then need to use some kind of asynchronous mutex around the whole transaction.

I see the following ways to improve on this:

  1. Let start_send() return the item if sending doesn't succeed. Then retry loops as described above can be implemented.

However, that means that unless the user of the Sink is 100% sure that it isn't a clone of another Sink held somewhere else, they would always need to use the retry loop, which would likely not be very discoverable for new users.
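
For comparison, a retry loop of this kind can be written today against the standard library's bounded channel, because `SyncSender::try_send` hands the rejected item back inside `TrySendError::Full`. The sketch below uses that real std API rather than the futures `Sink` trait; `send_with_retry` and its `make_room` callback (which stands in for "wait until the sink is ready again") are hypothetical names introduced only for illustration.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Retries until the item is accepted. `make_room` is a stand-in for
/// waiting on readiness. Returns the number of attempts, or the item
/// itself if the receiver is gone.
fn send_with_retry<T>(
    tx: &SyncSender<T>,
    mut item: T,
    mut make_room: impl FnMut(),
) -> Result<usize, T> {
    let mut attempts = 0;
    loop {
        attempts += 1;
        match tx.try_send(item) {
            Ok(()) => return Ok(attempts),
            // The rejected item comes back, so the loop can retry with it.
            Err(TrySendError::Full(rejected)) => {
                item = rejected;
                make_room();
            }
            Err(TrySendError::Disconnected(rejected)) => return Err(rejected),
        }
    }
}

fn demo() -> (usize, Vec<u32>) {
    let (tx, rx) = sync_channel::<u32>(1);
    tx.try_send(1).unwrap(); // fill the only slot
    let mut received = Vec::new();
    let attempts = send_with_retry(&tx, 2, || {
        // Simulate the consumer freeing one slot.
        received.push(rx.recv().unwrap());
    })
    .unwrap();
    received.push(rx.recv().unwrap());
    (attempts, received)
}

fn main() {
    let (attempts, received) = demo();
    println!("attempts: {}, received: {:?}", attempts, received);
}
```

Note that the item has to exist before the first attempt, which is the loss of lazy production discussed in the next paragraph.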

If users need to use a retry loop, the justification that Sink allows for lazy allocation of the sending data would diminish, since the data might always need to be produced before sending in case it can't be sent immediately. In that case it might make sense to

  2. redefine Sink to contain a method which returns a Future that is fulfilled when the item is sent

E.g. to something like send_item(&self, item: Self::SinkItem) -> impl Future.
The returned future could either return () for signalling success, or the item for allowing to return the memory to the caller when no longer needed.
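
A minimal sketch of what such a future-returning method could look like, using only the standard library. Everything here is an assumption made for illustration: `Shared`, `Sender`, `SendItem` and `NoopWake` are hypothetical types, the output is `()` rather than the returned item, and a full queue is handled by asking to be polled again immediately instead of registering with a receiver as a real implementation would.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical bounded queue shared by all senders.
struct Shared<T> {
    queue: Mutex<VecDeque<T>>,
    capacity: usize,
}

/// Hypothetical cloneable handle; `send_item` only needs `&self`.
struct Sender<T> {
    shared: Arc<Shared<T>>,
}

impl<T> Sender<T> {
    fn send_item(&self, item: T) -> SendItem<T> {
        SendItem { shared: Arc::clone(&self.shared), item: Some(item) }
    }
}

/// Future that owns the item until it has been enqueued.
struct SendItem<T> {
    shared: Arc<Shared<T>>,
    item: Option<T>,
}

impl<T: Unpin> Future for SendItem<T> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = &mut *self;
        let mut queue = this.shared.queue.lock().unwrap();
        if queue.len() < this.shared.capacity {
            // Capacity check and enqueue happen under one lock,
            // so no other sender can slip in between them.
            let item = this.item.take().expect("polled after completion");
            queue.push_back(item);
            Poll::Ready(())
        } else {
            // Toy behaviour: request another poll right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing, enough to poll by hand in the demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn demo() -> (bool, bool, Vec<u8>) {
    let shared = Arc::new(Shared { queue: Mutex::new(VecDeque::new()), capacity: 1 });
    let sender = Sender { shared: Arc::clone(&shared) };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut first = sender.send_item(1u8);
    let mut second = sender.send_item(2u8);
    let first_done = Pin::new(&mut first).poll(&mut cx).is_ready();
    let second_pending = Pin::new(&mut second).poll(&mut cx).is_pending();

    // The consumer drains one element, after which the second send completes.
    let mut got = vec![shared.queue.lock().unwrap().pop_front().unwrap()];
    let _ = Pin::new(&mut second).poll(&mut cx);
    got.push(shared.queue.lock().unwrap().pop_front().unwrap());
    (first_done, second_pending, got)
}

fn main() {
    println!("{:?}", demo());
}
```

Because the pending future keeps ownership of the item, a failed attempt never loses it, and the caller does not need a separate readiness step.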

  3. Discourage Sinks from being thread-safe or Clone, due to the complex usage and implementation requirements.

@carllerche
Member

This might be related: #984 (comment)

@tikue
Contributor

tikue commented Nov 9, 2018

@Matthias247 start_send returning the item is how it works in Futures 0.1. In my experience it was burdensome. Consider the case where you're acting as a proxy, reading off one channel and writing to another. You end up having to have a temporary buffer at every level, which means you can't recv() until you've checked that you have space to buffer if start_send() fails.

  1. Check if buffer space is reserved.
     • false: start_send(buffered_item)
       • Ok(()) -> proceed to 2
       • Err(rejected) -> buffer(rejected); return NotReady
     • true: proceed to 2.
  2. item <- recv()
  3. start_send(item)
     • Ok(()) -> return
     • Err(rejected) -> buffer(rejected); return NotReady

Compare that to the poll_ready approach.

  1. poll_ready()?
  2. item <- recv()
  3. start_send(item)?
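
The three poll_ready steps can be sketched as a single forwarding function. This is an illustration under stated assumptions: `BoundedSink`, `Step` and `forward_one` are hypothetical names, a `VecDeque` stands in for the receiving channel, and readiness is a plain boolean rather than a `Poll`.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded sink with a single owner; not the futures `Sink` trait.
struct BoundedSink {
    items: Vec<u32>,
    capacity: usize,
}

impl BoundedSink {
    fn poll_ready(&self) -> bool {
        self.items.len() < self.capacity
    }

    /// Only valid after `poll_ready` returned true.
    fn start_send(&mut self, item: u32) {
        assert!(self.poll_ready(), "start_send called without readiness");
        self.items.push(item);
    }
}

#[derive(Debug, PartialEq)]
enum Step {
    Forwarded,
    NotReady,
    SourceEmpty,
}

/// One proxy step: nothing is taken from the source unless the sink has room,
/// so no temporary buffer for a rejected item is needed.
fn forward_one(source: &mut VecDeque<u32>, sink: &mut BoundedSink) -> Step {
    // 1. poll_ready()
    if !sink.poll_ready() {
        return Step::NotReady;
    }
    // 2. item <- recv()
    let item = match source.pop_front() {
        Some(item) => item,
        None => return Step::SourceEmpty,
    };
    // 3. start_send(item)
    sink.start_send(item);
    Step::Forwarded
}

fn main() {
    let mut source = VecDeque::from(vec![1u32, 2, 3]);
    let mut sink = BoundedSink { items: Vec::new(), capacity: 2 };
    for _ in 0..3 {
        println!("{:?}", forward_one(&mut source, &mut sink));
    }
    println!("left in source: {:?}", source);
}
```

When the sink is full, the third item simply stays in the source. This works here because the sink has a single owner, which is the assumption the original report questions for shared or cloned sinks.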

@cramertj
Member

This hasn't seen any discussion in quite some time. I'm closing for now, but @Matthias247 if you have suggestions for improving the existing traits or documentation, feel free to comment here or send a PR!
