
Added a high-priority queue to BufferedProducer to avoid message re-ordering #153

Merged 4 commits into mfontanini:master on Jan 15, 2019

Conversation

@demin80 (Contributor) commented Jan 7, 2019

Currently, when a BufferedProducer is used and produced messages fail to be sent (async_produce) or fail to be delivered (on_delivery_report), do_add_message re-enqueues them into the messages_ queue with the MessagePriority::High priority. However, the existing code enqueues them at the front of the queue via emplace_front, which can reverse the original order of the messages: e.g., failing to send messages M1, M2, M3 results in M3, M2, M1 in messages_. So if the next attempt to flush the queue succeeds, the messages are sent in the reverse of their original order. This pull request adds a high-priority message queue to BufferedProducer: high-priority messages are always enqueued at the back of that queue, and that queue is always flushed before the low-priority message queue.
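A minimal sketch of the re-ordering problem described above (the `requeue_front`/`requeue_back` helper names are illustrative, not taken from cppkafka): re-enqueuing failed messages with emplace_front reverses their relative order, while appending them to a separate retry queue with emplace_back preserves it.

```cpp
#include <deque>
#include <string>

// Old behavior: each failed message is pushed to the FRONT of the shared
// queue, so M1, M2, M3 come back out as M3, M2, M1.
std::deque<std::string> requeue_front(const std::deque<std::string>& failed) {
    std::deque<std::string> messages;
    for (const auto& msg : failed) {
        messages.emplace_front(msg); // reverses the relative order
    }
    return messages;
}

// Fixed behavior: failed messages go to the BACK of a dedicated retry queue,
// which preserves their original order.
std::deque<std::string> requeue_back(const std::deque<std::string>& failed) {
    std::deque<std::string> retry_queue;
    for (const auto& msg : failed) {
        retry_queue.emplace_back(msg); // order preserved
    }
    return retry_queue;
}
```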

@accelerated (Contributor)

This is definitely an issue with the current code. The proposed changes look good to me...

@mfontanini (Owner) left a comment

A few comments here, looks good overall. Thanks for the PR!

std::lock_guard<std::mutex> lock(mutex_);
messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end()));
}
if (!!hi_pri_flush_queue.empty()) {

There's a double ! here; this is actually executed if the queue is empty.
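For reference, double negation on a bool is the identity, so the condition noted above reads the same as `if (hi_pri_flush_queue.empty())`, which is the opposite of the intended "not empty" check. A tiny illustration (the helper name is hypothetical):

```cpp
#include <deque>

// `!!q.empty()` is true exactly when the queue IS empty -- the double
// negation cancels out, so the intended "not empty" check never happens.
bool double_negated_empty(const std::deque<int>& q) {
    return !!q.empty();
}
```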

}
else {
messages_.emplace_back(std::forward<BuilderType>(builder));
}
}
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {

if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {

Can you add a comment in the code on why this is being done? If I read this without knowing what priorities mean in this context, it's confusing/counter-intuitive: "if you're adding a message with high priority, then don't flush; only do that if it's low priority". A comment would clarify that.

@@ -530,6 +530,7 @@ class CPPKAFKA_API BufferedProducer {
// Members
Producer producer_;
QueueType messages_;
QueueType hi_pri_messages_;
I'm not a fan of abbreviations. I'd rather this be "high_priority_messages_" than "hi_pri", which is not very self-describing.

@@ -625,10 +627,16 @@ template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
Same with "hi_pri" here

std::swap(messages_, flush_queue);
}
while (!hi_pri_flush_queue.empty()) {
I'd probably create a private function async_produce(QueueType queue, bool throw_on_error) and call it twice to avoid code duplication. Otherwise, at least create a local lambda and call it with each queue; that also works.
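A minimal sketch of the local-lambda approach suggested here, assuming simplified types (all names below are illustrative stand-ins, not cppkafka's actual API): one lambda drains a queue, and it is invoked once per queue instead of duplicating the loop.

```cpp
#include <deque>
#include <functional>
#include <string>

// Drain both queues through a single local lambda; `produce` stands in for
// the real send logic. The high-priority (retry) queue is flushed first so
// retried messages keep their place ahead of newly added ones.
void flush_queues(std::deque<std::string>& high_priority_queue,
                  std::deque<std::string>& queue,
                  const std::function<void(const std::string&)>& produce) {
    auto flusher = [&produce](std::deque<std::string>& q) {
        while (!q.empty()) {
            produce(q.front());
            q.pop_front();
        }
    };
    flusher(high_priority_queue); // retries always go first
    flusher(queue);
}
```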

std::swap(messages_, flush_queue);
}
while (!hi_pri_flush_queue.empty()) {
Same comment avoid code duplication here.

messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end()));
}
if (!!hi_pri_flush_queue.empty()) {
hi_pri_messages_.insert(hi_pri_messages_.begin(),
Same comment avoid code duplication as well.

@@ -735,11 +769,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
QueueType tmp;
std::swap(tmp, messages_);
QueueType hi_pri_tmp;
"hi_pri" again here

@demin80 (Contributor, Author) commented Jan 8, 2019

Thank you guys for your comments. I've updated the implementation and hopefully addressed all the suggestions received so far:

  1. Removed the code duplication via lambdas;
  2. Renamed the abbreviated hi_pri_messages_ to retry_messages_;
  3. Replaced MessagePriority with a boolean is_retry, since MessagePriority::High means "retry sending";
  4. Added a comment explaining why we're not flushing the queue when a retry message is added. Essentially, queue flushing involves locking the mutex, and it's not a good idea to lock the producer in an rdkafka callback (which is where retry messages may be added from).

@mfontanini (Owner) left a comment

One minor style comment. @accelerated can you have a look at the new changes on this one?

std::swap(queue, flush_queue);
}
while (!flush_queue.empty()) {
sync_produce(flush_queue.front());
There are 2 spaces rather than 4 on both this line and the one below.

auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now();
do {
sync_produce(flush_queue.front());
flush_queue.pop_front();
if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) {
Contributor

I think this statement should be !(queue_flusher(retry_flush_queue) || queue_flusher(flush_queue)), because the way it's written now both flushers will be evaluated, which means you will flush alternately from each queue. The retry queue should be fully flushed before the main queue.

@demin80 (Contributor, Author)

The conditions "(!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue))" and "!(queue_flusher(retry_flush_queue) || queue_flusher(flush_queue))" are equivalent.

In the former, if queue_flusher(retry_flush_queue) returns true, its negation evaluates to false, so because of short-circuit evaluation the second part, queue_flusher(flush_queue), will not be evaluated: the entire expression is already known to be false. Hence queue_flusher(flush_queue) is evaluated if and only if queue_flusher(retry_flush_queue) returns false.

However, if you think that changing it the way you suggested would improve readability, I'm ready to do that.
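A small demonstration of this short-circuit argument (the `Flusher` counter struct below is a hypothetical stand-in for the real queue_flusher lambdas): with `&&`, the second flusher runs only when the first returned false, so the two spellings call the flushers identically.

```cpp
// Counts its invocations and returns a fixed result, standing in for a
// queue_flusher lambda that returns true when it flushed something.
struct Flusher {
    bool result;
    int calls = 0;
    bool operator()() { ++calls; return result; }
};

// `!a() && !b()`: b is evaluated only when a() returned false, exactly as
// with `!(a() || b())` under C++ short-circuit evaluation.
bool nothing_flushed(Flusher& a, Flusher& b) {
    return !a() && !b();
}
```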

Contributor

Yes you're right, that makes sense. I somehow only thought of the false case.

auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue)->void
{
if (!src_queue.empty()) {
std::lock_guard<std::mutex> lock(mutex_);
@accelerated (Contributor) commented Jan 10, 2019

[suggestion] Since the logic for flushing/re-enqueuing has now been made generic via lambdas, I wonder whether it wouldn't be better to have 2 mutexes (one for each queue) and pass the mutex into the lambda as a parameter as well. There is no point in blocking operations on one queue while the other one is being accessed. Especially since the retry_queue_ is populated primarily from the rdkafka callback, you don't want to block it unnecessarily while the main queue is being used.
When flushing, the swap operation is very fast, so a single mutex doesn't hurt there... but the re-enqueuing is slow.
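A hedged sketch of the two-mutex idea, with member names assumed from the discussion rather than taken from cppkafka: each queue pairs with its own mutex, and a generic helper takes both queue and mutex as parameters, so re-enqueuing into the retry queue (e.g. from a delivery-report callback) never contends with the main queue's lock.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>

// Illustrative only: one mutex per queue, passed alongside the queue to a
// shared helper. Operations on one queue never block the other.
class TwoQueueBuffer {
public:
    void add_message(std::string msg) {
        enqueue(messages_, mutex_, std::move(msg));
    }
    void add_retry(std::string msg) { // e.g. called from a delivery callback
        enqueue(retry_messages_, retry_mutex_, std::move(msg));
    }
    std::size_t size() {
        std::scoped_lock lock(mutex_, retry_mutex_); // lock both, deadlock-free
        return messages_.size() + retry_messages_.size();
    }
private:
    static void enqueue(std::deque<std::string>& queue, std::mutex& mutex,
                        std::string msg) {
        std::lock_guard<std::mutex> lock(mutex); // only this queue is blocked
        queue.emplace_back(std::move(msg));
    }
    std::deque<std::string> messages_;
    std::deque<std::string> retry_messages_;
    std::mutex mutex_;
    std::mutex retry_mutex_;
};
```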

@@ -519,17 +518,18 @@ class CPPKAFKA_API BufferedProducer {
return nullptr;
}
template <typename BuilderType>
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
void do_add_message(BuilderType&& builder, bool is_retry, bool do_flush);
Contributor

[suggestion for readability] I think that using too many bools in a function signature makes the code hard to understand when you read it. What is do_add_message(builder, true, false)? Or do_add_message(builder, false, false)? It's difficult to tell unless you know the parameters beforehand. It would be ideal to have enums and use something like:
do_add_message(builder, QueueType::RetryQueue, FlushAction::flush)

This would also apply to void async_produce(BuilderType&& message, bool throw_on_error), where throw_on_error would be made into an enum like ThrowAction::OnError or something like that. I'll leave this to @demin80 to decide.
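A brief sketch of the enum-based signature suggested here. The enumerator names loosely follow the reviewer's example (renamed to `QueuePlacement` to avoid clashing with the existing `QueueType` container typedef); the function body is a hypothetical stand-in, not cppkafka code.

```cpp
#include <string>

// Self-describing call sites: compare
//   do_add_message(b, QueuePlacement::RetryQueue, FlushAction::DontFlush)
// with the opaque do_add_message(b, true, false).
enum class QueuePlacement { MainQueue, RetryQueue };
enum class FlushAction { DontFlush, Flush };

std::string do_add_message(const std::string& builder,
                           QueuePlacement placement,
                           FlushAction flush_action) {
    // Returns a tag describing what would happen, for illustration only.
    std::string where = (placement == QueuePlacement::RetryQueue) ? "retry" : "main";
    std::string action = (flush_action == FlushAction::Flush) ? "flush" : "no-flush";
    return where + ":" + action + ":" + builder;
}
```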

@demin80 (Contributor, Author) commented Jan 10, 2019

As pointed out by mfontanini:

  1. The spacing was fixed;

and as suggested by accelerated:

  2. retry_mutex_ was added to protect retry_messages_;
  3. Some bools were replaced with enums.

@accelerated (Contributor)

LGTM

@demin80 (Contributor, Author) commented Jan 14, 2019

Any more comments here?

@mfontanini (Owner)

I haven't had time to look at this again in detail. I'll have a look tonight and merge it if it looks good.

@demin80 (Contributor, Author) commented Jan 14, 2019

Thanks Matias!

@mfontanini mfontanini merged commit 05cc830 into mfontanini:master Jan 15, 2019
@mfontanini (Owner)

Thanks for the fix!
