From a4d9b4c5084d6e5d61edf5456abbbf39dc5aa602 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 24 May 2018 12:16:55 -0400 Subject: [PATCH 1/5] Thread safe buffered producer --- CMakeLists.txt | 43 ++++++++-- README.md | 29 ++++--- examples/CMakeLists.txt | 37 ++++----- include/cppkafka/utils/buffered_producer.h | 86 +++++++++++++++---- tests/CMakeLists.txt | 4 +- tests/producer_test.cpp | 96 +++++++++++++++++++++- 6 files changed, 231 insertions(+), 64 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 30690fe4..0314b3d6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,8 +24,18 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib) # Build output checks +option(CPPKAFKA_CMAKE_VERBOSE "Generate verbose output." OFF) option(CPPKAFKA_BUILD_SHARED "Build cppkafka as a shared library." ON) option(CPPKAFKA_DISABLE_TESTS "Disable build of cppkafka tests." OFF) +option(CPPKAFKA_DISABLE_EXAMPLES "Disable build of cppkafka examples." OFF) +option(CPPKAFKA_BOOST_STATIC_LIBS "Link with Boost static libraries." ON) +option(CPPKAFKA_BOOST_USE_MULTITHREADED "Use Boost multithreaded libraries." ON) + +# Disable output from find_package macro +if (NOT CPPKAFKA_CMAKE_VERBOSE) + set(FIND_PACKAGE_QUIET QUIET) +endif() + if(CPPKAFKA_BUILD_SHARED) message(STATUS "Build will generate a shared library. " "Use CPPKAFKA_BUILD_SHARED=0 to perform a static build") @@ -37,16 +47,36 @@ else() endif() # Look for Boost (just need boost.optional headers here) -find_package(Boost REQUIRED) -find_package(RdKafka REQUIRED) +find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET}) +find_package(RdKafka REQUIRED ${FIND_PACKAGE_QUIET}) + +if (Boost_FOUND) + find_package(Boost COMPONENTS thread program_options ${FIND_PACKAGE_QUIET}) + set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS}) + set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED}) + include_directories(${Boost_INCLUDE_DIRS}) + link_directories(${Boost_LIBRARY_DIRS}) + if (CPPKAFKA_CMAKE_VERBOSE) + message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}") + message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}") + message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}") + message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}") + message(STATUS "Boost libraries: ${Boost_LIBRARIES}") + endif() +endif() add_subdirectory(src) add_subdirectory(include) -add_subdirectory(examples) +# Examples target +if (NOT CPPKAFKA_DISABLE_EXAMPLES AND Boost_PROGRAM_OPTIONS_FOUND) + add_subdirectory(examples) +else() + message(STATUS "Disabling examples") +endif() # Add a target to generate API documentation using Doxygen -find_package(Doxygen QUIET) +find_package(Doxygen ${FIND_PACKAGE_QUIET}) if(DOXYGEN_FOUND) configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/docs/Doxyfile.in @@ -61,16 +91,17 @@ if(DOXYGEN_FOUND) ) endif(DOXYGEN_FOUND) -if(NOT CPPKAFKA_DISABLE_TESTS) +if(NOT CPPKAFKA_DISABLE_TESTS AND Boost_THREAD_FOUND) set(CATCH_ROOT ${CMAKE_SOURCE_DIR}/third_party/Catch2) if(EXISTS ${CATCH_ROOT}/CMakeLists.txt) set(CATCH_INCLUDE ${CATCH_ROOT}/single_include) - enable_testing() add_subdirectory(tests) else() message(STATUS "Disabling tests because submodule Catch2 isn't checked out") endif() +else() + message(STATUS "Disabling tests") endif() if(NOT TARGET uninstall) diff --git a/README.md b/README.md index 5658b9bd..a2947a2b 100644 --- a/README.md +++ b/README.md @@ -55,25 +55,32 @@ In order to compile _cppkafka_ you need: * _CMake_ * A 
compiler with good C++11 support (e.g. gcc >= 4.8). This was tested successfully on _g++ 4.8.3_. -* The boost library. _cppkafka_ only requires boost.optional, which is a header only library, -so this doesn't add any additional runtime dependencies. +* The boost library. Now, in order to build, just run: ```Shell mkdir build cd build -cmake .. +cmake .. make ``` ## CMake options -If you have installed _librdkafka_ on a non standard directory, you can use the -`RDKAFKA_ROOT_DIR` cmake parameter when configuring the project: - +The following cmake options can be specified: +* `RDKAFKA_ROOT_DIR` : Specify a different librdkafka install directory. +* `BOOST_ROOT` : Specify a different Boost install directory. +* `CPPKAFKA_CMAKE_VERBOSE` : Generate verbose output. Default is `OFF`. +* `CPPKAFKA_BUILD_SHARED` : Build cppkafka as a shared library. Default is `ON`. +* `CPPKAFKA_DISABLE_TESTS` : Disable build of cppkafka tests. Default is `OFF`. +* `CPPKAFKA_DISABLE_EXAMPLES` : Disable build of cppkafka examples. Default is `OFF`. +* `CPPKAFKA_BOOST_STATIC_LIBS` : Link with Boost static libraries. Default is `ON`. +* `CPPKAFKA_BOOST_USE_MULTITHREADED` : Use Boost multi-threaded libraries. Default is `ON`. + +Example: ```Shell -cmake .. -DRDKAFKA_ROOT_DIR=/some/other/dir +cmake -DRDKAFKA_ROOT_DIR=/some/other/dir -DCPPKAFKA_BUILD_SHARED=OFF ... ``` Note that the `RDKAFKA_ROOT_DIR` must contain the following structure: @@ -86,13 +93,6 @@ ${RDKAFKA_ROOT_DIR}/ + lib/librdkafka.a ``` -By default, a shared library will be built. If you want to perform a static build, -use the `CPPKAFKA_BUILD_SHARED` parameter: - -```Shell -cmake .. -DCPPKAFKA_BUILD_SHARED=0 -``` - # Using If you want to use _cppkafka_, you'll need to link your application with: @@ -108,4 +108,3 @@ _Doxygen_ to be installed. The documentation will be written in html format at Make sure to check the [wiki](https://github.com/mfontanini/cppkafka/wiki) which includes some documentation about the project and some of its features. 
- diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5ca1ce97..9b73941c 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,22 +1,15 @@ -find_package(Boost COMPONENTS program_options) - -if (Boost_PROGRAM_OPTIONS_FOUND) - link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY}) - - include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) - include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) - - add_custom_target(examples) - macro(create_example example_name) - add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp") - add_dependencies(examples ${example_name}) - endmacro() - - create_example(kafka_producer) - create_example(kafka_consumer) - create_example(kafka_consumer_dispatcher) - create_example(metadata) - create_example(consumers_information) -else() - message(STATUS "Disabling examples since boost.program_options was not found") -endif() +link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) +include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR}) + +add_custom_target(examples) +macro(create_example example_name) + add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp") + add_dependencies(examples ${example_name}) +endmacro() + +create_example(kafka_producer) +create_example(kafka_consumer) +create_example(kafka_consumer_dispatcher) +create_example(metadata) +create_example(consumers_information) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index c6cb51f0..d7af44a0 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -37,6 +37,9 @@ #include #include #include +#include +#include +#include #include #include "../producer.h" #include "../message.h" @@ -159,11 +162,27 @@ class CPPKAFKA_API BufferedProducer { ssize_t get_max_buffer_size() const; /** - * \brief Get the number of messages in the buffer + * \brief Get the number of unsent messages in the buffer * * \return The number of messages */ size_t get_buffer_size() const; + + /** + * \brief Returns the total number of messages ack-ed by the broker + * + * \return The total number of messages since the beginning or since the last roll-over + * + * \remark Call get_rollover_count() to get the number of times the counter has rolled over + */ + size_t get_total_messages_acked() const; + + /** + * \brief Roll-over counter for get_total_messages_acked + * + * \return The number of rolls + */ + uint16_t get_rollover_count() const; /** * Gets the Producer object @@ -190,6 +209,7 @@ class CPPKAFKA_API BufferedProducer { * \param callback The callback to be set */ void set_produce_failure_callback(ProduceFailureCallback callback); + private: using QueueType = std::queue; @@ -204,10 +224,13 @@ class CPPKAFKA_API BufferedProducer { Configuration::DeliveryReportCallback delivery_report_callback_; Producer producer_; QueueType messages_; + mutable boost::mutex exclusive_access_; + mutable boost::shared_mutex shared_access_; ProduceFailureCallback produce_failure_callback_; - size_t expected_acks_{0}; - size_t messages_acked_{0}; ssize_t max_buffer_size_{-1}; + std::atomic_ulong expected_acks_{0}; + std::atomic_ullong total_messages_acked_{0}; + std::atomic_ushort rollover_counter_{0}; }; template @@ -230,7 +253,6 @@ void BufferedProducer::add_message(Builder builder) { template void BufferedProducer::produce(const MessageBuilder& builder) { produce_message(builder); - 
expected_acks_++; } template @@ -241,16 +263,21 @@ void BufferedProducer::produce(const Message& message) { template void BufferedProducer::flush() { - while (!messages_.empty()) { - produce_message(messages_.front()); - messages_.pop(); + { + boost::shared_lock grant(shared_access_); + size_t num_messages = messages_.size(); + while (num_messages--) { + produce_message(messages_.front()); + boost::lock_guard require(exclusive_access_); + messages_.pop(); + } } wait_for_acks(); } template void BufferedProducer::wait_for_acks() { - while (messages_acked_ < expected_acks_) { + while (expected_acks_ > 0) { try { producer_.flush(); } @@ -264,16 +291,13 @@ void BufferedProducer::wait_for_acks() { } } } - expected_acks_ = 0; - messages_acked_ = 0; } template void BufferedProducer::clear() { + boost::unique_lock restrict(shared_access_); QueueType tmp; std::swap(tmp, messages_); - expected_acks_ = 0; - messages_acked_ = 0; } template @@ -297,8 +321,11 @@ size_t BufferedProducer::get_buffer_size() const { template template void BufferedProducer::do_add_message(BuilderType&& builder) { - expected_acks_++; - messages_.push(std::forward(builder)); + { + boost::shared_lock grant(shared_access_); + boost::lock_guard require(exclusive_access_); + messages_.push(std::forward(builder)); + } if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { flush(); } @@ -314,6 +341,21 @@ const Producer& BufferedProducer::get_producer() const { return producer_; } +template +size_t BufferedProducer::get_buffer_size() const { + return messages_.size(); +} + +template +size_t BufferedProducer::get_total_messages_acked() const { + return total_messages_acked_; +} + +template +uint16_t BufferedProducer::get_rollover_count() const { + return rollover_counter_; +} + template typename BufferedProducer::Builder BufferedProducer::make_builder(std::string topic) { @@ -345,6 +387,8 @@ void BufferedProducer::produce_message(const MessageType& message) { } } } + // Sent successfully + ++expected_acks_; } template @@ -357,10 +401,15 @@ Configuration BufferedProducer::prepare_configuration(Configuration template void BufferedProducer::on_delivery_report(const Message& message) { + // Decrement the expected acks + --expected_acks_; + assert(expected_acks_ != (unsigned long)-1); // Prevent underflow + // Call the user-supplied delivery report callback if any if (delivery_report_callback_) { delivery_report_callback_(producer_, message); } + // We should produce this message again if it has an error and we either don't have a // produce failure callback or we have one but it returns true bool should_produce = message.get_error() && @@ -369,9 +418,12 @@ void BufferedProducer::on_delivery_report(const Message& message) { produce_message(message); return; } - // If production was successful or the produce failure callback returned false, then - // let's consider it to be acked - messages_acked_++; + + // Increment the total successful transmissions + ++total_messages_acked_; + if (total_messages_acked_ == 0) { + ++rollover_counter_; + } } } // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 317f7cd7..6833d1a5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,6 +1,6 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/) include_directories(SYSTEM ${CATCH_INCLUDE}) -include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) +include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR}) set(KAFKA_TEST_INSTANCE "kafka-vm:9092" CACHE STRING "The kafka instance to 
which to connect to run tests") @@ -8,7 +8,7 @@ add_custom_target(tests) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) add_library(cppkafka-test EXCLUDE_FROM_ALL test_utils.cpp) -target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} pthread) +target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread) add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index cda960dd..dde2a119 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -12,17 +12,19 @@ using std::string; using std::to_string; using std::set; +using std::vector; using std::tie; using std::move; using std::thread; +namespace this_thread = std::this_thread; using std::mutex; using std::unique_lock; using std::lock_guard; using std::condition_variable; - using std::chrono::system_clock; using std::chrono::seconds; using std::chrono::milliseconds; +using std::ref; using namespace cppkafka; @@ -48,6 +50,44 @@ static Configuration make_consumer_config() { return config; } +void producer_run(BufferedProducer& producer, + int& exit_flag, condition_variable& clear, + int num_messages, + int partition) { + MessageBuilder builder(KAFKA_TOPIC); + string key("wassup?"); + string payload("nothing much!"); + + builder.partition(partition).key(key).payload(payload); + for (int i = 0; i < num_messages; ++i) { + if (i == num_messages/2) { + clear.notify_one(); + } + producer.add_message(builder); + this_thread::sleep_for(milliseconds(10)); + } + exit_flag = 1; +} + +void flusher_run(BufferedProducer& producer, + int& exit_flag, + int num_flush) { + while (!exit_flag) { + if (producer.get_buffer_size() >= (size_t)num_flush) { + producer.flush(); + } + } + producer.flush(); +} + +void clear_run(BufferedProducer& producer, + condition_variable& clear) { + mutex m; + unique_lock lock(m); + clear.wait(lock); + producer.clear(); +} + TEST_CASE("simple production", "[producer]") { int partition = 0; @@ -234,7 +274,7 @@ TEST_CASE("multiple messages", "[producer]") { } } -TEST_CASE("buffered producer", "[producer]") { +TEST_CASE("buffered producer", "[producer][buffered_producer]") { int partition = 0; // Create a consumer and assign this topic/partition @@ -303,3 +343,55 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") { const auto& messages = runner.get_messages(); REQUIRE(messages.size() == producer.get_max_buffer_size()); } + +TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") { + int partition = 0; + vector threads; + int num_messages = 50; + int num_flush = 10; + int exit_flag = 0; + condition_variable clear; + + // Create a consumer and assign this topic/partition + Consumer consumer(make_consumer_config()); + consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + ConsumerRunner runner(consumer, num_messages, 1); + + BufferedProducer producer(make_producer_config()); + + threads.push_back(thread(producer_run, ref(producer), ref(exit_flag), ref(clear), num_messages, partition)); + threads.push_back(thread(flusher_run, ref(producer), ref(exit_flag), num_flush)); + + // Wait for completion + runner.try_join(); + for (auto&& thread : threads) { + thread.join(); + } + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == num_messages); + REQUIRE(producer.get_total_messages_acked() == num_messages); + REQUIRE(producer.get_rollover_count() == 0); + REQUIRE(producer.get_buffer_size() == 0); +} + +TEST_CASE("clear multi-threaded buffered 
producer", "[producer][buffered_producer]") { + int partition = 0; + vector threads; + int num_messages = 50; + int exit_flag = 0; + condition_variable clear; + + BufferedProducer producer(make_producer_config()); + + threads.push_back(thread(producer_run, ref(producer), ref(exit_flag), ref(clear), num_messages, partition)); + threads.push_back(thread(clear_run, ref(producer), ref(clear))); + + // Wait for completion + for (auto&& thread : threads) { + thread.join(); + } + + REQUIRE(producer.get_total_messages_acked() == 0); + REQUIRE(producer.get_rollover_count() == 0); + REQUIRE(producer.get_buffer_size() < num_messages); +} From 4c1d1078d3e2d5a41613a7c1bd26009d0f675284 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 24 May 2018 13:52:21 -0400 Subject: [PATCH 2/5] Using single mutex version --- CMakeLists.txt | 4 ++-- include/cppkafka/utils/buffered_producer.h | 27 ++++++++++------------ tests/CMakeLists.txt | 2 +- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0314b3d6..1ef0f337 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -51,7 +51,7 @@ find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET}) find_package(RdKafka REQUIRED ${FIND_PACKAGE_QUIET}) if (Boost_FOUND) - find_package(Boost COMPONENTS thread program_options ${FIND_PACKAGE_QUIET}) + find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET}) set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS}) set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED}) include_directories(${Boost_INCLUDE_DIRS}) @@ -91,7 +91,7 @@ if(DOXYGEN_FOUND) ) endif(DOXYGEN_FOUND) -if(NOT CPPKAFKA_DISABLE_TESTS AND Boost_THREAD_FOUND) +if(NOT CPPKAFKA_DISABLE_TESTS) set(CATCH_ROOT ${CMAKE_SOURCE_DIR}/third_party/Catch2) if(EXISTS ${CATCH_ROOT}/CMakeLists.txt) set(CATCH_INCLUDE ${CATCH_ROOT}/single_include) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index d7af44a0..2b9c6268 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -37,8 +37,7 @@ #include #include #include -#include -#include +#include #include #include #include "../producer.h" @@ -224,8 +223,7 @@ class CPPKAFKA_API BufferedProducer { Configuration::DeliveryReportCallback delivery_report_callback_; Producer producer_; QueueType messages_; - mutable boost::mutex exclusive_access_; - mutable boost::shared_mutex shared_access_; + mutable std::mutex mutex_; ProduceFailureCallback produce_failure_callback_; ssize_t max_buffer_size_{-1}; std::atomic_ulong expected_acks_{0}; @@ -263,14 +261,14 @@ void BufferedProducer::produce(const Message& message) { template void BufferedProducer::flush() { - { - boost::shared_lock grant(shared_access_); - size_t num_messages = messages_.size(); - while (num_messages--) { - produce_message(messages_.front()); - boost::lock_guard require(exclusive_access_); - messages_.pop(); + size_t num_messages = messages_.size(); + while (num_messages--) { + std::lock_guard lock(mutex_); + if (messages_.empty()) { + break; //perhaps clear() was called } + produce_message(messages_.front()); + messages_.pop(); } wait_for_acks(); } @@ -295,7 +293,7 @@ void BufferedProducer::wait_for_acks() { template void BufferedProducer::clear() { - boost::unique_lock restrict(shared_access_); + std::lock_guard lock(mutex_); QueueType tmp; std::swap(tmp, messages_); } @@ -322,9 +320,8 @@ template template void BufferedProducer::do_add_message(BuilderType&& builder) { { - boost::shared_lock 
grant(shared_access_); - boost::lock_guard require(exclusive_access_); - messages_.push(std::forward(builder)); + std::lock_guard lock(mutex_); + messages_.push(std::move(builder)); } if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { flush(); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6833d1a5..a423739c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,7 +8,7 @@ add_custom_target(tests) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) add_library(cppkafka-test EXCLUDE_FROM_ALL test_utils.cpp) -target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread) +target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} pthread) add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") From ef5ed27cf2b87b4683dd1795b5d662fd854a076b Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Fri, 25 May 2018 11:51:24 -0400 Subject: [PATCH 3/5] Changed based on feedback --- include/cppkafka/message_builder.h | 19 +++ include/cppkafka/producer.h | 1 + include/cppkafka/utils/buffered_producer.h | 141 +++++++++++++-------- tests/producer_test.cpp | 2 - 4 files changed, 105 insertions(+), 58 deletions(-) diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 9e918846..80896455 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -34,6 +34,7 @@ #include "buffer.h" #include "topic.h" #include "macros.h" +#include "message.h" namespace cppkafka { @@ -49,6 +50,14 @@ class BasicMessageBuilder { * \param topic The topic into which this message would be produced */ BasicMessageBuilder(std::string topic); + + /** + * Construct a BasicMessageBuilder from a Message object + * + * \remark The application must guarantee the lifetime of the Message exceeds that of this + * BasicMessageBuilder as this class does not take ownership of any Message buffers + */ + BasicMessageBuilder(const Message& message); /** * \brief Construct a message builder from another one that uses a different buffer type @@ -177,6 +186,16 @@ BasicMessageBuilder::BasicMessageBuilder(std::string topic) : topic_(std::move(topic)) { } +template +BasicMessageBuilder::BasicMessageBuilder(const Message& message) +: topic_(message.get_topic()), + key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), + payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), + user_data_(message.get_user_data()) +{ + +} + template template BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& rhs) diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 404fbe4f..358a0fc6 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -82,6 +82,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { * The policy to use for the payload. The default policy is COPY_PAYLOAD */ enum class PayloadPolicy { + PASSTHROUGH_PAYLOAD = 0, ///< Rdkafka will not copy nor free the payload. 
COPY_PAYLOAD = RD_KAFKA_MSG_F_COPY, ///< Means RD_KAFKA_MSG_F_COPY FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE }; diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 2b9c6268..04610848 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -31,7 +31,7 @@ #define CPPKAFKA_BUFFERED_PRODUCER_H #include -#include +#include #include #include #include @@ -55,10 +55,12 @@ namespace cppkafka { * produced messages (either in a buffer or non buffered way) are acknowledged by the kafka * brokers. * - * When producing messages, this class will handle cases where the producer's queue is full so it\ + * When producing messages, this class will handle cases where the producer's queue is full so it * will poll until the production is successful. * - * This class is not thread safe. + * \remark This class is thread safe + * + * \warning The application *MUST NOT* change the payload policy on the underlying Producer object. */ template class CPPKAFKA_API BufferedProducer { @@ -69,9 +71,14 @@ class CPPKAFKA_API BufferedProducer { using Builder = ConcreteMessageBuilder; /** - * Callback to indicate a message failed to be produced. + * Callback to indicate a message failed to be produced by the broker */ using ProduceFailureCallback = std::function; + + /** + * Callback to indicate a message failed to be flushed + */ + using FlushFailureCallback = std::function; /** * \brief Constructs a buffered producer using the provided configuration @@ -108,6 +115,8 @@ class CPPKAFKA_API BufferedProducer { * wait for it to be acknowledged. * * \param builder The builder that contains the message to be produced + * + * \remark This method throws cppkafka::HandleException on failure */ void produce(const MessageBuilder& builder); @@ -118,6 +127,8 @@ class CPPKAFKA_API BufferedProducer { * wait for it to be acknowledged. * * \param message The message to be produced + * + * \remark This method throws cppkafka::HandleException on failure */ void produce(const Message& message); @@ -168,20 +179,11 @@ class CPPKAFKA_API BufferedProducer { size_t get_buffer_size() const; /** - * \brief Returns the total number of messages ack-ed by the broker + * \brief Returns the total number of messages ack-ed by the broker since the beginning * - * \return The total number of messages since the beginning or since the last roll-over - * - * \remark Call get_rollover_count() to get the number of times the counter has rolled over + * \return The number of messages */ size_t get_total_messages_acked() const; - - /** - * \brief Roll-over counter for get_total_messages_acked - * - * \return The number of rolls - */ - uint16_t get_rollover_count() const; /** * Gets the Producer object @@ -206,46 +208,67 @@ class CPPKAFKA_API BufferedProducer { * false. Note that if the callback return false, then the message will be discarded. * * \param callback The callback to be set + * + * \remark It is *highly* recommended to set this callback as your message may be produced + * indefinitely if there's a remote error. + * + * \warning Do not call any method on the BufferedProducer while inside this callback. */ void set_produce_failure_callback(ProduceFailureCallback callback); + /** + * \brief Sets the local message produce failure callback + * + * This callback will be called when local message production fails during a flush() operation. + * Failure errors are typically payload too large, unknown topic or unknown partition. 
+ * Note that if the callback returns false, the message will be dropped from the buffer, + * otherwise it will be re-enqueued for later retry. + * + * \param callback + * + * \warning Do not call any method on the BufferedProducer while inside this callback + */ + void set_flush_failure_callback(FlushFailureCallback callback); + private: - using QueueType = std::queue; + using QueueType = std::list; + enum class MessagePriority { Low, High }; template - void do_add_message(BuilderType&& builder); + void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); template void produce_message(const MessageType& message); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); - + // Members Configuration::DeliveryReportCallback delivery_report_callback_; Producer producer_; QueueType messages_; mutable std::mutex mutex_; ProduceFailureCallback produce_failure_callback_; + FlushFailureCallback flush_failure_callback_; ssize_t max_buffer_size_{-1}; std::atomic_ulong expected_acks_{0}; std::atomic_ullong total_messages_acked_{0}; - std::atomic_ushort rollover_counter_{0}; }; template BufferedProducer::BufferedProducer(Configuration config) : delivery_report_callback_(config.get_delivery_report_callback()), producer_(prepare_configuration(std::move(config))) { - + // Allow re-queuing failed messages + producer_.set_payload_policy(Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD); } template void BufferedProducer::add_message(const MessageBuilder& builder) { - do_add_message(builder); + do_add_message(builder, MessagePriority::Low, true); } template void BufferedProducer::add_message(Builder builder) { - do_add_message(move(builder)); + do_add_message(move(builder), MessagePriority::Low, true); } template @@ -256,19 +279,27 @@ void BufferedProducer::produce(const MessageBuilder& builder) { template void BufferedProducer::produce(const Message& message) { produce_message(message); - expected_acks_++; } template void BufferedProducer::flush() { - size_t num_messages = messages_.size(); - while (num_messages--) { + QueueType flush_queue; // flush from temporary queue + { std::lock_guard lock(mutex_); - if (messages_.empty()) { - break; //perhaps clear() was called + std::swap(messages_, flush_queue); + } + while (!flush_queue.empty()) { + try { + produce_message(flush_queue.front()); } - produce_message(messages_.front()); - messages_.pop(); + catch (const HandleException& ex) { + if (flush_failure_callback_ && + flush_failure_callback_(flush_queue.front(), ex.get_error())) { + // retry again later + do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false); + } + } + flush_queue.pop_front(); } wait_for_acks(); } @@ -318,12 +349,19 @@ size_t BufferedProducer::get_buffer_size() const { template template -void BufferedProducer::do_add_message(BuilderType&& builder) { +void BufferedProducer::do_add_message(BuilderType&& builder, + MessagePriority priority, + bool do_flush) { { std::lock_guard lock(mutex_); - messages_.push(std::move(builder)); + if (priority == MessagePriority::High) { + messages_.emplace_front(std::move(builder)); + } + else { + messages_.emplace_back(std::move(builder)); + } } - if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { + if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { flush(); } } @@ -338,21 +376,11 @@ const Producer& BufferedProducer::get_producer() const { return producer_; } -template -size_t 
BufferedProducer::get_buffer_size() const { - return messages_.size(); -} - template size_t BufferedProducer::get_total_messages_acked() const { return total_messages_acked_; } -template -uint16_t BufferedProducer::get_rollover_count() const { - return rollover_counter_; -} - template typename BufferedProducer::Builder BufferedProducer::make_builder(std::string topic) { @@ -364,18 +392,23 @@ void BufferedProducer::set_produce_failure_callback(ProduceFailureCa produce_failure_callback_ = std::move(callback); } +template +void BufferedProducer::set_flush_failure_callback(FlushFailureCallback callback) { + flush_failure_callback_ = std::move(callback); +} + template template void BufferedProducer::produce_message(const MessageType& message) { - bool sent = false; - while (!sent) { + while (true) { try { producer_.produce(message); - sent = true; + // Sent successfully + ++expected_acks_; + break; } catch (const HandleException& ex) { - const Error error = ex.get_error(); - if (error == RD_KAFKA_RESP_ERR__QUEUE_FULL) { + if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { // If the output queue is full, then just poll producer_.poll(); } @@ -384,8 +417,6 @@ void BufferedProducer::produce_message(const MessageType& message) { } } } - // Sent successfully - ++expected_acks_; } template @@ -412,14 +443,12 @@ void BufferedProducer::on_delivery_report(const Message& message) { bool should_produce = message.get_error() && (!produce_failure_callback_ || produce_failure_callback_(message)); if (should_produce) { - produce_message(message); - return; + // Re-enqueue for later retransmission with higher priority (i.e. front of the queue) + do_add_message(Builder(message), MessagePriority::High, false); } - - // Increment the total successful transmissions - ++total_messages_acked_; - if (total_messages_acked_ == 0) { - ++rollover_counter_; + else { + // Increment the total successful transmissions + ++total_messages_acked_; } } diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index dde2a119..73c744a3 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -370,7 +370,6 @@ TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") { const auto& messages = runner.get_messages(); REQUIRE(messages.size() == num_messages); REQUIRE(producer.get_total_messages_acked() == num_messages); - REQUIRE(producer.get_rollover_count() == 0); REQUIRE(producer.get_buffer_size() == 0); } @@ -392,6 +391,5 @@ TEST_CASE("clear multi-threaded buffered producer", "[producer][buffered_produce } REQUIRE(producer.get_total_messages_acked() == 0); - REQUIRE(producer.get_rollover_count() == 0); REQUIRE(producer.get_buffer_size() < num_messages); } From 9e92f22248882242f7ea78ce7a6a4503c7254648 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Sat, 26 May 2018 10:51:35 -0400 Subject: [PATCH 4/5] Changes based on latest review --- include/cppkafka/message_builder.h | 5 +- include/cppkafka/utils/buffered_producer.h | 69 +++++++++++++++++----- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 80896455..59b33657 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -53,9 +53,6 @@ class BasicMessageBuilder { /** * Construct a BasicMessageBuilder from a Message object - * - * \remark The application must guarantee the lifetime of the Message exceeds that of this - * BasicMessageBuilder as this class does not take ownership of any Message buffers */ 
BasicMessageBuilder(const Message& message); @@ -191,6 +188,8 @@ BasicMessageBuilder::BasicMessageBuilder(const Message& message) : topic_(message.get_topic()), key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), + timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : + std::chrono::milliseconds(0)), user_data_(message.get_user_data()) { diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 04610848..83e2870d 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -31,7 +31,7 @@ #define CPPKAFKA_BUFFERED_PRODUCER_H #include -#include +#include #include #include #include @@ -52,15 +52,24 @@ namespace cppkafka { * to produce them just as you would using the Producer class. * * When calling either flush or wait_for_acks, the buffered producer will block until all - * produced messages (either in a buffer or non buffered way) are acknowledged by the kafka - * brokers. + * produced messages (either buffered or sent directly) are acknowledged by the kafka brokers. * * When producing messages, this class will handle cases where the producer's queue is full so it * will poll until the production is successful. * * \remark This class is thread safe * - * \warning The application *MUST NOT* change the payload policy on the underlying Producer object. + * \warning + * Delivery Report Callback: This class makes internal use of this function and will overwrite anything + * the user has supplied as part of the configuration options. Instead user should call + * set_produce_success_callback() and set_produce_failure_callback() respectively. + * + * Payload Policy: For payload-owning BufferTypes such as std::string or std::vector the default + * policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type + * cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka + * shall not make any internal copies of the message and it is the application's responsability to free + * the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory + * corruptions. */ template class CPPKAFKA_API BufferedProducer { @@ -69,6 +78,11 @@ class CPPKAFKA_API BufferedProducer { * Concrete builder */ using Builder = ConcreteMessageBuilder; + + /** + * Callback to indicate a message was delivered to the broker + */ + using ProduceSuccessCallback = std::function; /** * Callback to indicate a message failed to be produced by the broker @@ -137,6 +151,10 @@ class CPPKAFKA_API BufferedProducer { * * This will send all messages and keep waiting until all of them are acknowledged (this is * done by calling wait_for_acks). + * + * \remark Although it is possible to call flush from multiple threads concurrently, better + * performance is achieved when called from the same thread or when serialized + * with respect to other threads. */ void flush(); @@ -216,6 +234,15 @@ class CPPKAFKA_API BufferedProducer { */ void set_produce_failure_callback(ProduceFailureCallback callback); + /** + * \brief Sets the successful delivery callback + * + * The user can use this function to cleanup any application-owned message buffers. 
+ * + * \param callback The callback to be set + */ + void set_produce_success_callback(ProduceSuccessCallback callback); + /** * \brief Sets the local message produce failure callback * @@ -231,7 +258,7 @@ class CPPKAFKA_API BufferedProducer { void set_flush_failure_callback(FlushFailureCallback callback); private: - using QueueType = std::list; + using QueueType = std::deque; enum class MessagePriority { Low, High }; template @@ -242,10 +269,10 @@ class CPPKAFKA_API BufferedProducer { void on_delivery_report(const Message& message); // Members - Configuration::DeliveryReportCallback delivery_report_callback_; Producer producer_; QueueType messages_; mutable std::mutex mutex_; + ProduceSuccessCallback produce_success_callback_; ProduceFailureCallback produce_failure_callback_; FlushFailureCallback flush_failure_callback_; ssize_t max_buffer_size_{-1}; @@ -253,12 +280,20 @@ class CPPKAFKA_API BufferedProducer { std::atomic_ullong total_messages_acked_{0}; }; +template +Producer::PayloadPolicy get_default_payload_policy() { + return Producer::PayloadPolicy::COPY_PAYLOAD; +} + +template <> inline +Producer::PayloadPolicy get_default_payload_policy() { + return Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD; +} + template BufferedProducer::BufferedProducer(Configuration config) -: delivery_report_callback_(config.get_delivery_report_callback()), - producer_(prepare_configuration(std::move(config))) { - // Allow re-queuing failed messages - producer_.set_payload_policy(Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD); +: producer_(prepare_configuration(std::move(config))) { + producer_.set_payload_policy(get_default_payload_policy()); } template @@ -392,6 +427,11 @@ void BufferedProducer::set_produce_failure_callback(ProduceFailureCa produce_failure_callback_ = std::move(callback); } +template +void BufferedProducer::set_produce_success_callback(ProduceSuccessCallback callback) { + produce_success_callback_ = std::move(callback); +} + template void BufferedProducer::set_flush_failure_callback(FlushFailureCallback callback) { flush_failure_callback_ = std::move(callback); @@ -433,11 +473,6 @@ void BufferedProducer::on_delivery_report(const Message& message) { --expected_acks_; assert(expected_acks_ != (unsigned long)-1); // Prevent underflow - // Call the user-supplied delivery report callback if any - if (delivery_report_callback_) { - delivery_report_callback_(producer_, message); - } - // We should produce this message again if it has an error and we either don't have a // produce failure callback or we have one but it returns true bool should_produce = message.get_error() && @@ -447,6 +482,10 @@ void BufferedProducer::on_delivery_report(const Message& message) { do_add_message(Builder(message), MessagePriority::High, false); } else { + // Successful delivery + if (produce_success_callback_) { + produce_success_callback_(message); + } // Increment the total successful transmissions ++total_messages_acked_; } From 6e0247f20b2fd357cc34a25cec45fb526e01a3f6 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Sun, 27 May 2018 11:20:36 -0400 Subject: [PATCH 5/5] Added flush counter --- include/cppkafka/utils/buffered_producer.h | 82 +++++++++++++++++----- tests/producer_test.cpp | 8 ++- 2 files changed, 70 insertions(+), 20 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 83e2870d..bfe9c630 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -57,15 +57,25 @@ namespace cppkafka 
{ * When producing messages, this class will handle cases where the producer's queue is full so it * will poll until the production is successful. * - * \remark This class is thread safe + * \remark This class is thread safe. * - * \warning - * Delivery Report Callback: This class makes internal use of this function and will overwrite anything - * the user has supplied as part of the configuration options. Instead user should call - * set_produce_success_callback() and set_produce_failure_callback() respectively. + * \remark Releasing buffers: For high-performance applications preferring a zero-copy approach + * (using PayloadPolicy::PASSTHROUGH_PAYLOAD - see warning below) it is very important to know when + * to safely release owned message buffers. One way is to perform individual cleanup when + * ProduceSuccessCallback is called. If the application produces messages in batches or has a + * bursty behavior another way is to check when flush operations have fully completed with + * get_buffer_size()==0 && get_flushes_in_progress()==0. Note that get_pending_acks()==0 + * is not always a guarantee as there is very small window when flush() starts where + * get_buffer_size()==0 && get_pending_acks()==0 but messages have not yet been sent to the + * remote broker. For applications producing messages w/o buffering, get_pending_acks()==0 + * is sufficient. * - * Payload Policy: For payload-owning BufferTypes such as std::string or std::vector the default - * policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type + * \warning Delivery Report Callback: This class makes internal use of this function and will + * overwrite anything the user has supplied as part of the configuration options. Instead user + * should call set_produce_success_callback() and set_produce_failure_callback() respectively. + * + * \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector + * the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type * cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka * shall not make any internal copies of the message and it is the application's responsability to free * the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory @@ -197,11 +207,28 @@ class CPPKAFKA_API BufferedProducer { size_t get_buffer_size() const; /** - * \brief Returns the total number of messages ack-ed by the broker since the beginning + * \brief Get the number of messages not yet acked by the broker * * \return The number of messages */ - size_t get_total_messages_acked() const; + size_t get_pending_acks() const; + + /** + * \brief Get the total number of messages successfully produced since the beginning + * + * \return The number of messages + */ + size_t get_total_messages_produced() const; + + /** + * \brief Get the total outstanding flush operations in progress + * + * Since flush can be called from multiple threads concurrently, this counter indicates + * how many operations are curretnly in progress. + * + * \return The number of outstanding flush operations. 
+ */ + size_t get_flushes_in_progress() const; /** * Gets the Producer object @@ -260,6 +287,13 @@ class CPPKAFKA_API BufferedProducer { private: using QueueType = std::deque; enum class MessagePriority { Low, High }; + + template + struct CounterGuard{ + CounterGuard(std::atomic& counter) : counter_(counter) { ++counter_; } + ~CounterGuard() { --counter_; } + std::atomic& counter_; + }; template void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); @@ -276,8 +310,9 @@ class CPPKAFKA_API BufferedProducer { ProduceFailureCallback produce_failure_callback_; FlushFailureCallback flush_failure_callback_; ssize_t max_buffer_size_{-1}; - std::atomic_ulong expected_acks_{0}; - std::atomic_ullong total_messages_acked_{0}; + std::atomic pending_acks_{0}; + std::atomic flushes_in_progress_{0}; + std::atomic total_messages_produced_{0}; }; template @@ -318,6 +353,7 @@ void BufferedProducer::produce(const Message& message) { template void BufferedProducer::flush() { + CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue { std::lock_guard lock(mutex_); @@ -341,7 +377,7 @@ void BufferedProducer::flush() { template void BufferedProducer::wait_for_acks() { - while (expected_acks_ > 0) { + while (pending_acks_ > 0) { try { producer_.flush(); } @@ -412,8 +448,18 @@ const Producer& BufferedProducer::get_producer() const { } template -size_t BufferedProducer::get_total_messages_acked() const { - return total_messages_acked_; +size_t BufferedProducer::get_pending_acks() const { + return pending_acks_; +} + +template +size_t BufferedProducer::get_total_messages_produced() const { + return total_messages_produced_; +} + +template +size_t BufferedProducer::get_flushes_in_progress() const { + return flushes_in_progress_; } template @@ -444,7 +490,7 @@ void BufferedProducer::produce_message(const MessageType& message) { try { producer_.produce(message); // Sent successfully - ++expected_acks_; + ++pending_acks_; break; } catch (const HandleException& ex) { @@ -470,8 +516,8 @@ Configuration BufferedProducer::prepare_configuration(Configuration template void BufferedProducer::on_delivery_report(const Message& message) { // Decrement the expected acks - --expected_acks_; - assert(expected_acks_ != (unsigned long)-1); // Prevent underflow + --pending_acks_; + assert(pending_acks_ != (size_t)-1); // Prevent underflow // We should produce this message again if it has an error and we either don't have a // produce failure callback or we have one but it returns true @@ -487,7 +533,7 @@ void BufferedProducer::on_delivery_report(const Message& message) { produce_success_callback_(message); } // Increment the total successful transmissions - ++total_messages_acked_; + ++total_messages_produced_; } } diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 73c744a3..c388bab1 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -369,7 +369,9 @@ TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") { } const auto& messages = runner.get_messages(); REQUIRE(messages.size() == num_messages); - REQUIRE(producer.get_total_messages_acked() == num_messages); + REQUIRE(producer.get_flushes_in_progress() == 0); + REQUIRE(producer.get_pending_acks() == 0); + REQUIRE(producer.get_total_messages_produced() == num_messages); REQUIRE(producer.get_buffer_size() == 0); } @@ -390,6 +392,8 @@ TEST_CASE("clear multi-threaded buffered producer", "[producer][buffered_produce thread.join(); } - 
REQUIRE(producer.get_total_messages_acked() == 0); + REQUIRE(producer.get_total_messages_produced() == 0); + REQUIRE(producer.get_flushes_in_progress() == 0); + REQUIRE(producer.get_pending_acks() == 0); REQUIRE(producer.get_buffer_size() < num_messages); }
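
For reference, the thread-safe usage pattern exercised by the new tests can be sketched as a small standalone program. This is a minimal sketch and not part of the patch: the broker address, topic name, message count and flush threshold are placeholder values, and the callback signatures are written as their use in this series suggests.

```cpp
#include <cppkafka/configuration.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/utils/buffered_producer.h>
#include <atomic>
#include <iostream>
#include <string>
#include <thread>

using namespace cppkafka;

int main() {
    // Placeholder broker address for this sketch
    Configuration config = {
        { "metadata.broker.list", "localhost:9092" }
    };

    // Thread-safe buffered producer introduced by this series
    BufferedProducer<std::string> producer(config);

    // Called once per message acknowledged by the broker
    producer.set_produce_success_callback([](const Message& msg) {
        std::cout << "delivered to partition " << msg.get_partition() << "\n";
    });
    // Returning false drops the failed message instead of re-queuing it
    producer.set_produce_failure_callback([](const Message& msg) {
        std::cerr << "delivery failed: " << msg.get_error() << "\n";
        return false;
    });

    std::atomic<bool> done{false};

    // One thread buffers messages (add_message only takes the internal queue mutex)
    std::thread feeder([&]() {
        MessageBuilder builder("test"); // placeholder topic name
        std::string payload = "hello";
        builder.partition(0).payload(payload);
        for (int i = 0; i < 100; ++i) {
            producer.add_message(builder);
        }
        done = true;
    });

    // Meanwhile this thread flushes, mirroring flusher_run in the new test:
    // flush() drains the buffer and blocks until all pending acks arrive
    while (!done) {
        if (producer.get_buffer_size() >= 10) {
            producer.flush();
        }
    }
    feeder.join();
    producer.flush(); // drain anything buffered after the feeder finished

    std::cout << "acked: " << producer.get_total_messages_produced()
              << ", pending: " << producer.get_pending_acks() << "\n";
    return 0;
}
```

With a payload-owning BufferType such as std::string (as above) the default payload policy is COPY_PAYLOAD, so no buffer lifetime management is needed. With cppkafka::Buffer the policy defaults to PASSTHROUGH_PAYLOAD and application-owned buffers must stay alive until the produce-success callback (or get_buffer_size()==0 && get_flushes_in_progress()==0, per the remark above) confirms delivery.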