
Buffered producer thread safe #72

Merged

Conversation

accelerated
Contributor

The following thread-safety behavior was introduced in the buffered producer class (a usage sketch follows the list):

  • The client application can produce or add messages to the buffer from multiple threads.
  • The client application can flush the buffered messages from any thread.
  • Producing and flushing can occur at the same time. When flush is called, only the messages buffered at the time of the call will be flushed.
  • clear can be called from any thread; however, no producing or flushing can happen while it is being called.
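
A minimal usage sketch of that contract; the broker address and topic name are placeholders, and the MessageBuilder/add_message calls assume the cppkafka API referenced elsewhere in this PR:

#include <cppkafka/utils/buffered_producer.h>
#include <string>
#include <thread>
#include <vector>

using namespace cppkafka;

int main() {
    // Placeholder broker address and topic name.
    Configuration config = {{ "metadata.broker.list", "localhost:9092" }};
    BufferedProducer<std::string> producer(config);
    const std::string payload = "some payload";

    // Several threads buffer messages concurrently.
    std::vector<std::thread> writers;
    for (int i = 0; i < 4; ++i) {
        writers.emplace_back([&]() {
            for (int j = 0; j < 1000; ++j) {
                MessageBuilder builder("some_topic");
                builder.payload(payload);
                producer.add_message(builder);
            }
        });
    }
    // Meanwhile another thread flushes whatever happens to be buffered at the time of the call.
    std::thread flusher([&]() { producer.flush(); });

    for (auto& writer : writers) {
        writer.join();
    }
    flusher.join();
    producer.flush(); // pick up anything added after the concurrent flush
}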

Design considerations:

Single mutex (worse performance)

Coarse lock: flush can block all producer threads for a long time.

template <typename BufferType>
void BufferedProducer<BufferType>::clear() {
    // Can block for a long time if flush has a large queue to purge!
    std::lock_guard<std::mutex> require(exclusive_access_);
    QueueType tmp;
    std::swap(tmp, messages_);
}

template <typename BufferType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
    // Can block for a long time if flush has a large queue to purge!
    std::lock_guard<std::mutex> require(exclusive_access_);
    messages_.push(std::move(builder));
}

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
    {
        // Drain the entire queue while holding the lock: producers are blocked
        // for the whole duration of the flush.
        std::lock_guard<std::mutex> require(exclusive_access_);
        while (!messages_.empty()) {
            produce_message(messages_.front());
            messages_.pop();
        }
    }
    wait_for_acks();
}

Single mutex (slightly better performance)

The issue is that flush can run indefinitely if producers keep enqueuing at the same rate or higher, since messages_.empty() may never become true.

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
    while (!messages_.empty()) {
        // If producers keep adding messages to the buffer, this loop can run for a long time
        std::lock_guard<std::mutex> require(exclusive_access_);
        if (!messages_.empty()) { // re-check under the lock in case clear was called
            produce_message(messages_.front());
            messages_.pop();
        }
    }
    wait_for_acks();
}

Final version (improved performance)

Capture the queue size before the flush starts. This prevents flush from running endlessly, but produce_message() is still a long-running operation since it may call poll(), and producers cannot enqueue during this period. If the rdkafka queue is full, poll will be called several times.

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
    {
        // Snapshot the size so messages added after this point are not flushed by this call.
        size_t num_messages = messages_.size();
        while (num_messages--) {
            std::lock_guard<std::mutex> require(exclusive_access_);
            if (messages_.empty()) {
                break; // in case clear was called
            }
            produce_message(messages_.front()); // potentially long running
            messages_.pop();
        }
    }
    wait_for_acks();
}

Best version (fastest but uses 2 mutexes)

Most performant: the producer only takes the mutex while popping or pushing, and clear is blocked via a shared mutex (not available in C++11, hence the use of Boost). I decided to abandon this version as it imposes a runtime dependency on the Boost::Thread and Boost::System libs. If you're OK with this, I can back out my last commit. Perhaps, since you're already using boost::program_options in 'examples', it's not so bad.

template <typename BufferType>
void BufferedProducer<BufferType>::clear() {
    // Exclusive (writer) lock: blocks both producing and flushing.
    boost::unique_lock<boost::shared_mutex> restrict(shared_access_);
    QueueType tmp;
    std::swap(tmp, messages_);
}

template <typename BufferType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
    // Shared (reader) lock keeps clear() out; the plain mutex serializes queue access.
    boost::shared_lock<boost::shared_mutex> grant(shared_access_);
    boost::lock_guard<boost::mutex> require(exclusive_access_);
    messages_.push(std::move(builder));
}

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
    {
        boost::shared_lock<boost::shared_mutex> grant(shared_access_);
        size_t num_messages = messages_.size();
        while (num_messages--) {
            produce_message(messages_.front());
            boost::lock_guard<boost::mutex> require(exclusive_access_);
            messages_.pop();
        }
    }
    wait_for_acks();
}

}

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
while (!messages_.empty()) {
size_t num_messages = messages_.size();
Owner

I'll have a deeper look later and I'm not sure if this would cause some issues with the expected ack number, but I normally handle these sorts of blocking operations like this:

// We'll lock and move the messages out. This makes pending_messages
// have all current messages and messages_ is left empty
QueueType pending_messages = [&]() {
    lock_guard<mutex> _(mutex_);
    return move(messages_);
}();
while (!pending_messages.empty()) {
    // same loop, no lock
}

This way you have no potentially blocking or long operations inside the critical section while achieving the same. Do you think this would work here?
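
Filled out, a minimal sketch of that lock-and-swap pattern, assuming the member names from the snippet above (mutex_, messages_) and the existing produce_message()/wait_for_acks() helpers:

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
    // Move the buffered messages out under the lock; messages_ is left empty and
    // producers can keep enqueuing while this thread drains the local copy.
    QueueType pending_messages = [&]() {
        std::lock_guard<std::mutex> _(mutex_);
        return std::move(messages_);
    }();
    // No lock needed here: pending_messages is owned by this thread.
    while (!pending_messages.empty()) {
        produce_message(pending_messages.front());
        pending_messages.pop();
    }
    wait_for_acks();
}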

Contributor Author

I thought about a similar swap approach with a temp queue (like you use in clear), but from my experience it's more hassle, as now you have to deal with potentially two queues. What if the produce fails after the swap? Say the failure callback the user provided returns false, so you have to stop retrying. Now you have two queues. Do you merge them back? Next time you flush you risk having to flush two queues... not worth it IMHO. I've done this in the past and the management overhead is too much.

Contributor Author

BTW, note there's only a single ack counter in my revision, since producers and the flush threads are both operating in parallel. The test cases I provided are pretty reliable.

Owner

If production fails and the callback returns false then you just discard the message, right? That's how it works right now. If a message keeps failing for a particular reason (say the message size is larger than the max allowed in the config for that topic), then you want to discard it, otherwise you'll be stuck forever.

Since you're using a single counter that gets incremented on produce and decremented on ack (both on fail and success), the counter will have the right value. I think there's no need to handle 2 queues, unless I'm missing something.
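
For illustration, a minimal standalone sketch of that single-counter scheme (the class and member names here are assumptions, and the real implementation would poll the producer instead of sleeping):

#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

class AckCounterSketch {
public:
    // Incremented every time a message is handed off to rdkafka.
    void on_produced() { ++pending_acks_; }

    // Decremented on every delivery report, whether it succeeded or failed.
    void on_delivery_report() { --pending_acks_; }

    // Blocks until every produced message has been acked one way or the other.
    void wait_for_acks() {
        while (pending_acks_ > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

private:
    std::atomic<std::uint64_t> pending_acks_{0};
};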

Contributor Author

Yes, the issue is not on the on_delivery_report side, it's on the produce_message side. Currently there's a possibility that produce() throws (if error != RD_KAFKA_RESP_ERR__QUEUE_FULL), in which case your message is never popped from the queue. At the same time flush also exits, so now you're stuck with 2 queues when the application recovers from the exception.

Owner

That's a good point. Actually, on_delivery_report shouldn't throw: that's inside rdkafka's stack and it will probably break things if it happens. I'm not sure we want to allow exceptions to even happen within the callback context; that sounds very dangerous.

Contributor Author (May 24, 2018)

Actually this brings up another good point: the failure callback is optional, and if you don't supply it you will be stuck forever in case of an error. There should be some kind of failsafe mechanism, OR the failure callback should be forced as part of the constructor. And yes, the exception has to be caught inside on_delivery_report.

Owner

True. I think it probably makes sense to force the callback to be set. I don't think there's a sane strategy otherwise: we can't really keep track of messages between produce calls, so from cppkafka's perspective it's not easy to know how many times a single message has been re-produced, and therefore hard to impose a limit on how many times this can happen.

size_t messages_acked_{0};
std::atomic_ulong expected_acks_{0};
std::atomic_ullong total_messages_acked_{0};
std::atomic_ushort rollover_counter_{0};
Owner

Is this really necessary? I understand that if you want to be "correct" this is the right way to go, but if you explicitly use something like atomic<uint64_t> for the total-messages-acked member, there's absolutely no way you would ever produce that many messages, which removes the need for the rollover counter.
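
For scale (my arithmetic, not from the thread): 2^64 is roughly 1.8 × 10^19, so even at a sustained 10^9 acks per second a 64-bit counter would take about 585 years to overflow, which is why a single plain counter is enough:

#include <atomic>
#include <cstdint>

// Sketch of the simplified bookkeeping: a single 64-bit total, no rollover counter.
std::atomic<std::uint64_t> total_messages_acked_{0};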

Contributor Author

I will remove it. I guess I was trying to be correct rather than practical :)

@accelerated force-pushed the buffered_producer_thread_safe branch from adf0c57 to 4c1d107 on May 25, 2018 at 12:28
@accelerated force-pushed the buffered_producer_thread_safe branch from eaa96cb to ef5ed27 on May 25, 2018 at 15:57
while (!messages_.empty()) {
produce_message(messages_.front());
messages_.pop();
QueueType flush_queue; // flush from temporary queue
Owner

I wonder if there should be a loop here that makes sure that when flush exits, any previously added messages are indeed flushed. Right now if you flush and a message fails, it will be left in the queue and only flushed again after flush is called again. My expectation is that after calling flush, I actually flushed the writer and everything in it is written.

e.g.

while (true) {
    QueueType flush_queue;
    // {  lock & swap }
    if (flush_queue.empty()) {
        return;
    }
    // your loop...
    // wait for acks...
}

Contributor Author

This is the kind of thing I am trying to avoid: if you have this ongoing flush while other threads (including delivery failures) keep enqueuing at the same or higher rate, you have created an open-faucet situation where nothing is ever buffered. My expectation is that 1) flush is called periodically, so re-enqueued messages will eventually be delivered, and 2) flush only flushes what's in the buffer at the time of the call rather than behaving like an ongoing flush.

Owner

Hmm, that's fair. I guess if you want to ensure everything is flushed, you as a user should loop calling flush while there are still messages pending.
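
A minimal sketch of that user-side loop, assuming a BufferedProducer instance named producer and the get_buffer_size() accessor mentioned later in this thread:

// Keep flushing until nothing is left in the buffer; messages that failed and were
// re-enqueued by the delivery-report handler are picked up on the next iteration.
while (producer.get_buffer_size() > 0) {
    producer.flush();
}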

private:
using QueueType = std::queue<Builder>;
using QueueType = std::list<Builder>;
Owner

I think deque is a better choice here. list is generally considered quite a terrible container and should only be used when you really need it. deque has fast insertion and deletion at its head/tail, so it fits nicely here.
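
A minimal sketch of a deque-backed buffer; the Builder struct below is a placeholder for cppkafka's message builder and the member names are assumptions:

#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Placeholder for cppkafka's message builder type.
struct Builder { std::string payload; };

using QueueType = std::deque<Builder>;

QueueType messages_;
std::mutex exclusive_access_;

// Producers append at the back in O(1)...
void do_add_message(Builder&& builder) {
    std::lock_guard<std::mutex> lock(exclusive_access_);
    messages_.push_back(std::move(builder));
}

// ...and flush consumes from the front, also O(1) on a deque.
bool take_front(Builder& out) {
    std::lock_guard<std::mutex> lock(exclusive_access_);
    if (messages_.empty()) {
        return false;
    }
    out = std::move(messages_.front());
    messages_.pop_front();
    return true;
}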

Contributor Author

Yeah, sure, good point. List implementations vary though... it's like size(), which used to have linear complexity until C++17, and which I try to avoid at all costs.

Owner

Yeah, O(1) size() was actually enforced in C++11. GCC was still using O(N) until a while later.

Owner

@mfontanini left a comment

I like the changes, just a couple of minor comments

@accelerated
Contributor Author

OK, there is one issue here and it's related to the swap of the flush queue... the problem is that it becomes hard for the application to determine when it's safe to release the buffers. Before, you could query the queue size and, if it was empty, it was safe. Now the queue can be empty and still have pending messages. Checking for non-acked messages is not safe either, since the counter is incremented inside produce, so immediately after flush is called the queue is empty and the messages in flight are also 0. However, checking for both messages in flight and queue size gives a good enough guarantee, especially if called on the same thread as flush and right after flush returns. I want to add an is_safe_to_delete() function which would check just that. Thoughts?

A second, smaller issue is that if multiple threads call flush, each wait_for_acks will wait for the total number of sent messages, not just the ones sent by that particular call to flush, since only a single counter is used.

So in light of this, we can probably document that flush should ideally be called from a single thread.
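
A hypothetical sketch of the is_safe_to_delete() check proposed above; the function is only being proposed here, and the in-flight counter name is an assumption:

template <typename BufferType>
bool BufferedProducer<BufferType>::is_safe_to_delete() const {
    // Nothing buffered and nothing awaiting an ack. Most meaningful when called
    // on the same thread as flush(), right after flush() returns, so no produce
    // can slip in between the two reads.
    return get_buffer_size() == 0 && messages_in_flight_ == 0;
}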

@accelerated
Contributor Author

Oh, and just to be sure, did you notice the change to the payload strategy? I am not allowing rdkafka to make a copy anymore, since I need to re-enqueue the message. Just want to make sure you are aware.

@mfontanini
Owner

Where would this is_safe_to_delete function be? What if we add a callback, just like the one for failures, but for successfully produced messages? This way you can be notified that the message was sent and delete whatever buffers you had (or ignore it if you were copying).

Regarding the wait_for_acks remark, yes, that's how it would work now, but I think it's fine. It's thread safe, so if there are N threads waiting on it, it should work eventually.

Regarding the PASSTHROUGH flag, is this right? I think this has a memory leak now. e.g.

  • Add a message (this gets converted to a Builder that stores payloads as strings inside).
  • Then you flush, this pops the message from the queue, meaning those string payloads are deleted.
  • If you get the message back on the delivery report callback, the buffer/key pointers will still point into the strings that belonged to the message, but those have been freed by now, so you're effectively pointing at garbage.

Is there an issue with enforcing COPY semantics on the producer? I think that should be safe. And sure, it's not the most efficient but it should work.

key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
user_data_(message.get_user_data())
{
Owner

I think the timestamp should be here as well?

Contributor Author

Noted

@accelerated
Contributor Author

Yes, I think there's an issue with COPY, because the pointers captured in the Message object when the delivery callback gets called point at the copies of the original payload, and rdkafka probably frees them right after the callback completes. So whatever gets captured in the new Builder will be junk. That's why I believe that with PASSTHROUGH there are no copies made, hence it's still the original application payloads. No leaks.
In reference to adding a new callback for successes, there is no need: my previous PR calls the user-specified callback before checking for errors.

@mfontanini
Owner

The Builder is not a BasicBuilder<Buffer>; it is a BasicBuilder<string>, meaning it owns its payloads. So the sequence with COPY is:

  • add_message is called; this creates a Builder that owns the key/payload.
  • flush is called; this writes the message and pops the builder. The builder is gone, but because COPY was used rdkafka copied the payloads, so it's fine: the key/payload have now been copied by rdkafka into the message.
  • the delivery report callback is called and, let's assume, it failed. Now the message argument contains the pointers rdkafka provided, which are deep copies of the original ones. We add the message back into the queue: to do that, we create a Builder (which again copies and owns the key/payload in its internal strings) and store it in the queue. This is safe.
  • the delivery report callback finishes and rdkafka frees the message and the payload, but this is fine because those were copies.

Now if you don't use the copy flag:

  • add_message is called; this creates a Builder that owns the key/payload.
  • flush is called; this writes the message and pops the builder. The builder is gone and rdkafka didn't copy the payloads because we said PASSTHROUGH. Now the original message is gone and the message we produced (the one somewhere inside rdkafka's internals) has pointers to the key/payload the builder had, which have now been cleared.
  • the delivery report callback is called and, let's assume, it failed. Now the message argument contains the pointers rdkafka provided, which still point to the already-deleted strings inside the original builder. We add the message back into the queue: to do that, we create a Builder (which again copies and owns the key/payload in its internal strings), but the pointers we copy from are the ones rdkafka provides, which point to garbage. We store the Builder back in the queue and we're now storing garbage (see the sketch below).
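
A sketch of how the failed-delivery path fits together, using the Builder-from-Message constructor shown in the diff hunk above; the handler and callback member names are assumptions:

template <typename BufferType>
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
    if (message.get_error()) {
        // The failure callback (if set) decides whether the message is worth retrying;
        // returning false means: discard the message instead of re-enqueuing it.
        if (!produce_failure_callback_ || produce_failure_callback_(message)) {
            // Rebuild an owning Builder from the message and buffer it again.
            // The Buffer views inside `message` point into rdkafka's copy of the
            // payload, which is only valid during this callback when COPY was used;
            // under PASSTHROUGH they point into the already-destroyed builder data.
            do_add_message(Builder(message));
        }
    }
    // Ack bookkeeping (counters, success callback) omitted.
}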

@accelerated
Contributor Author

Yeah, this makes sense... for some reason I made the whole design assumption on the Buffer container, not string. Yes, then the COPY payload must be used, but that means you can end up with up to 3 copies: one the moment you produce, another inside rdkafka, and a third inside the callback. And once again, 2 copies for each subsequent retry/fail cycle. I feel that for an application which tries to be as performant as possible, it would be nice to have the Buffer container option with full passthrough. Do you think we could templatize the BufferedProducer?

@mfontanini
Owner

So while I wrote that, I forgot BufferedProducer was already a template class and the buffer type is a template parameter. But yeah, my overall thinking was towards using BufferedProducer<something that owns its payload>.

Using Buffer as the template argument is very risky, as you could easily end up blowing your foot off. Especially since you're "persistently" (at least for a while) storing something that doesn't own its contents, and then going away until someone decides to flush the buffer. Right now you can achieve the behavior you want by just using BufferedProducer<Buffer> and setting the PASSTHROUGH policy on the producer, right?
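
A minimal sketch of that combination; the set_payload_policy() call and the PayloadPolicy enum value are my recollection of cppkafka's Producer API and should be treated as assumptions:

#include <cppkafka/utils/buffered_producer.h>

using namespace cppkafka;

int main() {
    Configuration config = {{ "metadata.broker.list", "localhost:9092" }}; // placeholder broker
    // Zero-copy variant: the buffered builders only hold Buffer views, so the
    // application must keep the underlying payload memory alive until delivery is acked.
    BufferedProducer<Buffer> producer(config);
    producer.get_producer().set_payload_policy(Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD);
}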

@accelerated
Contributor Author

So while I wrote that, I forgot BufferedProducer was already a template class and the buffer type is a template parameter

Yeah me too :).

OK, so I made the last set of changes based on your comments. I also removed the user-supplied delivery callback and replaced it with a separate success delivery callback, so now it's more symmetric with the failure one. Let me know what you think of this last update.

@accelerated force-pushed the buffered_producer_thread_safe branch from 9881efb to 9e92f22 on May 26, 2018 at 14:57
@accelerated force-pushed the buffered_producer_thread_safe branch from c902b97 to 6e0247f on May 27, 2018 at 15:30
*
* \return The number of outstanding flush operations.
*/
size_t get_flushes_in_progress() const;
Owner

Is this metric really worth having? You can't really make a decision based on it, since by the time the function returns any in-progress flush may have already finished. I imagine you added this because you found a use case for it. If so, do you mind sharing it?

Contributor Author (May 28, 2018)

Well, the idea is explained in the header of the class. By itself it's not too useful; you have to pair it with get_buffer_size(), and if both return 0 that's a good indication that everything has been pushed to the broker. The use case is that I'm writing a library on top of yours with a much higher level of abstraction (imagine dozens of producers and consumers, maybe more, and very generic so it's usable on multiple different projects). I take in messages in native format, serialize them (via transformer callbacks the user registers), buffer them in chunks because I'm pulling them from application queues, and then flush them, so I need a deterministic way of knowing when to delete the original (non-serialized) data, which is held in unique_ptrs. If it turns out this isn't a reliable process, or if I change my design later on, I will probably keep some sort of correlation hash map based on the message handle and use that along with the success callback to delete each message as it's acked. But I'd rather avoid all the look-ups.
In any case, I'm using stack-based allocators for the native message types and I want zero copy until the message is delivered. The application has to be as low-latency as possible. The same thing in reverse will happen on the consumer side: the message is deserialized once via another transformer callback and then moved out via unique_ptrs, so my library relinquishes all ownership.

Otherwise, it may be good to wait for all flushes to end before shutting down the producer, for example.

Contributor Author

So to answer:

You can't really make a decision based on this as by the time the function returns, any in-progress flush may have already finished

Exactly, and that's fine. I'm checking when the number of flushes gets to 0; I don't care how many there are at any point in time.

@mfontanini merged commit 429ec92 into mfontanini:master on May 29, 2018
@accelerated deleted the buffered_producer_thread_safe branch on May 29, 2018 at 13:26