Skip to content

Commit ad9a1e4

Browse files
acceleratedmfontanini
authored andcommitted
If timeout is 0, the function should at least run once (#123)
1 parent 416a7d4 commit ad9a1e4

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

include/cppkafka/utils/buffered_producer.h

+9-7
Original file line numberDiff line numberDiff line change
@@ -564,21 +564,23 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
564564

565565
template <typename BufferType, typename Allocator>
566566
bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout,
567-
bool preserve_order) {
567+
bool preserve_order) {
568568
if (preserve_order) {
569569
CounterGuard<size_t> counter_guard(flushes_in_progress_);
570570
QueueType flush_queue; // flush from temporary queue
571571
{
572572
std::lock_guard<std::mutex> lock(mutex_);
573573
std::swap(messages_, flush_queue);
574574
}
575+
auto remaining = timeout;
575576
auto start_time = std::chrono::high_resolution_clock::now();
576-
while (!flush_queue.empty() &&
577-
(std::chrono::duration_cast<std::chrono::milliseconds>
578-
(std::chrono::high_resolution_clock::now() - start_time) < timeout)) {
577+
do {
579578
sync_produce(flush_queue.front());
580579
flush_queue.pop_front();
581-
}
580+
// calculate remaining time
581+
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
582+
(std::chrono::high_resolution_clock::now() - start_time);
583+
} while (!flush_queue.empty() && (remaining.count() > 0));
582584
}
583585
else {
584586
async_flush();
@@ -608,7 +610,7 @@ template <typename BufferType, typename Allocator>
608610
bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::milliseconds timeout) {
609611
auto remaining = timeout;
610612
auto start_time = std::chrono::high_resolution_clock::now();
611-
while ((pending_acks_ > 0) && (remaining.count() > 0)) {
613+
do {
612614
try {
613615
producer_.flush(remaining);
614616
}
@@ -625,7 +627,7 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
625627
// calculate remaining time
626628
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
627629
(std::chrono::high_resolution_clock::now() - start_time);
628-
}
630+
} while ((pending_acks_ > 0) && (remaining.count() > 0));
629631
return (pending_acks_ == 0);
630632
}
631633

0 commit comments

Comments
 (0)