Skip to content

Added purge (aka async_flush) functionality #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 12, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions include/cppkafka/utils/buffered_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ namespace cppkafka {
template <typename BufferType>
class CPPKAFKA_API BufferedProducer {
public:
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
Async }; ///< Empty the buffer and don't wait for acks
/**
* Concrete builder
*/
Expand Down Expand Up @@ -169,6 +171,13 @@ class CPPKAFKA_API BufferedProducer {
* \remark This method throws cppkafka::HandleException on failure
*/
void produce(const Message& message);

/**
* \brief Flushes all buffered messages and returns immediately.
*
* Similar to flush, it will send all messages but will not wait for acks to complete.
*/
void async_flush();

/**
* \brief Flushes the buffered messages.
Expand Down Expand Up @@ -220,6 +229,21 @@ class CPPKAFKA_API BufferedProducer {
*/
ssize_t get_max_buffer_size() const;

/**
* \brief Sets the method used to flush the internal buffer when 'max_buffer_size' is reached.
* Default is 'Sync'
*
* \param method The method
*/
void set_flush_method(FlushMethod method);

/**
* \brief Gets the method used to flush the internal buffer.
*
* \return The method
*/
FlushMethod get_flush_method() const;

/**
* \brief Get the number of messages not yet acked by the broker
*
Expand Down Expand Up @@ -391,6 +415,7 @@ class CPPKAFKA_API BufferedProducer {
ProduceFailureCallback produce_failure_callback_;
FlushFailureCallback flush_failure_callback_;
ssize_t max_buffer_size_{-1};
FlushMethod flush_method_{FlushMethod::Sync};
std::atomic<size_t> pending_acks_{0};
std::atomic<size_t> flushes_in_progress_{0};
std::atomic<size_t> total_messages_produced_{0};
Expand Down Expand Up @@ -471,7 +496,7 @@ void BufferedProducer<BufferType>::produce(const Message& message) {
}

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
void BufferedProducer<BufferType>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
{
Expand All @@ -482,6 +507,11 @@ void BufferedProducer<BufferType>::flush() {
async_produce(std::move(flush_queue.front()), false);
flush_queue.pop_front();
}
}

template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
async_flush();
wait_for_acks();
}

Expand Down Expand Up @@ -528,6 +558,17 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
return max_buffer_size_;
}

template <typename BufferType>
void BufferedProducer<BufferType>::set_flush_method(FlushMethod method) {
flush_method_ = method;
}

template <typename BufferType>
typename BufferedProducer<BufferType>::FlushMethod
BufferedProducer<BufferType>::get_flush_method() const {
return flush_method_;
}

template <typename BufferType>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
Expand All @@ -543,7 +584,12 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
}
}
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
flush();
if (flush_method_ == FlushMethod::Sync) {
flush();
}
else {
async_flush();
}
}
}

Expand Down