Skip to content

Commit 157b7ec

Browse files
authored
Merge pull request #79 from accelerated/purge
Added purge (aka async_flush) functionality
2 parents 0c7a3b0 + f220062 commit 157b7ec

File tree

1 file changed

+48
-2
lines changed

1 file changed

+48
-2
lines changed

Diff for: include/cppkafka/utils/buffered_producer.h

+48-2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ namespace cppkafka {
8686
template <typename BufferType>
8787
class CPPKAFKA_API BufferedProducer {
8888
public:
89+
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
90+
Async }; ///< Empty the buffer and don't wait for acks
8991
/**
9092
* Concrete builder
9193
*/
@@ -169,6 +171,13 @@ class CPPKAFKA_API BufferedProducer {
169171
* \remark This method throws cppkafka::HandleException on failure
170172
*/
171173
void produce(const Message& message);
174+
175+
/**
176+
* \brief Flushes all buffered messages and returns immediately.
177+
*
178+
* Similar to flush, it will send all messages but will not wait for acks to complete.
179+
*/
180+
void async_flush();
172181

173182
/**
174183
* \brief Flushes the buffered messages.
@@ -220,6 +229,21 @@ class CPPKAFKA_API BufferedProducer {
220229
*/
221230
ssize_t get_max_buffer_size() const;
222231

232+
/**
233+
* \brief Sets the method used to flush the internal buffer when 'max_buffer_size' is reached.
234+
* Default is 'Sync'
235+
*
236+
* \param method The method
237+
*/
238+
void set_flush_method(FlushMethod method);
239+
240+
/**
241+
* \brief Gets the method used to flush the internal buffer.
242+
*
243+
* \return The method
244+
*/
245+
FlushMethod get_flush_method() const;
246+
223247
/**
224248
* \brief Get the number of messages not yet acked by the broker
225249
*
@@ -391,6 +415,7 @@ class CPPKAFKA_API BufferedProducer {
391415
ProduceFailureCallback produce_failure_callback_;
392416
FlushFailureCallback flush_failure_callback_;
393417
ssize_t max_buffer_size_{-1};
418+
FlushMethod flush_method_{FlushMethod::Sync};
394419
std::atomic<size_t> pending_acks_{0};
395420
std::atomic<size_t> flushes_in_progress_{0};
396421
std::atomic<size_t> total_messages_produced_{0};
@@ -471,7 +496,7 @@ void BufferedProducer<BufferType>::produce(const Message& message) {
471496
}
472497

473498
template <typename BufferType>
474-
void BufferedProducer<BufferType>::flush() {
499+
void BufferedProducer<BufferType>::async_flush() {
475500
CounterGuard<size_t> counter_guard(flushes_in_progress_);
476501
QueueType flush_queue; // flush from temporary queue
477502
{
@@ -482,6 +507,11 @@ void BufferedProducer<BufferType>::flush() {
482507
async_produce(std::move(flush_queue.front()), false);
483508
flush_queue.pop_front();
484509
}
510+
}
511+
512+
template <typename BufferType>
513+
void BufferedProducer<BufferType>::flush() {
514+
async_flush();
485515
wait_for_acks();
486516
}
487517

@@ -528,6 +558,17 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
528558
return max_buffer_size_;
529559
}
530560

561+
template <typename BufferType>
562+
void BufferedProducer<BufferType>::set_flush_method(FlushMethod method) {
563+
flush_method_ = method;
564+
}
565+
566+
template <typename BufferType>
567+
typename BufferedProducer<BufferType>::FlushMethod
568+
BufferedProducer<BufferType>::get_flush_method() const {
569+
return flush_method_;
570+
}
571+
531572
template <typename BufferType>
532573
template <typename BuilderType>
533574
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
@@ -543,7 +584,12 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
543584
}
544585
}
545586
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
546-
flush();
587+
if (flush_method_ == FlushMethod::Sync) {
588+
flush();
589+
}
590+
else {
591+
async_flush();
592+
}
547593
}
548594
}
549595

0 commit comments

Comments
 (0)