Skip to content

Added a high-priority queue to BufferedProducer to avoid message re-ordering #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 15, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 85 additions & 47 deletions include/cppkafka/utils/buffered_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,10 @@ class CPPKAFKA_API BufferedProducer {
#endif

private:
enum class MessagePriority { Low, High };
enum class SenderType { Sync, Async };

enum class QueueKind { Retry, Regular };
enum class FlushAction { DontFlush, DoFlush };

template <typename T>
struct CounterGuard{
CounterGuard(std::atomic<T>& counter) : counter_(counter) { ++counter_; }
Expand Down Expand Up @@ -519,18 +520,21 @@ class CPPKAFKA_API BufferedProducer {
return nullptr;
}
template <typename BuilderType>
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action);
template <typename BuilderType>
void produce_message(BuilderType&& builder);
Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message);
template <typename BuilderType>
void async_produce(BuilderType&& message, bool throw_on_error);

static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex);

// Members
Producer producer_;
QueueType messages_;
QueueType retry_messages_;
mutable std::mutex mutex_;
mutable std::mutex retry_mutex_;
ProduceSuccessCallback produce_success_callback_;
ProduceFailureCallback produce_failure_callback_;
ProduceTerminationCallback produce_termination_callback_;
Expand Down Expand Up @@ -565,7 +569,8 @@ template <typename BufferType, typename Allocator>
BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
const Allocator& alloc)
: producer_(prepare_configuration(std::move(config))),
messages_(alloc) {
messages_(alloc),
retry_messages_(alloc) {
producer_.set_payload_policy(get_default_payload_policy<BufferType>());
#ifdef KAFKA_TEST_INSTANCE
test_params_ = nullptr;
Expand All @@ -580,7 +585,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
    // Attach a delivery tracker (used to match delivery reports and drive
    // retries), then enqueue on the regular queue. FlushAction::DoFlush lets
    // do_add_message trigger an automatic flush once the buffer limit is hit.
    add_tracker(SenderType::Async, builder);
    do_add_message(std::move(builder), QueueKind::Regular, FlushAction::DoFlush);
}

template <typename BufferType, typename Allocator>
Expand Down Expand Up @@ -624,30 +629,36 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::async_flush() {
    // Asynchronously produce everything currently buffered. Each queue is
    // drained via a temporary: the lock is held only for the swap, so new
    // messages can keep arriving while we produce the snapshot.
    CounterGuard<size_t> counter_guard(flushes_in_progress_);
    auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
    {
        QueueType flush_queue; // flush from temporary queue
        swap_queues(queue, flush_queue, mutex);

        while (!flush_queue.empty()) {
            async_produce(std::move(flush_queue.front()), false);
            flush_queue.pop_front();
        }
    };
    // Retry messages must go out before regular ones to preserve ordering
    // of previously-failed deliveries.
    queue_flusher(retry_messages_, retry_mutex_);
    queue_flusher(messages_, mutex_);
}

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(messages_, flush_queue);
}
while (!flush_queue.empty()) {
sync_produce(flush_queue.front());
flush_queue.pop_front();
}
QueueType flush_queue; // flush from temporary queue
swap_queues(queue, flush_queue, mutex);

while (!flush_queue.empty()) {
sync_produce(flush_queue.front());
flush_queue.pop_front();
}
};
queue_flusher(retry_messages_, retry_mutex_);
queue_flusher(messages_, mutex_);
}
else {
async_flush();
Expand All @@ -661,25 +672,42 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
swap_queues(messages_, flush_queue, mutex_);
QueueType retry_flush_queue; // flush from temporary retry queue
swap_queues(retry_messages_, retry_flush_queue, retry_mutex_);

auto queue_flusher = [this](QueueType& queue)->bool
{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(messages_, flush_queue);
}
if (!queue.empty()) {
sync_produce(queue.front());
queue.pop_front();
return true;
}
return false;
};
auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now();
do {
sync_produce(flush_queue.front());
flush_queue.pop_front();
if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this statement should be !(queue_flusher(retry_flush_queue) || queue_flusher(flush_queue)) because the way it's written now both flushers will be evaluated, which means you will flush alternately from each queue. The retry queue should be fully flushed before the main queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition statements "(!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue))" and "!(queue_flusher(retry_flush_queue) || queue_flusher(flush_queue))" are equivalent.

In the former condition, if queue_flusher(retry_flush_queue) returns true, then the negation of it will evaluate to false, hence because of the short-circuit implementation, the second part of the condition queue_flusher(flush_queue) will not be evaluated because it's clear that the entire statement will evaluate to false regardless. So queue_flusher(flush_queue) will be evaluated if and only if queue_flusher(retry_flush_queue) returns false.

However, if you think that changing the way you suggested will improve the readability, I'm ready to do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you're right, that makes sense. I somehow only thought of the false case.

break;
}
// calculate remaining time
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::high_resolution_clock::now() - start_time);
} while (!flush_queue.empty() && (remaining.count() > 0));
} while (remaining.count() > 0);

