@@ -87,8 +87,15 @@ template <typename BufferType,
87
87
typename Allocator = std::allocator<ConcreteMessageBuilder<BufferType>>>
88
88
class CPPKAFKA_API BufferedProducer {
89
89
public:
90
- enum class FlushMethod { Sync, // /< Empty the buffer and wait for acks from the broker
91
- Async }; // /< Empty the buffer and don't wait for acks
90
+ enum class FlushMethod {
91
+ Sync, // /< Empty the buffer and wait for acks from the broker.
92
+ Async // /< Empty the buffer and don't wait for acks.
93
+ };
94
+ enum class QueueFullNotification {
95
+ None, // /< Don't notify
96
+ EdgeTriggered, // /< Notify once. Application must call queue_full_trigger_reset() to enable again.
97
+ EachOccurence // /< Notify on each occurence.
98
+ };
92
99
/* *
93
100
* Concrete builder
94
101
*/
@@ -333,6 +340,25 @@ class CPPKAFKA_API BufferedProducer {
333
340
* Simple helper to construct a builder object
334
341
*/
335
342
Builder make_builder (std::string topic);
343
+
344
+ /* *
345
+ * Set the type of notification when RD_KAFKA_RESP_ERR__QUEUE_FULL is received.
346
+ *
347
+ * This will call the error callback for this producer. By default this is set to QueueFullNotification::None.
348
+ */
349
+ void set_queue_full_notification (QueueFullNotification notification);
350
+
351
+ /* *
352
+ * Get the queue full notification type.
353
+ */
354
+ QueueFullNotification get_queue_full_notification () const ;
355
+
356
+ /* *
357
+ * Reset the queue full notification trigger.
358
+ *
359
+ * This function has no effect unless QueueFullNotification == EdgeTriggered.
360
+ */
361
+ void queue_full_trigger_reset ();
336
362
337
363
/* *
338
364
* \brief Sets the message produce failure callback
@@ -453,6 +479,8 @@ class CPPKAFKA_API BufferedProducer {
453
479
std::atomic<size_t > total_messages_dropped_{0 };
454
480
int max_number_retries_{0 };
455
481
bool has_internal_data_{false };
482
+ QueueFullNotification queue_full_notification_{QueueFullNotification::None};
483
+ bool queue_full_trigger_{true };
456
484
#ifdef KAFKA_TEST_INSTANCE
457
485
TestParameters* test_params_;
458
486
#endif
@@ -740,6 +768,22 @@ BufferedProducer<BufferType, Allocator>::make_builder(std::string topic) {
740
768
return Builder (std::move (topic));
741
769
}
742
770
771
+ template <typename BufferType, typename Allocator>
772
+ void BufferedProducer<BufferType, Allocator>::set_queue_full_notification(QueueFullNotification notification) {
773
+ queue_full_notification_ = notification;
774
+ }
775
+
776
+ template <typename BufferType, typename Allocator>
777
+ typename BufferedProducer<BufferType, Allocator>::QueueFullNotification
778
+ BufferedProducer<BufferType, Allocator>::get_queue_full_notification() const {
779
+ return queue_full_notification_;
780
+ }
781
+
782
+ template <typename BufferType, typename Allocator>
783
+ void BufferedProducer<BufferType, Allocator>::queue_full_trigger_reset() {
784
+ queue_full_trigger_ = true ;
785
+ }
786
+
743
787
template <typename BufferType, typename Allocator>
744
788
void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
745
789
produce_failure_callback_ = std::move (callback);
@@ -759,6 +803,9 @@ template <typename BufferType, typename Allocator>
759
803
template <typename BuilderType>
760
804
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
761
805
using builder_type = typename std::decay<BuilderType>::type;
806
+ bool queue_full_notify = (queue_full_notification_ == QueueFullNotification::None) ? false :
807
+ (queue_full_notification_ == QueueFullNotification::EdgeTriggered) ?
808
+ queue_full_trigger_ : true ;
762
809
while (true ) {
763
810
try {
764
811
MessageInternalGuard<builder_type> internal_guard (const_cast <builder_type&>(builder));
@@ -772,6 +819,13 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil
772
819
if (ex.get_error () == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
773
820
// If the output queue is full, then just poll
774
821
producer_.poll ();
822
+ // Notify application so it can slow-down production
823
+ if (queue_full_notify) {
824
+ queue_full_notify = queue_full_trigger_ = false ; // clear trigger and local state
825
+ CallbackInvoker<Configuration::ErrorCallback>
826
+ (" error" , get_producer ().get_configuration ().get_error_callback (), &get_producer ())
827
+ (get_producer (), static_cast <int >(ex.get_error ().get_error ()), ex.what ());
828
+ }
775
829
}
776
830
else {
777
831
throw ;
0 commit comments