Skip to content

Added a high-priority queue to BufferedProducer to avoid message re-ordering #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 15, 2019
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions include/cppkafka/utils/buffered_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ class CPPKAFKA_API BufferedProducer {
// Members
Producer producer_;
QueueType messages_;
QueueType hi_pri_messages_;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of abbreviations. I'd rather this be "high_priority_messages_" rather than "hi_pri" which is not very self describing.

mutable std::mutex mutex_;
ProduceSuccessCallback produce_success_callback_;
ProduceFailureCallback produce_failure_callback_;
Expand Down Expand Up @@ -565,7 +566,8 @@ template <typename BufferType, typename Allocator>
BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
const Allocator& alloc)
: producer_(prepare_configuration(std::move(config))),
messages_(alloc) {
messages_(alloc),
hi_pri_messages_(alloc) {
producer_.set_payload_policy(get_default_payload_policy<BufferType>());
#ifdef KAFKA_TEST_INSTANCE
test_params_ = nullptr;
Expand Down Expand Up @@ -625,10 +627,16 @@ template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with "hi_pri" here

{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue);
std::swap(messages_, flush_queue);
}
while (!hi_pri_flush_queue.empty()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably create a private function async_produce(QueueType queue, bool throw_on_error) and call it twice to avoid code duplication. Otherwise at least create a local lambda and call it with each queue, that also works.

async_produce(std::move(hi_pri_flush_queue.front()), false);
hi_pri_flush_queue.pop_front();
}
while (!flush_queue.empty()) {
async_produce(std::move(flush_queue.front()), false);
flush_queue.pop_front();
Expand All @@ -640,10 +648,16 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue);
std::swap(messages_, flush_queue);
}
while (!hi_pri_flush_queue.empty()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment avoid code duplication here.

sync_produce(hi_pri_flush_queue.front());
hi_pri_flush_queue.pop_front();
}
while (!flush_queue.empty()) {
sync_produce(flush_queue.front());
flush_queue.pop_front();
Expand All @@ -661,25 +675,45 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue);
std::swap(messages_, flush_queue);
}
auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now();
do {
sync_produce(flush_queue.front());
flush_queue.pop_front();
if (!hi_pri_flush_queue.empty()) {
sync_produce(hi_pri_flush_queue.front());
hi_pri_flush_queue.pop_front();
}
else if (!flush_queue.empty()) {
sync_produce(flush_queue.front());
flush_queue.pop_front();
}
else {
break;
}
// calculate remaining time
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::high_resolution_clock::now() - start_time);
} while (!flush_queue.empty() && (remaining.count() > 0));
} while (remaining.count() > 0);

// Re-enqueue remaining messages in original order
if (!flush_queue.empty()) {
if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) {
std::lock_guard<std::mutex> lock(mutex_);
messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end()));
}
if (!!hi_pri_flush_queue.empty()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a double ! here, this is actually being executed if the queue is empty.

hi_pri_messages_.insert(hi_pri_messages_.begin(),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment avoid code duplication as well.

std::make_move_iterator(hi_pri_flush_queue.begin()),
std::make_move_iterator(hi_pri_flush_queue.end()));
}
if (!flush_queue.empty()) {
messages_.insert(messages_.begin(),
std::make_move_iterator(flush_queue.begin()),
std::make_move_iterator(flush_queue.end()));
}
}
}
else {
async_flush();
Expand Down Expand Up @@ -735,11 +769,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
QueueType tmp;
std::swap(tmp, messages_);
QueueType hi_pri_tmp;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"hi_pri" again here

std::swap(hi_pri_tmp, hi_pri_messages_);
}

template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
return messages_.size();
return messages_.size() + hi_pri_messages_.size();
}

template <typename BufferType, typename Allocator>
Expand Down Expand Up @@ -774,13 +810,14 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build
{
std::lock_guard<std::mutex> lock(mutex_);
if (priority == MessagePriority::High) {
messages_.emplace_front(std::forward<BuilderType>(builder));
hi_pri_messages_.emplace_back(std::forward<BuilderType>(builder));
}
else {
messages_.emplace_back(std::forward<BuilderType>(builder));
}
}
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {

if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment in the code on why this is being done? If I read this without knowing what priorities mean in this context, this is confusing/counter intuitive: "if you're adding a message with high priority, then don't flush. Only do that if it's low priority". A comment would clarify that.

if (flush_method_ == FlushMethod::Sync) {
flush();
}
Expand Down