// Re-enqueue remaining messages in original order
if (!flush_queue.empty()) {
std::lock_guard<std::mutex> lock(mutex_);
messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end()));
}
auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void
{
if (!src_queue.empty()) {
std::lock_guard<std::mutex> lock(mutex);
dst_queue.insert(dst_queue.begin(),
std::make_move_iterator(src_queue.begin()),
std::make_move_iterator(src_queue.end()));
}
};
re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_);
re_enqueuer(flush_queue, messages_, mutex_);
}
else {
async_flush();
Expand Down Expand Up @@ -732,14 +760,15 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::clear() {
    // Discard all buffered messages. Each queue is emptied by swapping with a
    // local instance under its own mutex, so the lock is held only for the
    // swap; the old contents are destroyed after the lock is released.
    QueueType tmp;
    swap_queues(messages_, tmp, mutex_);
    QueueType retry_tmp;
    swap_queues(retry_messages_, retry_tmp, retry_mutex_);
}

template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
    // Total messages pending across both the regular and retry queues.
    // NOTE(review): sizes are read without holding the queue mutexes, so the
    // result is only an approximation while producers/flushers run concurrently.
    return messages_.size() + retry_messages_.size();
}

template <typename BufferType, typename Allocator>
Expand Down Expand Up @@ -769,18 +798,20 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
template <typename BufferType, typename Allocator>
template <typename BuilderType>
void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
MessagePriority priority,
bool do_flush) {
{
QueueKind queue_kind,
FlushAction flush_action) {
if (queue_kind == QueueKind::Retry) {
std::lock_guard<std::mutex> lock(retry_mutex_);
retry_messages_.emplace_back(std::forward<BuilderType>(builder));
}
else {
std::lock_guard<std::mutex> lock(mutex_);
if (priority == MessagePriority::High) {
messages_.emplace_front(std::forward<BuilderType>(builder));
}
else {
messages_.emplace_back(std::forward<BuilderType>(builder));
}
messages_.emplace_back(std::forward<BuilderType>(builder));
}
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {

// Flush the queues only if a regular message is added. Retry messages may be added
// from rdkafka callbacks, and flush/async_flush is a user-level call
if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
if (flush_method_ == FlushMethod::Sync) {
flush();
}
Expand Down Expand Up @@ -928,7 +959,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
if (tracker && tracker->num_retries_ > 0) {
--tracker->num_retries_;
do_add_message(std::forward<BuilderType>(builder), MessagePriority::High, false);
do_add_message(std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush);
return;
}
}
Expand Down Expand Up @@ -967,7 +998,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
--tracker->num_retries_;
if (tracker->sender_ == SenderType::Async) {
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
do_add_message(Builder(message), MessagePriority::High, false);
do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush);
}
should_retry = true;
}
Expand Down Expand Up @@ -999,6 +1030,13 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
}
}

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::swap_queues(QueueType& queue1,
                                                          QueueType& queue2,
                                                          std::mutex& mutex) {
    // Exchange the contents of the two queues while holding the mutex that
    // protects the member queue involved in the swap.
    std::lock_guard<std::mutex> lock(mutex);
    std::swap(queue1, queue2);
}

} // cppkafka

#endif // CPPKAFKA_BUFFERED_PRODUCER_H