Skip to content

Added queue full notification #149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 7, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions include/cppkafka/utils/buffered_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,15 @@ template <typename BufferType,
typename Allocator = std::allocator<ConcreteMessageBuilder<BufferType>>>
class CPPKAFKA_API BufferedProducer {
public:
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
Async }; ///< Empty the buffer and don't wait for acks
enum class FlushMethod {
Sync, ///< Empty the buffer and wait for acks from the broker.
Async ///< Empty the buffer and don't wait for acks.
};
enum class QueueFullNotification {
None, ///< Don't notify (default).
OncePerMessage, ///< Notify once per message.
EachOccurence ///< Notify on each occurence.
};
/**
* Concrete builder
*/
Expand Down Expand Up @@ -137,6 +144,14 @@ class CPPKAFKA_API BufferedProducer {
* then FlushFailureCallback should not be set.
*/
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;

/**
* Callback to indicate a RD_KAFKA_RESP_ERR__QUEUE_FULL was received when producing.
*
* The MessageBuilder instance represents the message which triggered the error. This callback will be called
* according to the set_queue_full_notification() setting.
*/
using QueueFullCallback = std::function<void(const MessageBuilder&)>;

/**
* \brief Constructs a buffered producer using the provided configuration
Expand Down Expand Up @@ -358,6 +373,18 @@ class CPPKAFKA_API BufferedProducer {
* Simple helper to construct a builder object
*/
Builder make_builder(std::string topic);

/**
* Set the type of notification when RD_KAFKA_RESP_ERR__QUEUE_FULL is received.
*
* This will call the error callback for this producer. By default this is set to QueueFullNotification::None.
*/
void set_queue_full_notification(QueueFullNotification notification);
Copy link
Owner

@mfontanini mfontanini Jan 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the setter for the notifiaction enum and the callback should be done in just one? You'll always call either both or none of them so I think that would make sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean the setter of the notification type? I think there might be cases when the app wants to set the notifocation to some enum and then it might want to turn it off (None) for a period and re-enable it later. In that case it would have to set the callback each time...prob not desirable.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, fair enough. Merging this one, thanks!


/**
* Get the queue full notification type.
*/
QueueFullNotification get_queue_full_notification() const;

/**
* \brief Sets the message produce failure callback
Expand Down Expand Up @@ -423,6 +450,18 @@ class CPPKAFKA_API BufferedProducer {
*/
void set_flush_termination_callback(FlushTerminationCallback callback);

/**
* \brief Sets the local queue full error callback
*
* This callback will be called when local message production fails during a produce() operation according to the
* set_queue_full_notification() setting.
*
* \param callback
*
* \warning Do not call any method on the BufferedProducer while inside this callback
*/
void set_queue_full_callback(QueueFullCallback callback);

struct TestParameters {
bool force_delivery_error_;
bool force_produce_error_;
Expand Down Expand Up @@ -497,6 +536,7 @@ class CPPKAFKA_API BufferedProducer {
ProduceTerminationCallback produce_termination_callback_;
FlushFailureCallback flush_failure_callback_;
FlushTerminationCallback flush_termination_callback_;
QueueFullCallback queue_full_callback_;
ssize_t max_buffer_size_{-1};
FlushMethod flush_method_{FlushMethod::Sync};
std::atomic<size_t> pending_acks_{0};
Expand All @@ -505,6 +545,7 @@ class CPPKAFKA_API BufferedProducer {
std::atomic<size_t> total_messages_dropped_{0};
int max_number_retries_{0};
bool has_internal_data_{false};
QueueFullNotification queue_full_notification_{QueueFullNotification::None};
#ifdef KAFKA_TEST_INSTANCE
TestParameters* test_params_;
#endif
Expand Down Expand Up @@ -798,6 +839,17 @@ BufferedProducer<BufferType, Allocator>::make_builder(std::string topic) {
return Builder(std::move(topic));
}

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_queue_full_notification(QueueFullNotification notification) {
queue_full_notification_ = notification;
}

template <typename BufferType, typename Allocator>
typename BufferedProducer<BufferType, Allocator>::QueueFullNotification
BufferedProducer<BufferType, Allocator>::get_queue_full_notification() const {
return queue_full_notification_;
}

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
produce_failure_callback_ = std::move(callback);
Expand All @@ -823,10 +875,16 @@ void BufferedProducer<BufferType, Allocator>::set_flush_termination_callback(Flu
flush_termination_callback_ = std::move(callback);
}

template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_queue_full_callback(QueueFullCallback callback) {
queue_full_callback_ = std::move(callback);
}

template <typename BufferType, typename Allocator>
template <typename BuilderType>
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
using builder_type = typename std::decay<BuilderType>::type;
bool queue_full_notify = queue_full_notification_ != QueueFullNotification::None;
while (true) {
try {
MessageInternalGuard<builder_type> internal_guard(const_cast<builder_type&>(builder));
Expand All @@ -840,6 +898,11 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil
if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
// If the output queue is full, then just poll
producer_.poll();
// Notify application so it can slow-down production
if (queue_full_notify) {
queue_full_notify = queue_full_notification_ == QueueFullNotification::EachOccurence;
CallbackInvoker<QueueFullCallback>("queue full", queue_full_callback_, &producer_)(builder);
}
}
else {
throw;
Expand Down