Skip to content

Commit 71c4e02

Browse files
committed
Revised the implementation based on the reviewers' response
1 parent 00370c9 commit 71c4e02

File tree

1 file changed

+64
-66
lines changed

1 file changed

+64
-66
lines changed

Diff for: include/cppkafka/utils/buffered_producer.h

+64-66
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,6 @@ class CPPKAFKA_API BufferedProducer {
482482
#endif
483483

484484
private:
485-
enum class MessagePriority { Low, High };
486485
enum class SenderType { Sync, Async };
487486

488487
template <typename T>
@@ -519,18 +518,18 @@ class CPPKAFKA_API BufferedProducer {
519518
return nullptr;
520519
}
521520
template <typename BuilderType>
522-
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
521+
void do_add_message(BuilderType&& builder, bool is_retry, bool do_flush);
523522
template <typename BuilderType>
524523
void produce_message(BuilderType&& builder);
525524
Configuration prepare_configuration(Configuration config);
526525
void on_delivery_report(const Message& message);
527526
template <typename BuilderType>
528527
void async_produce(BuilderType&& message, bool throw_on_error);
529-
528+
530529
// Members
531530
Producer producer_;
532531
QueueType messages_;
533-
QueueType hi_pri_messages_;
532+
QueueType retry_messages_;
534533
mutable std::mutex mutex_;
535534
ProduceSuccessCallback produce_success_callback_;
536535
ProduceFailureCallback produce_failure_callback_;
@@ -567,7 +566,7 @@ BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
567566
const Allocator& alloc)
568567
: producer_(prepare_configuration(std::move(config))),
569568
messages_(alloc),
570-
hi_pri_messages_(alloc) {
569+
retry_messages_(alloc) {
571570
producer_.set_payload_policy(get_default_payload_policy<BufferType>());
572571
#ifdef KAFKA_TEST_INSTANCE
573572
test_params_ = nullptr;
@@ -582,7 +581,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
582581
template <typename BufferType, typename Allocator>
583582
void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
584583
add_tracker(SenderType::Async, builder);
585-
do_add_message(move(builder), MessagePriority::Low, true);
584+
do_add_message(move(builder), false, true);
586585
}
587586

588587
template <typename BufferType, typename Allocator>
@@ -626,42 +625,40 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
626625
template <typename BufferType, typename Allocator>
627626
void BufferedProducer<BufferType, Allocator>::async_flush() {
628627
CounterGuard<size_t> counter_guard(flushes_in_progress_);
629-
QueueType flush_queue; // flush from temporary queue
630-
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
628+
auto queue_flusher = [this](QueueType& queue)->void
631629
{
632-
std::lock_guard<std::mutex> lock(mutex_);
633-
std::swap(hi_pri_messages_, hi_pri_flush_queue);
634-
std::swap(messages_, flush_queue);
635-
}
636-
while (!hi_pri_flush_queue.empty()) {
637-
async_produce(std::move(hi_pri_flush_queue.front()), false);
638-
hi_pri_flush_queue.pop_front();
639-
}
640-
while (!flush_queue.empty()) {
641-
async_produce(std::move(flush_queue.front()), false);
642-
flush_queue.pop_front();
643-
}
644-
}
645-
646-
template <typename BufferType, typename Allocator>
647-
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
648-
if (preserve_order) {
649-
CounterGuard<size_t> counter_guard(flushes_in_progress_);
650630
QueueType flush_queue; // flush from temporary queue
651-
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
652631
{
653632
std::lock_guard<std::mutex> lock(mutex_);
654-
std::swap(hi_pri_messages_, hi_pri_flush_queue);
655-
std::swap(messages_, flush_queue);
633+
std::swap(queue, flush_queue);
656634
}
657-
while (!hi_pri_flush_queue.empty()) {
658-
sync_produce(hi_pri_flush_queue.front());
659-
hi_pri_flush_queue.pop_front();
660-
}
661635
while (!flush_queue.empty()) {
662-
sync_produce(flush_queue.front());
636+
async_produce(std::move(flush_queue.front()), false);
663637
flush_queue.pop_front();
664638
}
639+
};
640+
queue_flusher(retry_messages_);
641+
queue_flusher(messages_);
642+
}
643+
644+
template <typename BufferType, typename Allocator>
645+
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
646+
if (preserve_order) {
647+
CounterGuard<size_t> counter_guard(flushes_in_progress_);
648+
auto queue_flusher = [this](QueueType& queue)->void
649+
{
650+
QueueType flush_queue; // flush from temporary queue
651+
{
652+
std::lock_guard<std::mutex> lock(mutex_);
653+
std::swap(queue, flush_queue);
654+
}
655+
while (!flush_queue.empty()) {
656+
sync_produce(flush_queue.front());
657+
flush_queue.pop_front();
658+
}
659+
};
660+
queue_flusher(retry_messages_);
661+
queue_flusher(messages_);
665662
}
666663
else {
667664
async_flush();
@@ -675,24 +672,25 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
675672
if (preserve_order) {
676673
CounterGuard<size_t> counter_guard(flushes_in_progress_);
677674
QueueType flush_queue; // flush from temporary queue
678-
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
675+
QueueType retry_flush_queue; // flush from temporary retry queue
679676
{
680677
std::lock_guard<std::mutex> lock(mutex_);
681-
std::swap(hi_pri_messages_, hi_pri_flush_queue);
678+
std::swap(retry_messages_, retry_flush_queue);
682679
std::swap(messages_, flush_queue);
683680
}
681+
auto queue_flusher = [this](QueueType& queue)->bool
682+
{
683+
if (!queue.empty()) {
684+
sync_produce(queue.front());
685+
queue.pop_front();
686+
return true;
687+
}
688+
return false;
689+
};
684690
auto remaining = timeout;
685691
auto start_time = std::chrono::high_resolution_clock::now();
686692
do {
687-
if (!hi_pri_flush_queue.empty()) {
688-
sync_produce(hi_pri_flush_queue.front());
689-
hi_pri_flush_queue.pop_front();
690-
}
691-
else if (!flush_queue.empty()) {
692-
sync_produce(flush_queue.front());
693-
flush_queue.pop_front();
694-
}
695-
else {
693+
if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) {
696694
break;
697695
}
698696
// calculate remaining time
@@ -701,19 +699,17 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
701699
} while (remaining.count() > 0);
702700

703701
// Re-enqueue remaining messages in original order
704-
if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) {
705-
std::lock_guard<std::mutex> lock(mutex_);
706-
if (!!hi_pri_flush_queue.empty()) {
707-
hi_pri_messages_.insert(hi_pri_messages_.begin(),
708-
std::make_move_iterator(hi_pri_flush_queue.begin()),
709-
std::make_move_iterator(hi_pri_flush_queue.end()));
710-
}
711-
if (!flush_queue.empty()) {
712-
messages_.insert(messages_.begin(),
713-
std::make_move_iterator(flush_queue.begin()),
714-
std::make_move_iterator(flush_queue.end()));
702+
auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue)->void
703+
{
704+
if (!src_queue.empty()) {
705+
std::lock_guard<std::mutex> lock(mutex_);
706+
dst_queue.insert(dst_queue.begin(),
707+
std::make_move_iterator(src_queue.begin()),
708+
std::make_move_iterator(src_queue.end()));
715709
}
716-
}
710+
};
711+
re_enqueuer(retry_flush_queue, retry_messages_);
712+
re_enqueuer(flush_queue, messages_);
717713
}
718714
else {
719715
async_flush();
@@ -769,13 +765,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
769765
std::lock_guard<std::mutex> lock(mutex_);
770766
QueueType tmp;
771767
std::swap(tmp, messages_);
772-
QueueType hi_pri_tmp;
773-
std::swap(hi_pri_tmp, hi_pri_messages_);
768+
QueueType retry_tmp;
769+
std::swap(retry_tmp, retry_messages_);
774770
}
775771

