diff --git a/CMakeLists.txt b/CMakeLists.txt index 30690fe4..1ef0f337 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,8 +24,18 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib) # Build output checks +option(CPPKAFKA_CMAKE_VERBOSE "Generate verbose output." OFF) option(CPPKAFKA_BUILD_SHARED "Build cppkafka as a shared library." ON) option(CPPKAFKA_DISABLE_TESTS "Disable build of cppkafka tests." OFF) +option(CPPKAFKA_DISABLE_EXAMPLES "Disable build of cppkafka examples." OFF) +option(CPPKAFKA_BOOST_STATIC_LIBS "Link with Boost static libraries." ON) +option(CPPKAFKA_BOOST_USE_MULTITHREADED "Use Boost multithreaded libraries." ON) + +# Disable output from find_package macro +if (NOT CPPKAFKA_CMAKE_VERBOSE) + set(FIND_PACKAGE_QUIET QUIET) +endif() + if(CPPKAFKA_BUILD_SHARED) message(STATUS "Build will generate a shared library. " "Use CPPKAFKA_BUILD_SHARED=0 to perform a static build") @@ -37,16 +47,36 @@ else() endif() # Look for Boost (just need boost.optional headers here) -find_package(Boost REQUIRED) -find_package(RdKafka REQUIRED) +find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET}) +find_package(RdKafka REQUIRED ${FIND_PACKAGE_QUIET}) + +if (Boost_FOUND) + find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET}) + set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS}) + set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED}) + include_directories(${Boost_INCLUDE_DIRS}) + link_directories(${Boost_LIBRARY_DIRS}) + if (CPPKAFKA_CMAKE_VERBOSE) + message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}") + message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}") + message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}") + message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}") + message(STATUS "Boost libraries: ${Boost_LIBRARIES}") + endif() +endif() add_subdirectory(src) add_subdirectory(include) -add_subdirectory(examples) +# Examples target +if (NOT CPPKAFKA_DISABLE_EXAMPLES AND Boost_PROGRAM_OPTIONS_FOUND) + add_subdirectory(examples) +else() + message(STATUS "Disabling examples") +endif() # Add a target to generate API documentation using Doxygen -find_package(Doxygen QUIET) +find_package(Doxygen ${FIND_PACKAGE_QUIET}) if(DOXYGEN_FOUND) configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/docs/Doxyfile.in @@ -65,12 +95,13 @@ if(NOT CPPKAFKA_DISABLE_TESTS) set(CATCH_ROOT ${CMAKE_SOURCE_DIR}/third_party/Catch2) if(EXISTS ${CATCH_ROOT}/CMakeLists.txt) set(CATCH_INCLUDE ${CATCH_ROOT}/single_include) - enable_testing() add_subdirectory(tests) else() message(STATUS "Disabling tests because submodule Catch2 isn't checked out") endif() +else() + message(STATUS "Disabling tests") endif() if(NOT TARGET uninstall) diff --git a/README.md b/README.md index 5658b9bd..a2947a2b 100644 --- a/README.md +++ b/README.md @@ -55,25 +55,32 @@ In order to compile _cppkafka_ you need: * _CMake_ * A compiler with good C++11 support (e.g. gcc >= 4.8). This was tested successfully on _g++ 4.8.3_. -* The boost library. _cppkafka_ only requires boost.optional, which is a header only library, -so this doesn't add any additional runtime dependencies. +* The boost library. Now, in order to build, just run: ```Shell mkdir build cd build -cmake .. +cmake .. make ``` ## CMake options -If you have installed _librdkafka_ on a non standard directory, you can use the -`RDKAFKA_ROOT_DIR` cmake parameter when configuring the project: - +The following cmake options can be specified: +* `RDKAFKA_ROOT_DIR` : Specify a different librdkafka install directory. +* `BOOST_ROOT` : Specify a different Boost install directory. +* `CPPKAFKA_CMAKE_VERBOSE` : Generate verbose output. Default is `OFF`. +* `CPPKAFKA_BUILD_SHARED` : Build cppkafka as a shared library. Default is `ON`. +* `CPPKAFKA_DISABLE_TESTS` : Disable build of cppkafka tests. Default is `OFF`. +* `CPPKAFKA_DISABLE_EXAMPLES` : Disable build of cppkafka examples. Default is `OFF`. +* `CPPKAFKA_BOOST_STATIC_LIBS` : Link with Boost static libraries. Default is `ON`. +* `CPPKAFKA_BOOST_USE_MULTITHREADED` : Use Boost multi-threaded libraries. Default is `ON`. + +Example: ```Shell -cmake .. -DRDKAFKA_ROOT_DIR=/some/other/dir +cmake -DRDKAFKA_ROOT_DIR=/some/other/dir -DCPPKAFKA_BUILD_SHARED=OFF ... ``` Note that the `RDKAFKA_ROOT_DIR` must contain the following structure: @@ -86,13 +93,6 @@ ${RDKAFKA_ROOT_DIR}/ + lib/librdkafka.a ``` -By default, a shared library will be built. If you want to perform a static build, -use the `CPPKAFKA_BUILD_SHARED` parameter: - -```Shell -cmake .. -DCPPKAFKA_BUILD_SHARED=0 -``` - # Using If you want to use _cppkafka_, you'll need to link your application with: @@ -108,4 +108,3 @@ _Doxygen_ to be installed. The documentation will be written in html format at Make sure to check the [wiki](https://github.com/mfontanini/cppkafka/wiki) which includes some documentation about the project and some of its features. - diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5ca1ce97..9b73941c 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,22 +1,15 @@ -find_package(Boost COMPONENTS program_options) - -if (Boost_PROGRAM_OPTIONS_FOUND) - link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY}) - - include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) - include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) - - add_custom_target(examples) - macro(create_example example_name) - add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp") - add_dependencies(examples ${example_name}) - endmacro() - - create_example(kafka_producer) - create_example(kafka_consumer) - create_example(kafka_consumer_dispatcher) - create_example(metadata) - create_example(consumers_information) -else() - message(STATUS "Disabling examples since boost.program_options was not found") -endif() +link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) +include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR}) + +add_custom_target(examples) +macro(create_example example_name) + add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp") + add_dependencies(examples ${example_name}) +endmacro() + +create_example(kafka_producer) +create_example(kafka_consumer) +create_example(kafka_consumer_dispatcher) +create_example(metadata) +create_example(consumers_information) diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 9e918846..59b33657 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -34,6 +34,7 @@ #include "buffer.h" #include "topic.h" #include "macros.h" +#include "message.h" namespace cppkafka { @@ -49,6 +50,11 @@ class BasicMessageBuilder { * \param topic The topic into which this message would be produced */ BasicMessageBuilder(std::string topic); + + /** + * Construct a BasicMessageBuilder from a Message object + */ + BasicMessageBuilder(const Message& message); /** * \brief Construct a message builder from another one that uses a different buffer type @@ -177,6 +183,18 @@ BasicMessageBuilder::BasicMessageBuilder(std::string topic) : topic_(std::move(topic)) { } +template +BasicMessageBuilder::BasicMessageBuilder(const Message& message) +: topic_(message.get_topic()), + key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), + payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), + timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : + std::chrono::milliseconds(0)), + user_data_(message.get_user_data()) +{ + +} + template template BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& rhs) diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 404fbe4f..358a0fc6 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -82,6 +82,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { * The policy to use for the payload. The default policy is COPY_PAYLOAD */ enum class PayloadPolicy { + PASSTHROUGH_PAYLOAD = 0, ///< Rdkafka will not copy nor free the payload. COPY_PAYLOAD = RD_KAFKA_MSG_F_COPY, ///< Means RD_KAFKA_MSG_F_COPY FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE }; diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index c6cb51f0..bfe9c630 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -31,12 +31,14 @@ #define CPPKAFKA_BUFFERED_PRODUCER_H #include -#include +#include #include #include #include #include #include +#include +#include #include #include "../producer.h" #include "../message.h" @@ -50,13 +52,34 @@ namespace cppkafka { * to produce them just as you would using the Producer class. * * When calling either flush or wait_for_acks, the buffered producer will block until all - * produced messages (either in a buffer or non buffered way) are acknowledged by the kafka - * brokers. + * produced messages (either buffered or sent directly) are acknowledged by the kafka brokers. * - * When producing messages, this class will handle cases where the producer's queue is full so it\ + * When producing messages, this class will handle cases where the producer's queue is full so it * will poll until the production is successful. * - * This class is not thread safe. + * \remark This class is thread safe. + * + * \remark Releasing buffers: For high-performance applications preferring a zero-copy approach + * (using PayloadPolicy::PASSTHROUGH_PAYLOAD - see warning below) it is very important to know when + * to safely release owned message buffers. One way is to perform individual cleanup when + * ProduceSuccessCallback is called. If the application produces messages in batches or has a + * bursty behavior another way is to check when flush operations have fully completed with + * get_buffer_size()==0 && get_flushes_in_progress()==0. Note that get_pending_acks()==0 + * is not always a guarantee as there is very small window when flush() starts where + * get_buffer_size()==0 && get_pending_acks()==0 but messages have not yet been sent to the + * remote broker. For applications producing messages w/o buffering, get_pending_acks()==0 + * is sufficient. + * + * \warning Delivery Report Callback: This class makes internal use of this function and will + * overwrite anything the user has supplied as part of the configuration options. Instead user + * should call set_produce_success_callback() and set_produce_failure_callback() respectively. + * + * \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector + * the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type + * cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka + * shall not make any internal copies of the message and it is the application's responsability to free + * the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory + * corruptions. */ template class CPPKAFKA_API BufferedProducer { @@ -65,11 +88,21 @@ class CPPKAFKA_API BufferedProducer { * Concrete builder */ using Builder = ConcreteMessageBuilder; + + /** + * Callback to indicate a message was delivered to the broker + */ + using ProduceSuccessCallback = std::function; /** - * Callback to indicate a message failed to be produced. + * Callback to indicate a message failed to be produced by the broker */ using ProduceFailureCallback = std::function; + + /** + * Callback to indicate a message failed to be flushed + */ + using FlushFailureCallback = std::function; /** * \brief Constructs a buffered producer using the provided configuration @@ -106,6 +139,8 @@ class CPPKAFKA_API BufferedProducer { * wait for it to be acknowledged. * * \param builder The builder that contains the message to be produced + * + * \remark This method throws cppkafka::HandleException on failure */ void produce(const MessageBuilder& builder); @@ -116,6 +151,8 @@ class CPPKAFKA_API BufferedProducer { * wait for it to be acknowledged. * * \param message The message to be produced + * + * \remark This method throws cppkafka::HandleException on failure */ void produce(const Message& message); @@ -124,6 +161,10 @@ class CPPKAFKA_API BufferedProducer { * * This will send all messages and keep waiting until all of them are acknowledged (this is * done by calling wait_for_acks). + * + * \remark Although it is possible to call flush from multiple threads concurrently, better + * performance is achieved when called from the same thread or when serialized + * with respect to other threads. */ void flush(); @@ -159,11 +200,35 @@ class CPPKAFKA_API BufferedProducer { ssize_t get_max_buffer_size() const; /** - * \brief Get the number of messages in the buffer + * \brief Get the number of unsent messages in the buffer * * \return The number of messages */ size_t get_buffer_size() const; + + /** + * \brief Get the number of messages not yet acked by the broker + * + * \return The number of messages + */ + size_t get_pending_acks() const; + + /** + * \brief Get the total number of messages successfully produced since the beginning + * + * \return The number of messages + */ + size_t get_total_messages_produced() const; + + /** + * \brief Get the total outstanding flush operations in progress + * + * Since flush can be called from multiple threads concurrently, this counter indicates + * how many operations are curretnly in progress. + * + * \return The number of outstanding flush operations. + */ + size_t get_flushes_in_progress() const; /** * Gets the Producer object @@ -188,69 +253,131 @@ class CPPKAFKA_API BufferedProducer { * false. Note that if the callback return false, then the message will be discarded. * * \param callback The callback to be set + * + * \remark It is *highly* recommended to set this callback as your message may be produced + * indefinitely if there's a remote error. + * + * \warning Do not call any method on the BufferedProducer while inside this callback. */ void set_produce_failure_callback(ProduceFailureCallback callback); + + /** + * \brief Sets the successful delivery callback + * + * The user can use this function to cleanup any application-owned message buffers. + * + * \param callback The callback to be set + */ + void set_produce_success_callback(ProduceSuccessCallback callback); + + /** + * \brief Sets the local message produce failure callback + * + * This callback will be called when local message production fails during a flush() operation. + * Failure errors are typically payload too large, unknown topic or unknown partition. + * Note that if the callback returns false, the message will be dropped from the buffer, + * otherwise it will be re-enqueued for later retry. + * + * \param callback + * + * \warning Do not call any method on the BufferedProducer while inside this callback + */ + void set_flush_failure_callback(FlushFailureCallback callback); + private: - using QueueType = std::queue; + using QueueType = std::deque; + enum class MessagePriority { Low, High }; + + template + struct CounterGuard{ + CounterGuard(std::atomic& counter) : counter_(counter) { ++counter_; } + ~CounterGuard() { --counter_; } + std::atomic& counter_; + }; template - void do_add_message(BuilderType&& builder); + void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); template void produce_message(const MessageType& message); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); - - Configuration::DeliveryReportCallback delivery_report_callback_; + // Members Producer producer_; QueueType messages_; + mutable std::mutex mutex_; + ProduceSuccessCallback produce_success_callback_; ProduceFailureCallback produce_failure_callback_; - size_t expected_acks_{0}; - size_t messages_acked_{0}; + FlushFailureCallback flush_failure_callback_; ssize_t max_buffer_size_{-1}; + std::atomic pending_acks_{0}; + std::atomic flushes_in_progress_{0}; + std::atomic total_messages_produced_{0}; }; template -BufferedProducer::BufferedProducer(Configuration config) -: delivery_report_callback_(config.get_delivery_report_callback()), - producer_(prepare_configuration(std::move(config))) { +Producer::PayloadPolicy get_default_payload_policy() { + return Producer::PayloadPolicy::COPY_PAYLOAD; +} +template <> inline +Producer::PayloadPolicy get_default_payload_policy() { + return Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD; +} + +template +BufferedProducer::BufferedProducer(Configuration config) +: producer_(prepare_configuration(std::move(config))) { + producer_.set_payload_policy(get_default_payload_policy()); } template void BufferedProducer::add_message(const MessageBuilder& builder) { - do_add_message(builder); + do_add_message(builder, MessagePriority::Low, true); } template void BufferedProducer::add_message(Builder builder) { - do_add_message(move(builder)); + do_add_message(move(builder), MessagePriority::Low, true); } template void BufferedProducer::produce(const MessageBuilder& builder) { produce_message(builder); - expected_acks_++; } template void BufferedProducer::produce(const Message& message) { produce_message(message); - expected_acks_++; } template void BufferedProducer::flush() { - while (!messages_.empty()) { - produce_message(messages_.front()); - messages_.pop(); + CounterGuard counter_guard(flushes_in_progress_); + QueueType flush_queue; // flush from temporary queue + { + std::lock_guard lock(mutex_); + std::swap(messages_, flush_queue); + } + while (!flush_queue.empty()) { + try { + produce_message(flush_queue.front()); + } + catch (const HandleException& ex) { + if (flush_failure_callback_ && + flush_failure_callback_(flush_queue.front(), ex.get_error())) { + // retry again later + do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false); + } + } + flush_queue.pop_front(); } wait_for_acks(); } template void BufferedProducer::wait_for_acks() { - while (messages_acked_ < expected_acks_) { + while (pending_acks_ > 0) { try { producer_.flush(); } @@ -264,16 +391,13 @@ void BufferedProducer::wait_for_acks() { } } } - expected_acks_ = 0; - messages_acked_ = 0; } template void BufferedProducer::clear() { + std::lock_guard lock(mutex_); QueueType tmp; std::swap(tmp, messages_); - expected_acks_ = 0; - messages_acked_ = 0; } template @@ -296,10 +420,19 @@ size_t BufferedProducer::get_buffer_size() const { template template -void BufferedProducer::do_add_message(BuilderType&& builder) { - expected_acks_++; - messages_.push(std::forward(builder)); - if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { +void BufferedProducer::do_add_message(BuilderType&& builder, + MessagePriority priority, + bool do_flush) { + { + std::lock_guard lock(mutex_); + if (priority == MessagePriority::High) { + messages_.emplace_front(std::move(builder)); + } + else { + messages_.emplace_back(std::move(builder)); + } + } + if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { flush(); } } @@ -314,6 +447,21 @@ const Producer& BufferedProducer::get_producer() const { return producer_; } +template +size_t BufferedProducer::get_pending_acks() const { + return pending_acks_; +} + +template +size_t BufferedProducer::get_total_messages_produced() const { + return total_messages_produced_; +} + +template +size_t BufferedProducer::get_flushes_in_progress() const { + return flushes_in_progress_; +} + template typename BufferedProducer::Builder BufferedProducer::make_builder(std::string topic) { @@ -325,18 +473,28 @@ void BufferedProducer::set_produce_failure_callback(ProduceFailureCa produce_failure_callback_ = std::move(callback); } +template +void BufferedProducer::set_produce_success_callback(ProduceSuccessCallback callback) { + produce_success_callback_ = std::move(callback); +} + +template +void BufferedProducer::set_flush_failure_callback(FlushFailureCallback callback) { + flush_failure_callback_ = std::move(callback); +} + template template void BufferedProducer::produce_message(const MessageType& message) { - bool sent = false; - while (!sent) { + while (true) { try { producer_.produce(message); - sent = true; + // Sent successfully + ++pending_acks_; + break; } catch (const HandleException& ex) { - const Error error = ex.get_error(); - if (error == RD_KAFKA_RESP_ERR__QUEUE_FULL) { + if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { // If the output queue is full, then just poll producer_.poll(); } @@ -357,21 +515,26 @@ Configuration BufferedProducer::prepare_configuration(Configuration template void BufferedProducer::on_delivery_report(const Message& message) { - // Call the user-supplied delivery report callback if any - if (delivery_report_callback_) { - delivery_report_callback_(producer_, message); - } + // Decrement the expected acks + --pending_acks_; + assert(pending_acks_ != (size_t)-1); // Prevent underflow + // We should produce this message again if it has an error and we either don't have a // produce failure callback or we have one but it returns true bool should_produce = message.get_error() && (!produce_failure_callback_ || produce_failure_callback_(message)); if (should_produce) { - produce_message(message); - return; + // Re-enqueue for later retransmission with higher priority (i.e. front of the queue) + do_add_message(Builder(message), MessagePriority::High, false); + } + else { + // Successful delivery + if (produce_success_callback_) { + produce_success_callback_(message); + } + // Increment the total successful transmissions + ++total_messages_produced_; } - // If production was successful or the produce failure callback returned false, then - // let's consider it to be acked - messages_acked_++; } } // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 317f7cd7..a423739c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,6 +1,6 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/) include_directories(SYSTEM ${CATCH_INCLUDE}) -include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) +include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR}) set(KAFKA_TEST_INSTANCE "kafka-vm:9092" CACHE STRING "The kafka instance to which to connect to run tests") diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index cda960dd..c388bab1 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -12,17 +12,19 @@ using std::string; using std::to_string; using std::set; +using std::vector; using std::tie; using std::move; using std::thread; +namespace this_thread = std::this_thread; using std::mutex; using std::unique_lock; using std::lock_guard; using std::condition_variable; - using std::chrono::system_clock; using std::chrono::seconds; using std::chrono::milliseconds; +using std::ref; using namespace cppkafka; @@ -48,6 +50,44 @@ static Configuration make_consumer_config() { return config; } +void producer_run(BufferedProducer& producer, + int& exit_flag, condition_variable& clear, + int num_messages, + int partition) { + MessageBuilder builder(KAFKA_TOPIC); + string key("wassup?"); + string payload("nothing much!"); + + builder.partition(partition).key(key).payload(payload); + for (int i = 0; i < num_messages; ++i) { + if (i == num_messages/2) { + clear.notify_one(); + } + producer.add_message(builder); + this_thread::sleep_for(milliseconds(10)); + } + exit_flag = 1; +} + +void flusher_run(BufferedProducer& producer, + int& exit_flag, + int num_flush) { + while (!exit_flag) { + if (producer.get_buffer_size() >= (size_t)num_flush) { + producer.flush(); + } + } + producer.flush(); +} + +void clear_run(BufferedProducer& producer, + condition_variable& clear) { + mutex m; + unique_lock lock(m); + clear.wait(lock); + producer.clear(); +} + TEST_CASE("simple production", "[producer]") { int partition = 0; @@ -234,7 +274,7 @@ TEST_CASE("multiple messages", "[producer]") { } } -TEST_CASE("buffered producer", "[producer]") { +TEST_CASE("buffered producer", "[producer][buffered_producer]") { int partition = 0; // Create a consumer and assign this topic/partition @@ -303,3 +343,57 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") { const auto& messages = runner.get_messages(); REQUIRE(messages.size() == producer.get_max_buffer_size()); } + +TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") { + int partition = 0; + vector threads; + int num_messages = 50; + int num_flush = 10; + int exit_flag = 0; + condition_variable clear; + + // Create a consumer and assign this topic/partition + Consumer consumer(make_consumer_config()); + consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + ConsumerRunner runner(consumer, num_messages, 1); + + BufferedProducer producer(make_producer_config()); + + threads.push_back(thread(producer_run, ref(producer), ref(exit_flag), ref(clear), num_messages, partition)); + threads.push_back(thread(flusher_run, ref(producer), ref(exit_flag), num_flush)); + + // Wait for completion + runner.try_join(); + for (auto&& thread : threads) { + thread.join(); + } + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == num_messages); + REQUIRE(producer.get_flushes_in_progress() == 0); + REQUIRE(producer.get_pending_acks() == 0); + REQUIRE(producer.get_total_messages_produced() == num_messages); + REQUIRE(producer.get_buffer_size() == 0); +} + +TEST_CASE("clear multi-threaded buffered producer", "[producer][buffered_producer]") { + int partition = 0; + vector threads; + int num_messages = 50; + int exit_flag = 0; + condition_variable clear; + + BufferedProducer producer(make_producer_config()); + + threads.push_back(thread(producer_run, ref(producer), ref(exit_flag), ref(clear), num_messages, partition)); + threads.push_back(thread(clear_run, ref(producer), ref(clear))); + + // Wait for completion + for (auto&& thread : threads) { + thread.join(); + } + + REQUIRE(producer.get_total_messages_produced() == 0); + REQUIRE(producer.get_flushes_in_progress() == 0); + REQUIRE(producer.get_pending_acks() == 0); + REQUIRE(producer.get_buffer_size() < num_messages); +}