From ed81ce446d282e5c85254c926edf9c6797ddb85e Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 13 Dec 2018 13:05:11 -0500 Subject: [PATCH 1/3] Added queue full notification --- include/cppkafka/utils/buffered_producer.h | 58 +++++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 4083e71e..b2c3522e 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -87,8 +87,15 @@ template >> class CPPKAFKA_API BufferedProducer { public: - enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker - Async }; ///< Empty the buffer and don't wait for acks + enum class FlushMethod { + Sync, ///< Empty the buffer and wait for acks from the broker. + Async ///< Empty the buffer and don't wait for acks. + }; + enum class QueueFullNotification { + None, ///< Don't notify + EdgeTriggered, ///< Notify once. Application must call queue_full_trigger_reset() to enable again. + EachOccurence ///< Notify on each occurence. + }; /** * Concrete builder */ @@ -358,6 +365,25 @@ class CPPKAFKA_API BufferedProducer { * Simple helper to construct a builder object */ Builder make_builder(std::string topic); + + /** + * Set the type of notification when RD_KAFKA_RESP_ERR__QUEUE_FULL is received. + * + * This will call the error callback for this producer. By default this is set to QueueFullNotification::None. + */ + void set_queue_full_notification(QueueFullNotification notification); + + /** + * Get the queue full notification type. + */ + QueueFullNotification get_queue_full_notification() const; + + /** + * Reset the queue full notification trigger. + * + * This function has no effect unless QueueFullNotification == EdgeTriggered. + */ + void queue_full_trigger_reset(); /** * \brief Sets the message produce failure callback @@ -505,6 +531,8 @@ class CPPKAFKA_API BufferedProducer { std::atomic total_messages_dropped_{0}; int max_number_retries_{0}; bool has_internal_data_{false}; + QueueFullNotification queue_full_notification_{QueueFullNotification::None}; + bool queue_full_trigger_{true}; #ifdef KAFKA_TEST_INSTANCE TestParameters* test_params_; #endif @@ -798,6 +826,22 @@ BufferedProducer::make_builder(std::string topic) { return Builder(std::move(topic)); } +template +void BufferedProducer::set_queue_full_notification(QueueFullNotification notification) { + queue_full_notification_ = notification; +} + +template +typename BufferedProducer::QueueFullNotification +BufferedProducer::get_queue_full_notification() const { + return queue_full_notification_; +} + +template +void BufferedProducer::queue_full_trigger_reset() { + queue_full_trigger_ = true; +} + template void BufferedProducer::set_produce_failure_callback(ProduceFailureCallback callback) { produce_failure_callback_ = std::move(callback); @@ -827,6 +871,9 @@ template template void BufferedProducer::produce_message(BuilderType&& builder) { using builder_type = typename std::decay::type; + bool queue_full_notify = (queue_full_notification_ == QueueFullNotification::None) ? false : + (queue_full_notification_ == QueueFullNotification::EdgeTriggered) ? + queue_full_trigger_ : true; while (true) { try { MessageInternalGuard internal_guard(const_cast(builder)); @@ -840,6 +887,13 @@ void BufferedProducer::produce_message(BuilderType&& buil if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { // If the output queue is full, then just poll producer_.poll(); + // Notify application so it can slow-down production + if (queue_full_notify) { + queue_full_notify = queue_full_trigger_ = false; //clear trigger and local state + CallbackInvoker + ("error", get_producer().get_configuration().get_error_callback(), &get_producer()) + (get_producer(), static_cast(ex.get_error().get_error()), ex.what()); + } } else { throw; From 97d1bb94349223ceaacd8dc379c473443245f092 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Sun, 6 Jan 2019 17:35:55 -0500 Subject: [PATCH 2/3] Added queue full notify callback --- include/cppkafka/utils/buffered_producer.h | 55 +++++++++++++--------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index b2c3522e..4a41e8e7 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -92,9 +92,9 @@ class CPPKAFKA_API BufferedProducer { Async ///< Empty the buffer and don't wait for acks. }; enum class QueueFullNotification { - None, ///< Don't notify - EdgeTriggered, ///< Notify once. Application must call queue_full_trigger_reset() to enable again. - EachOccurence ///< Notify on each occurence. + None, ///< Don't notify (default). + OncePerMessage, ///< Notify once per message. + EachOccurence ///< Notify on each occurence. }; /** * Concrete builder @@ -144,6 +144,14 @@ class CPPKAFKA_API BufferedProducer { * then FlushFailureCallback should not be set. */ using FlushTerminationCallback = std::function; + + /** + * Callback to indicate a queue full error was received when producing. + * + * The MessageBuilder instance represents the message which triggered the error. This callback will be called + * according to the set_queue_full_notification() setting. + */ + using QueueFullCallback = std::function; /** * \brief Constructs a buffered producer using the provided configuration @@ -377,13 +385,6 @@ class CPPKAFKA_API BufferedProducer { * Get the queue full notification type. */ QueueFullNotification get_queue_full_notification() const; - - /** - * Reset the queue full notification trigger. - * - * This function has no effect unless QueueFullNotification == EdgeTriggered. - */ - void queue_full_trigger_reset(); /** * \brief Sets the message produce failure callback @@ -449,6 +450,18 @@ class CPPKAFKA_API BufferedProducer { */ void set_flush_termination_callback(FlushTerminationCallback callback); + /** + * \brief Sets the local queue full error callback + * + * This callback will be called when local message production fails during a produce() operation according to the + * set_queue_full_notification() setting. + * + * \param callback + * + * \warning Do not call any method on the BufferedProducer while inside this callback + */ + void set_queue_full_callback(QueueFullCallback callback); + struct TestParameters { bool force_delivery_error_; bool force_produce_error_; @@ -523,6 +536,7 @@ class CPPKAFKA_API BufferedProducer { ProduceTerminationCallback produce_termination_callback_; FlushFailureCallback flush_failure_callback_; FlushTerminationCallback flush_termination_callback_; + QueueFullCallback queue_full_callback_; ssize_t max_buffer_size_{-1}; FlushMethod flush_method_{FlushMethod::Sync}; std::atomic pending_acks_{0}; @@ -532,7 +546,6 @@ class CPPKAFKA_API BufferedProducer { int max_number_retries_{0}; bool has_internal_data_{false}; QueueFullNotification queue_full_notification_{QueueFullNotification::None}; - bool queue_full_trigger_{true}; #ifdef KAFKA_TEST_INSTANCE TestParameters* test_params_; #endif @@ -837,11 +850,6 @@ BufferedProducer::get_queue_full_notification() const { return queue_full_notification_; } -template -void BufferedProducer::queue_full_trigger_reset() { - queue_full_trigger_ = true; -} - template void BufferedProducer::set_produce_failure_callback(ProduceFailureCallback callback) { produce_failure_callback_ = std::move(callback); @@ -867,13 +875,16 @@ void BufferedProducer::set_flush_termination_callback(Flu flush_termination_callback_ = std::move(callback); } +template +void BufferedProducer::set_queue_full_callback(QueueFullCallback callback) { + queue_full_callback_ = std::move(callback); +} + template template void BufferedProducer::produce_message(BuilderType&& builder) { using builder_type = typename std::decay::type; - bool queue_full_notify = (queue_full_notification_ == QueueFullNotification::None) ? false : - (queue_full_notification_ == QueueFullNotification::EdgeTriggered) ? - queue_full_trigger_ : true; + bool queue_full_notify = queue_full_notification_ != QueueFullNotification::None; while (true) { try { MessageInternalGuard internal_guard(const_cast(builder)); @@ -889,10 +900,8 @@ void BufferedProducer::produce_message(BuilderType&& buil producer_.poll(); // Notify application so it can slow-down production if (queue_full_notify) { - queue_full_notify = queue_full_trigger_ = false; //clear trigger and local state - CallbackInvoker - ("error", get_producer().get_configuration().get_error_callback(), &get_producer()) - (get_producer(), static_cast(ex.get_error().get_error()), ex.what()); + queue_full_notify = queue_full_notification_ == QueueFullNotification::EachOccurence; + CallbackInvoker("queue full", queue_full_callback_, &producer_)(builder); } } else { From 4a6b6779ad7c7aab686f110ba043027c75afcdd9 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Sun, 6 Jan 2019 17:40:39 -0500 Subject: [PATCH 3/3] Updated callback description --- include/cppkafka/utils/buffered_producer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 4a41e8e7..a241e7b6 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -146,7 +146,7 @@ class CPPKAFKA_API BufferedProducer { using FlushTerminationCallback = std::function; /** - * Callback to indicate a queue full error was received when producing. + * Callback to indicate a RD_KAFKA_RESP_ERR__QUEUE_FULL was received when producing. * * The MessageBuilder instance represents the message which triggered the error. This callback will be called * according to the set_queue_full_notification() setting.