776772
template <typename BufferType, typename Allocator>
777773
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
778-
return messages_.size() + hi_pri_messages_.size();
774+
return messages_.size() + retry_messages_.size();
779775
}
780776

781777
template <typename BufferType, typename Allocator>
@@ -805,19 +801,21 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
805801
template <typename BufferType, typename Allocator>
806802
template <typename BuilderType>
807803
void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
808-
MessagePriority priority,
804+
bool is_retry,
809805
bool do_flush) {
810806
{
811807
std::lock_guard<std::mutex> lock(mutex_);
812-
if (priority == MessagePriority::High) {
813-
hi_pri_messages_.emplace_back(std::forward<BuilderType>(builder));
808+
if (is_retry) {
809+
retry_messages_.emplace_back(std::forward<BuilderType>(builder));
814810
}
815811
else {
816812
messages_.emplace_back(std::forward<BuilderType>(builder));
817813
}
818814
}
819815

820-
if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
816+
// Flush the queues only if a regular message is added. Retry messages may be added
817+
// from rdkafka callbacks, and flush/async_flush is a user-level call
818+
if (!is_retry && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
821819
if (flush_method_ == FlushMethod::Sync) {
822820
flush();
823821
}
@@ -965,7 +963,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
965963
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
966964
if (tracker && tracker->num_retries_ > 0) {
967965
--tracker->num_retries_;
968-
do_add_message(std::forward<BuilderType>(builder), MessagePriority::High, false);
966+
do_add_message(std::forward<BuilderType>(builder), true, false);
969967
return;
970968
}
971969
}
@@ -1004,7 +1002,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
10041002
--tracker->num_retries_;
10051003
if (tracker->sender_ == SenderType::Async) {
10061004
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
1007-
do_add_message(Builder(message), MessagePriority::High, false);
1005+
do_add_message(Builder(message), true, false);
10081006
}
10091007
should_retry = true;
10101008
}

0 commit comments

Comments
 (0)