diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index cef799a3..70c1f5ec 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -86,6 +86,8 @@ namespace cppkafka { template class CPPKAFKA_API BufferedProducer { public: + enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker + Async }; ///< Empty the buffer and don't wait for acks /** * Concrete builder */ @@ -169,6 +171,13 @@ class CPPKAFKA_API BufferedProducer { * \remark This method throws cppkafka::HandleException on failure */ void produce(const Message& message); + + /** + * \brief Flushes all buffered messages and returns immediately. + * + * Similar to flush, it will send all messages but will not wait for acks to complete. + */ + void async_flush(); /** * \brief Flushes the buffered messages. @@ -220,6 +229,21 @@ class CPPKAFKA_API BufferedProducer { */ ssize_t get_max_buffer_size() const; + /** + * \brief Sets the method used to flush the internal buffer when 'max_buffer_size' is reached. + * Default is 'Sync' + * + * \param method The method + */ + void set_flush_method(FlushMethod method); + + /** + * \brief Gets the method used to flush the internal buffer. + * + * \return The method + */ + FlushMethod get_flush_method() const; + /** * \brief Get the number of messages not yet acked by the broker * @@ -391,6 +415,7 @@ class CPPKAFKA_API BufferedProducer { ProduceFailureCallback produce_failure_callback_; FlushFailureCallback flush_failure_callback_; ssize_t max_buffer_size_{-1}; + FlushMethod flush_method_{FlushMethod::Sync}; std::atomic pending_acks_{0}; std::atomic flushes_in_progress_{0}; std::atomic total_messages_produced_{0}; @@ -471,7 +496,7 @@ void BufferedProducer::produce(const Message& message) { } template -void BufferedProducer::flush() { +void BufferedProducer::async_flush() { CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue { @@ -482,6 +507,11 @@ void BufferedProducer::flush() { async_produce(std::move(flush_queue.front()), false); flush_queue.pop_front(); } +} + +template +void BufferedProducer::flush() { + async_flush(); wait_for_acks(); } @@ -528,6 +558,17 @@ ssize_t BufferedProducer::get_max_buffer_size() const { return max_buffer_size_; } +template +void BufferedProducer::set_flush_method(FlushMethod method) { + flush_method_ = method; +} + +template +typename BufferedProducer::FlushMethod +BufferedProducer::get_flush_method() const { + return flush_method_; +} + template template void BufferedProducer::do_add_message(BuilderType&& builder, @@ -543,7 +584,12 @@ void BufferedProducer::do_add_message(BuilderType&& builder, } } if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { - flush(); + if (flush_method_ == FlushMethod::Sync) { + flush(); + } + else { + async_flush(); + } } }