diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index ad5bece7..61228fe0 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -35,7 +35,7 @@ #include #include #include "kafka_handle_base.h" -#include "message.h" +#include "queue.h" #include "macros.h" #include "error.h" @@ -54,7 +54,7 @@ class TopicConfiguration; * Semi-simple code showing how to use this class * * \code - * // Create a configuration and set the group.id and broker list fields + * // Create a configuration and set the group.id and broker list fields * Configuration config = { * { "metadata.broker.list", "127.0.0.1:9092" }, * { "group.id", "foo" } @@ -74,13 +74,13 @@ class TopicConfiguration; * consumer.set_revocation_callback([&](const TopicPartitionList& topic_partitions) { * cout << topic_partitions.size() << " partitions revoked!" << endl; * }); - * - * // Subscribe + * + * // Subscribe * consumer.subscribe({ "my_topic" }); * while (true) { * // Poll. This will optionally return a message. It's necessary to check if it's a valid * // one before using it - * Message msg = consumer.poll(); + * Message msg = consumer.poll(); * if (msg) { * if (!msg.get_error()) { * // It's an actual message. Get the payload and print it to stdout @@ -103,12 +103,12 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { /** * \brief Creates an instance of a consumer. - * - * Note that the configuration *must contain* the group.id attribute set or this + * + * Note that the configuration *must contain* the group.id attribute set or this * will throw. * * \param config The configuration to be used - */ + */ Consumer(Configuration config); Consumer(const Consumer&) = delete; Consumer(Consumer&&) = delete; @@ -116,7 +116,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { Consumer& operator=(Consumer&&) = delete; /** - * \brief Closes and estroys the rdkafka handle + * \brief Closes and destroys the rdkafka handle * * This will call Consumer::close before destroying the handle */ @@ -124,7 +124,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { /** * \brief Sets the topic/partition assignment callback - * + * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. @@ -138,7 +138,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { /** * \brief Sets the topic/partition revocation callback - * + * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. @@ -153,7 +153,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { /** * \brief Sets the rebalance error callback - * + * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. 
@@ -188,9 +188,9 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { /** * \brief Unassigns the current topic/partition assignment * - * This translates into a call to rd_kafka_assign using a null as the topic partition list + * This translates into a call to rd_kafka_assign using a null as the topic partition list * parameter - */ + */ void unassign(); /** @@ -262,7 +262,9 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { * * This translates into a call to rd_kafka_get_watermark_offsets * - * \param topic_partition The topic/partition to get the offsets from + * \param topic_partition The topic/partition to get the offsets from + * + * \return A pair of offsets {low, high} */ OffsetTuple get_offsets(const TopicPartition& topic_partition) const; @@ -272,6 +274,8 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { * This translates into a call to rd_kafka_committed * * \param topic_partitions The topic/partition list to be queried + * + * \return The topic partition list */ TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const; @@ -281,6 +285,8 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { * This translates into a call to rd_kafka_position * * \param topic_partitions The topic/partition list to be queried + * + * \return The topic partition list */ TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const; @@ -295,6 +301,8 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { * \brief Gets the current topic/partition list assignment * * This translates to a call to rd_kafka_assignment + * + * \return The topic partition list */ TopicPartitionList get_assignment() const; @@ -302,21 +310,29 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { * \brief Gets the group member id * * This translates to a call to rd_kafka_memberid + * + * \return The id */ std::string get_member_id() const; /** - * Gets the partition assignment callback. + * \brief Gets the partition assignment callback. + * + * \return The callback reference */ const AssignmentCallback& get_assignment_callback() const; /** - * Gets the partition revocation callback. + * \brief Gets the partition revocation callback. + * + * \return The callback reference */ const RevocationCallback& get_revocation_callback() const; /** - * Gets the rebalance error callback. + * \brief Gets the rebalance error callback. + * + * \return The callback reference */ const RebalanceErrorCallback& get_rebalance_error_callback() const; @@ -326,16 +342,16 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { * This will call rd_kafka_consumer_poll. * * Note that you need to call poll periodically as a keep alive mechanism, otherwise the broker - * will think this consumer is down and will trigger a rebalance (if using dynamic + * will think this consumer is down and will trigger a rebalance (if using dynamic * subscription). * * The timeout used on this call will be the one configured via Consumer::set_timeout. * - * The returned message *might* be empty. If's necessary to check that it's a valid one before - * using it: - * + * \return A message. The returned message *might* be empty. It's necessary to check + * that it's valid before using it: + * * \code - * Message msg = consumer.poll(); + * Message msg = consumer.poll(); * if (msg) { * // It's a valid message! * } @@ -350,6 +366,8 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { * instead of the one configured on this Consumer. 
     *
     * \param timeout The timeout to be used on this call
+     *
+     * \return A message
     */
    Message poll(std::chrono::milliseconds timeout);

    /**
@@ -359,8 +377,10 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
     * This can return one or more messages
     *
     * \param max_batch_size The maximum amount of messages expected
+     *
+     * \return A list of messages
     */
-    std::vector<Message> poll_batch(size_t max_batch_size);
+    MessageList poll_batch(size_t max_batch_size);

    /**
     * \brief Polls for a batch of messages
@@ -369,8 +389,42 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
     *
     * \param max_batch_size The maximum amount of messages expected
     * \param timeout The timeout for this operation
+     *
+     * \return A list of messages
+     */
+    MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
+
+    /**
+     * \brief Get the global event queue servicing this consumer, corresponding to
+     *        rd_kafka_queue_get_main, which is polled via rd_kafka_poll
+     *
+     * \return A Queue object
+     *
+     * \remark Note that this call will disable forwarding to the consumer_queue.
+     *         To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
+     */
+    Queue get_main_queue() const;
+
+    /**
+     * \brief Get the consumer group queue, corresponding to rd_kafka_queue_get_consumer,
+     *        which is polled via rd_kafka_consumer_poll
+     *
+     * \return A Queue object
+     */
+    Queue get_consumer_queue() const;
+
+    /**
+     * \brief Get the queue belonging to this partition. If the consumer is not assigned to this
+     *        partition, an empty queue will be returned
+     *
+     * \param partition The partition object
+     *
+     * \return A Queue object
+     *
+     * \remark Note that this call will disable forwarding to the consumer_queue.
+     *         To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
     */
-    std::vector<Message> poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
+    Queue get_partition_queue(const TopicPartition& partition) const;
 private:
    static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
                                rd_kafka_topic_partition_list_t *partitions, void *opaque);
diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h
index 793a4aa3..2473d1d6 100644
--- a/include/cppkafka/cppkafka.h
+++ b/include/cppkafka/cppkafka.h
@@ -46,6 +46,7 @@
 #include
 #include
 #include
+#include <cppkafka/queue.h>
 #include
 #include
 #include
@@ -55,5 +56,8 @@
 #include
 #include
 #include
+#include <cppkafka/utils/poll_interface.h>
+#include <cppkafka/utils/poll_strategy_base.h>
+#include <cppkafka/utils/roundrobin_poll_strategy.h>

 #endif
diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h
index 323c3e5c..8bfd8016 100644
--- a/include/cppkafka/exceptions.h
+++ b/include/cppkafka/exceptions.h
@@ -122,6 +122,18 @@ class CPPKAFKA_API ConsumerException : public Exception {
     Error error_;
 };

+/**
+ * Queue exception for rd_kafka_queue_t errors
+ */
+class CPPKAFKA_API QueueException : public Exception {
+public:
+    QueueException(Error error);
+
+    Error get_error() const;
+private:
+    Error error_;
+};
+
 } // cppkafka

 #endif // CPPKAFKA_EXCEPTIONS_H
diff --git a/include/cppkafka/group_information.h b/include/cppkafka/group_information.h
index 483568fc..41d3cda0 100644
--- a/include/cppkafka/group_information.h
+++ b/include/cppkafka/group_information.h
@@ -136,6 +136,8 @@ class CPPKAFKA_API GroupInformation {
     std::vector<GroupMemberInformation> members_;
 };

+using GroupInformationList = std::vector<GroupInformation>;
+
 } // cppkafka

 #endif // CPPKAFKA_GROUP_INFORMATION_H
diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h
index 68dd61b4..30b679e9 100644
--- a/include/cppkafka/kafka_handle_base.h
+++
b/include/cppkafka/kafka_handle_base.h @@ -39,6 +39,7 @@ #include #include #include +#include "group_information.h" #include "topic_partition.h" #include "topic_partition_list.h" #include "topic_configuration.h" @@ -78,7 +79,7 @@ class CPPKAFKA_API KafkaHandleBase { /** * \brief Resumes consumption/production from the given topic/partition list * - * This translates into a call to rd_kafka_resume_partitions + * This translates into a call to rd_kafka_resume_partitions * * \param topic_partitions The topic/partition list to resume consuming/producing from/to */ @@ -108,11 +109,15 @@ class CPPKAFKA_API KafkaHandleBase { * This translates into a call to rd_kafka_query_watermark_offsets * * \param topic_partition The topic/partition to be queried + * + * \return A pair of watermark offsets {low, high} */ OffsetTuple query_offsets(const TopicPartition& topic_partition) const; /** - * Gets the rdkafka handle + * \brief Gets the rdkafka handle + * + * \return The rdkafka handle */ rd_kafka_t* get_handle() const; @@ -123,7 +128,9 @@ class CPPKAFKA_API KafkaHandleBase { * configuration provided in the Configuration object for this consumer/producer handle, * if any. * - * \param name The name of the topic to be created + * \param name The name of the topic to be created + * + * \return A topic */ Topic get_topic(const std::string& name); @@ -134,15 +141,19 @@ class CPPKAFKA_API KafkaHandleBase { * * \param name The name of the topic to be created * \param config The configuration to be used for the new topic + * + * \return A topic */ Topic get_topic(const std::string& name, TopicConfiguration config); /** * \brief Gets metadata for brokers, topics, partitions, etc * + * This translates into a call to rd_kafka_metadata + * * \param all_topics Whether to fetch metadata about all topics or only locally known ones * - * This translates into a call to rd_kafka_metadata + * \return The metadata */ Metadata get_metadata(bool all_topics = true) const; @@ -153,20 +164,26 @@ class CPPKAFKA_API KafkaHandleBase { * This translates into a call to rd_kafka_metadata * * \param topic The topic to fetch information for + * + * \return The topic metadata */ TopicMetadata get_metadata(const Topic& topic) const; /** - * Gets the consumer group information + * \brief Gets the consumer group information * * \param name The name of the consumer group to look up + * + * \return The group information */ GroupInformation get_consumer_group(const std::string& name); /** - * Gets all consumer groups + * \brief Gets all consumer groups + * + * \return A list of consumer groups */ - std::vector get_consumer_groups(); + GroupInformationList get_consumer_groups(); /** * \brief Gets topic/partition offsets based on timestamps @@ -174,23 +191,31 @@ class CPPKAFKA_API KafkaHandleBase { * This translates into a call to rd_kafka_offsets_for_times * * \param queries A map from topic/partition to the timestamp to be used + * + * \return A topic partition list */ TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const; /** - * Returns the kafka handle name + * \brief Get the kafka handle name + * + * \return The handle name */ std::string get_name() const; /** - * Gets the configured timeout. + * \brief Gets the configured timeout. 
+     *
+     * \return The configured timeout
     *
     * \sa KafkaHandleBase::set_timeout
     */
    std::chrono::milliseconds get_timeout() const;

    /**
-     * Gets the handle's configuration
+     * \brief Gets the handle's configuration
+     *
+     * \return A reference to the configuration object
     */
    const Configuration& get_configuration() const;

@@ -198,6 +223,8 @@ class CPPKAFKA_API KafkaHandleBase {
     * \brief Gets the length of the out queue
     *
     * This calls rd_kafka_outq_len
+     *
+     * \return The length of the queue
     */
    int get_out_queue_length() const;

@@ -221,7 +248,7 @@ class CPPKAFKA_API KafkaHandleBase {
    Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
    Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
-    std::vector<GroupInformation> fetch_consumer_groups(const char* name);
+    GroupInformationList fetch_consumer_groups(const char* name);
    void save_topic_config(const std::string& topic_name, TopicConfiguration config);

    std::chrono::milliseconds timeout_ms_;
diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h
index 58661247..a15f9474 100644
--- a/include/cppkafka/message.h
+++ b/include/cppkafka/message.h
@@ -177,6 +177,8 @@ class CPPKAFKA_API Message {
     Buffer key_;
 };

+using MessageList = std::vector<Message>;
+
 /**
  * Represents a message's timestamp
  */
diff --git a/include/cppkafka/queue.h b/include/cppkafka/queue.h
new file mode 100644
index 00000000..d7bc5028
--- /dev/null
+++ b/include/cppkafka/queue.h
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2017, Matias Fontanini
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <vector>
+#include <memory>
+#include <chrono>
+#include <librdkafka/rdkafka.h>
+#include "macros.h"
+#include "message.h"
+
+#ifndef CPPKAFKA_QUEUE_H
+#define CPPKAFKA_QUEUE_H
+
+namespace cppkafka {
+/**
+ * \brief Represents a rdkafka queue
+ *
+ * This is a simple wrapper over a rd_kafka_queue_t*
+ */
+class CPPKAFKA_API Queue {
+public:
+    /**
+     * \brief Creates a Queue object that doesn't take ownership of the handle
+     *
+     * \param handle The handle to be used
+     */
+    static Queue make_non_owning(rd_kafka_queue_t* handle);
+
+    /**
+     * \brief Constructs an empty queue
+     *
+     * Note that using any methods except Queue::get_handle on an empty queue is undefined
+     * behavior
+     */
+    Queue();
+
+    /**
+     * \brief Constructs a queue using a handle
+     *
+     * This will take ownership of the handle
+     *
+     * \param handle The handle to be used
+     */
+    Queue(rd_kafka_queue_t* handle);
+
+    /**
+     * Returns the rdkafka handle
+     */
+    rd_kafka_queue_t* get_handle() const;
+
+    /**
+     * \brief Returns the length of the queue
+     *
+     * This translates to a call to rd_kafka_queue_length
+     */
+    size_t get_length() const;
+
+    /**
+     * \brief Forward to another queue
+     *
+     * This translates to a call to rd_kafka_queue_forward
+     */
+    void forward_to_queue(const Queue& forward_queue) const;
+
+    /**
+     * \brief Disable forwarding to another queue
+     *
+     * This translates to a call to rd_kafka_queue_forward(NULL)
+     */
+    void disable_queue_forwarding() const;
+
+    /**
+     * \brief Sets the timeout for consume operations
+     *
+     * This timeout is applied when calling consume()
+     *
+     * \param timeout The timeout to be set
+     */
+    void set_timeout(std::chrono::milliseconds timeout);
+
+    /**
+     * Gets the configured timeout.
+     *
+     * \sa Queue::set_timeout
+     */
+    std::chrono::milliseconds get_timeout() const;
+
+    /**
+     * \brief Consume a message from this queue
+     *
+     * This translates to a call to rd_kafka_consume_queue using the configured timeout for this object
+     *
+     * \return A message
+     */
+    Message consume() const;
+
+    /**
+     * \brief Consume a message from this queue
+     *
+     * Same as consume() but the specified timeout will be used instead of the configured one
+     *
+     * \param timeout The timeout to be used on this call
+     *
+     * \return A message
+     */
+    Message consume(std::chrono::milliseconds timeout) const;
+
+    /**
+     * \brief Consumes a batch of messages from this queue
+     *
+     * This translates to a call to rd_kafka_consume_batch_queue using the configured timeout for this object
+     *
+     * \param max_batch_size The max number of messages to consume if available
+     *
+     * \return A list of messages. Could be empty if there's nothing to consume
+     */
+    MessageList consume_batch(size_t max_batch_size) const;
+
+    /**
+     * \brief Consumes a batch of messages from this queue
+     *
+     * Same as Queue::consume_batch(size_t) but the specified timeout will be used instead of the configured one
+     *
+     * \param max_batch_size The max number of messages to consume if available
+     *
+     * \param timeout The timeout to be used on this call
+     *
+     * \return A list of messages. Could be empty if there's nothing to consume
+     */
+    MessageList consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const;
+
+    /**
+     * Indicates whether this queue is valid (not null)
+     */
+    explicit operator bool() const {
+        return handle_ != nullptr;
+    }
+
+private:
+    static const std::chrono::milliseconds DEFAULT_TIMEOUT;
+
+    using HandlePtr = std::unique_ptr<rd_kafka_queue_t, decltype(&rd_kafka_queue_destroy)>;
+
+    struct NonOwningTag { };
+
+    Queue(rd_kafka_queue_t* handle, NonOwningTag);
+
+    // Members
+    HandlePtr handle_;
+    std::chrono::milliseconds timeout_ms_;
+};
+
+using QueueList = std::vector<Queue>;
+
+} // cppkafka
+
+#endif //CPPKAFKA_QUEUE_H
diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index bfe9c630..8454a4c3 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -178,6 +178,13 @@ class CPPKAFKA_API BufferedProducer {
     */
    void clear();

+    /**
+     * \brief Get the number of messages in the buffer
+     *
+     * \return The number of messages
+     */
+    size_t get_buffer_size() const;
+
    /**
     * \brief Sets the maximum amount of messages to be enqueued in the buffer.
     *
@@ -199,13 +206,6 @@ class CPPKAFKA_API BufferedProducer {
     */
    ssize_t get_max_buffer_size() const;

-    /**
-     * \brief Get the number of unsent messages in the buffer
-     *
-     * \return The number of messages
-     */
-    size_t get_buffer_size() const;
-
    /**
     * \brief Get the number of messages not yet acked by the broker
     *
@@ -400,6 +400,11 @@ void BufferedProducer<BufferType>::clear() {
    std::swap(tmp, messages_);
}

+template <typename BufferType>
+size_t BufferedProducer<BufferType>::get_buffer_size() const {
+    return messages_.size();
+}
+
template <typename BufferType>
void BufferedProducer<BufferType>::set_max_buffer_size(ssize_t max_buffer_size) {
    if (max_buffer_size < -1) {
@@ -413,11 +418,6 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
    return max_buffer_size_;
}

-template <typename BufferType>
-size_t BufferedProducer<BufferType>::get_buffer_size() const {
-    return messages_.size();
-}
-
template <typename BufferType>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
diff --git a/include/cppkafka/utils/consumer_dispatcher.h b/include/cppkafka/utils/consumer_dispatcher.h
index 56c1a918..05b18759 100644
--- a/include/cppkafka/utils/consumer_dispatcher.h
+++ b/include/cppkafka/utils/consumer_dispatcher.h
@@ -238,7 +238,7 @@ class CPPKAFKA_API BasicConsumerDispatcher {
    }

    // Finds the first functor that accepts the parameters in a tuple and returns it. If no
-    // such functor is found, a static asertion will occur
+    // such functor is found, a static assertion will occur
    template <typename Tuple, typename... Functors>
    const typename find_type<Tuple, Functors...>::type&
    find_matching_functor(const Functors&... functors) {
diff --git a/include/cppkafka/utils/poll_interface.h b/include/cppkafka/utils/poll_interface.h
new file mode 100644
index 00000000..af93e3f3
--- /dev/null
+++ b/include/cppkafka/utils/poll_interface.h
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2017, Matias Fontanini
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_POLL_INTERFACE_H +#define CPPKAFKA_POLL_INTERFACE_H + +#include "../consumer.h" + +namespace cppkafka { + +/** + * \interface PollInterface + * + * \brief Interface defining polling methods for the Consumer class + */ +struct PollInterface { + virtual ~PollInterface() = default; + + /** + * \brief Get the underlying consumer controlled by this strategy + * + * \return A reference to the consumer instance + */ + virtual Consumer& get_consumer() = 0; + + /** + * \brief Sets the timeout for polling functions + * + * This calls Consumer::set_timeout + * + * \param timeout The timeout to be set + */ + virtual void set_timeout(std::chrono::milliseconds timeout) = 0; + + /** + * \brief Gets the timeout for polling functions + * + * This calls Consumer::get_timeout + * + * \return The timeout + */ + virtual std::chrono::milliseconds get_timeout() = 0; + + /** + * \brief Polls all assigned partitions for new messages in round-robin fashion + * + * Each call to poll() will first consume from the global event queue and if there are + * no pending events, will attempt to consume from all partitions until a valid message is found. + * The timeout used on this call will be the one configured via PollInterface::set_timeout. + * + * \return A message. The returned message *might* be empty. It's necessary to check + * that it's a valid one before using it (see example above). + * + * \remark You need to call poll() or poll_batch() periodically as a keep alive mechanism, + * otherwise the broker will think this consumer is down and will trigger a rebalance + * (if using dynamic subscription) + */ + virtual Message poll() = 0; + + /** + * \brief Polls for new messages + * + * Same as the other overload of PollInterface::poll but the provided + * timeout will be used instead of the one configured on this Consumer. + * + * \param timeout The timeout to be used on this call + */ + virtual Message poll(std::chrono::milliseconds timeout) = 0; + + /** + * \brief Polls all assigned partitions for a batch of new messages in round-robin fashion + * + * Each call to poll_batch() will first attempt to consume from the global event queue + * and if the maximum batch number has not yet been filled, will attempt to fill it by + * reading the remaining messages from each partition. 
+ * + * \param max_batch_size The maximum amount of messages expected + * + * \return A list of messages + * + * \remark You need to call poll() or poll_batch() periodically as a keep alive mechanism, + * otherwise the broker will think this consumer is down and will trigger a rebalance + * (if using dynamic subscription) + */ + virtual MessageList poll_batch(size_t max_batch_size) = 0; + + /** + * \brief Polls all assigned partitions for a batch of new messages in round-robin fashion + * + * Same as the other overload of PollInterface::poll_batch but the provided + * timeout will be used instead of the one configured on this Consumer. + * + * \param max_batch_size The maximum amount of messages expected + * + * \param timeout The timeout for this operation + * + * \return A list of messages + */ + virtual MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout) = 0; +}; + +} //cppkafka + +#endif //CPPKAFKA_POLL_INTERFACE_H diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h new file mode 100644 index 00000000..0cf6d889 --- /dev/null +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_POLL_STRATEGY_BASE_H +#define CPPKAFKA_POLL_STRATEGY_BASE_H + +#include +#include +#include "../queue.h" +#include "../topic_partition_list.h" +#include "poll_interface.h" + +namespace cppkafka { + +/** + * \brief Contains a partition queue and generic metadata which can be used to store + * related (user-specific) information. 
+ */
+struct QueueData {
+    Queue queue;
+    boost::any metadata;
+};
+
+/**
+ * \class PollStrategyBase
+ *
+ * \brief Base implementation of the PollInterface
+ */
+class PollStrategyBase : public PollInterface {
+public:
+    using QueueMap = std::map<TopicPartition, QueueData>;
+
+    /**
+     * \brief Constructor
+     *
+     * \param consumer A reference to the polled consumer instance
+     */
+    explicit PollStrategyBase(Consumer& consumer);
+
+    /**
+     * \brief Destructor
+     */
+    ~PollStrategyBase();
+
+    /**
+     * \sa PollInterface::set_timeout
+     */
+    void set_timeout(std::chrono::milliseconds timeout) override;
+
+    /**
+     * \sa PollInterface::get_timeout
+     */
+    std::chrono::milliseconds get_timeout() override;
+
+    /**
+     * \sa PollInterface::get_consumer
+     */
+    Consumer& get_consumer() final;
+
+protected:
+    /**
+     * \brief Get the queues from all assigned partitions
+     *
+     * \return A map of queues indexed by partition
+     */
+    QueueMap& get_partition_queues();
+
+    /**
+     * \brief Get the main consumer queue which services the underlying Consumer object
+     *
+     * \return The consumer queue
+     */
+    QueueData& get_consumer_queue();
+
+    /**
+     * \brief Reset the internal state of the queues.
+     *
+     * Use this function to reset the state of any polling strategy or algorithm.
+     *
+     * \remark This function gets called by on_assignment(), on_revocation() and on_rebalance_error()
+     */
+    virtual void reset_state();
+
+    /**
+     * \brief Function to be called when a new partition assignment takes place
+     *
+     * This method contains a default implementation. It adds all the new queues belonging
+     * to the provided partition list and calls reset_state().
+     *
+     * \param partitions Assigned topic partitions
+     */
+    virtual void on_assignment(TopicPartitionList& partitions);
+
+    /**
+     * \brief Function to be called when an old partition assignment gets revoked
+     *
+     * This method contains a default implementation. It removes all the queues
+     * belonging to the provided partition list and calls reset_state().
+     *
+     * \param partitions Revoked topic partitions
+     */
+    virtual void on_revocation(const TopicPartitionList& partitions);
+
+    /**
+     * \brief Function to be called when a topic rebalance error happens
+     *
+     * This method contains a default implementation. Calls reset_state().
+     *
+     * \param error The rebalance error
+     */
+    virtual void on_rebalance_error(Error error);
+
+private:
+    Consumer& consumer_;
+    QueueData consumer_queue_;
+    QueueMap partition_queues_;
+    Consumer::AssignmentCallback assignment_callback_;
+    Consumer::RevocationCallback revocation_callback_;
+    Consumer::RebalanceErrorCallback rebalance_error_callback_;
+};
+
+} //cppkafka
+
+#endif //CPPKAFKA_POLL_STRATEGY_BASE_H
diff --git a/include/cppkafka/utils/roundrobin_poll_strategy.h b/include/cppkafka/utils/roundrobin_poll_strategy.h
new file mode 100644
index 00000000..bb91e054
--- /dev/null
+++ b/include/cppkafka/utils/roundrobin_poll_strategy.h
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2017, Matias Fontanini
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H
+#define CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H
+
+#include
+#include
+#include "../exceptions.h"
+#include "../consumer.h"
+#include "../queue.h"
+#include "poll_strategy_base.h"
+
+namespace cppkafka {
+
+/**
+ * \brief This adapter changes the default polling strategy of the Consumer into a fair round-robin
+ * polling mechanism.
+ *
+ * The default librdkafka (and cppkafka) poll() and poll_batch() behavior is to consume batches of
+ * messages from each partition in turn. For performance reasons, librdkafka pre-fetches batches
+ * of messages from the kafka broker (one batch from each partition), and stores them locally in
+ * partition queues. Since all the internal partition queues are forwarded by default onto the
+ * group consumer queue (one per consumer), these batches end up being polled and consumed in the
+ * same sequence order.
+ * This adapter allows fair round-robin polling of all assigned partitions, one message at a time
+ * (or one batch at a time if poll_batch() is used). Note that poll_batch() has nothing to do with
+ * the internal batching mechanism of librdkafka.
+ *
+ * Example code on how to use this:
+ *
+ * \code
+ * // Create a consumer
+ * Consumer consumer(...);
+ * consumer.subscribe({ "my_topic" });
+ *
+ * // Optionally set the callbacks. This must be done *BEFORE* creating the strategy adapter
+ * consumer.set_assignment_callback(...);
+ * consumer.set_revocation_callback(...);
+ * consumer.set_rebalance_error_callback(...);
+ *
+ * // Create the adapter and use it for polling
+ * RoundRobinPollStrategy poll_strategy(consumer);
+ *
+ * while (true) {
+ *     // Poll each partition in turn
+ *     Message msg = poll_strategy.poll();
+ *     if (msg) {
+ *         // process valid message
+ *     }
+ * }
+ * \endcode
+ *
+ * \warning Calling poll() or poll_batch() directly on the Consumer object while using this adapter
+ * will lead to undesired results, since the RoundRobinPollStrategy modifies the internal queuing
+ * mechanism of the Consumer instance it wraps.
+ */ + +class RoundRobinPollStrategy : public PollStrategyBase { +public: + RoundRobinPollStrategy(Consumer& consumer); + + ~RoundRobinPollStrategy(); + + /** + * \sa PollInterface::poll + */ + Message poll() override; + + /** + * \sa PollInterface::poll + */ + Message poll(std::chrono::milliseconds timeout) override; + + /** + * \sa PollInterface::poll_batch + */ + MessageList poll_batch(size_t max_batch_size) override; + + /** + * \sa PollInterface::poll_batch + */ + MessageList poll_batch(size_t max_batch_size, + std::chrono::milliseconds timeout) override; + +protected: + /** + * \sa PollStrategyBase::reset_state + */ + void reset_state() final; + + QueueData& get_next_queue(); + +private: + void consume_batch(Queue& queue, + MessageList& messages, + ssize_t& count, + std::chrono::milliseconds timeout); + + void restore_forwarding(); + + // Members + QueueMap::iterator queue_iter_; +}; + +} //cppkafka + +#endif //CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fa018d56..2e893a83 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ set(SOURCES exceptions.cpp topic.cpp buffer.cpp + queue.cpp message.cpp topic_partition.cpp topic_partition_list.cpp @@ -18,6 +19,8 @@ set(SOURCES utils/backoff_performer.cpp utils/backoff_committer.cpp + utils/poll_strategy_base.cpp + utils/roundrobin_poll_strategy.cpp ) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka) diff --git a/src/consumer.cpp b/src/consumer.cpp index f250cbc4..ce840928 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -52,14 +52,14 @@ void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, static_cast(opaque)->handle_rebalance(error, list); } -Consumer::Consumer(Configuration config) +Consumer::Consumer(Configuration config) : KafkaHandleBase(move(config)) { char error_buffer[512]; rd_kafka_conf_t* config_handle = get_configuration_handle(); // Set ourselves as the opaque pointer rd_kafka_conf_set_opaque(config_handle, this); rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy); - rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, + rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, rd_kafka_conf_dup(config_handle), error_buffer, sizeof(error_buffer)); if (!ptr) { @@ -165,7 +165,7 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p int64_t low; int64_t high; const string& topic = topic_partition.get_topic(); - const int partition = topic_partition.get_partition(); + const int partition = topic_partition.get_partition(); rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(), partition, &low, &high); check_error(result); @@ -232,16 +232,14 @@ Message Consumer::poll() { } Message Consumer::poll(milliseconds timeout) { - rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), - static_cast(timeout.count())); - return message ? 
Message(message) : Message();
+    return rd_kafka_consumer_poll(get_handle(), static_cast<int>(timeout.count()));
 }

-vector<Message> Consumer::poll_batch(size_t max_batch_size) {
+MessageList Consumer::poll_batch(size_t max_batch_size) {
     return poll_batch(max_batch_size, get_timeout());
 }

-vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
+MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
     vector<rd_kafka_message_t*> raw_messages(max_batch_size);
     rd_kafka_queue_t* queue = rd_kafka_queue_get_consumer(get_handle());
     ssize_t result = rd_kafka_consume_batch_queue(queue, timeout.count(), raw_messages.data(),
@@ -249,15 +247,27 @@ vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout
     if (result == -1) {
         check_error(rd_kafka_last_error());
         // on the off-chance that check_error() does not throw an error
-        result = 0;
+        return MessageList();
     }
-    vector<Message> output;
-    raw_messages.resize(result);
-    output.reserve(result);
-    for (const auto ptr : raw_messages) {
-        output.emplace_back(ptr);
-    }
-    return output;
+    return MessageList(raw_messages.begin(), raw_messages.begin() + result);
+}
+
+Queue Consumer::get_main_queue() const {
+    Queue queue(Queue::make_non_owning(rd_kafka_queue_get_main(get_handle())));
+    queue.disable_queue_forwarding();
+    return queue;
+}
+
+Queue Consumer::get_consumer_queue() const {
+    return Queue::make_non_owning(rd_kafka_queue_get_consumer(get_handle()));
+}
+
+Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
+    Queue queue(Queue::make_non_owning(rd_kafka_queue_get_partition(get_handle(),
+                                                                    partition.get_topic().c_str(),
+                                                                    partition.get_partition())));
+    queue.disable_queue_forwarding();
+    return queue;
 }

 void Consumer::close() {
diff --git a/src/exceptions.cpp b/src/exceptions.cpp
index fbf98e1e..5cfa0831 100644
--- a/src/exceptions.cpp
+++ b/src/exceptions.cpp
@@ -108,4 +108,15 @@ Error ConsumerException::get_error() const {
     return error_;
 }

+// QueueException
+
+QueueException::QueueException(Error error)
+: Exception(error.to_string()), error_(error) {
+
+}
+
+Error QueueException::get_error() const {
+    return error_;
+}
+
 } // cppkafka
diff --git a/src/message.cpp b/src/message.cpp
index a52a0a70..d9c0870e 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -57,10 +57,9 @@ Message::Message(rd_kafka_message_t* handle, NonOwningTag)
 }

 Message::Message(HandlePtr handle)
-: handle_(move(handle)),
-  payload_((const Buffer::DataType*)handle_->payload, handle_->len),
-  key_((const Buffer::DataType*)handle_->key, handle_->key_len) {
-
+: handle_(move(handle)),
+  payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()),
+  key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()) {
 }

 // MessageTimestamp
diff --git a/src/queue.cpp b/src/queue.cpp
new file mode 100644
index 00000000..7e220e5a
--- /dev/null
+++ b/src/queue.cpp
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2017, Matias Fontanini
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+#include "queue.h"
+#include "exceptions.h"
+
+using std::vector;
+using std::exception;
+using std::chrono::milliseconds;
+
+namespace cppkafka {
+
+void dummy_deleter(rd_kafka_queue_t*) {
+
+}
+
+const milliseconds Queue::DEFAULT_TIMEOUT{1000};
+
+Queue Queue::make_non_owning(rd_kafka_queue_t* handle) {
+    return Queue(handle, NonOwningTag{});
+}
+
+Queue::Queue()
+: handle_(nullptr, nullptr),
+  timeout_ms_(DEFAULT_TIMEOUT) {
+
+}
+
+Queue::Queue(rd_kafka_queue_t* handle)
+: handle_(handle, &rd_kafka_queue_destroy),
+  timeout_ms_(DEFAULT_TIMEOUT) {
+
+}
+
+Queue::Queue(rd_kafka_queue_t* handle, NonOwningTag)
+: handle_(handle, &dummy_deleter),
+  timeout_ms_(DEFAULT_TIMEOUT) {
+
+}
+
+rd_kafka_queue_t* Queue::get_handle() const {
+    return handle_.get();
+}
+
+size_t Queue::get_length() const {
+    return rd_kafka_queue_length(handle_.get());
+}
+
+void Queue::forward_to_queue(const Queue& forward_queue) const {
+    return rd_kafka_queue_forward(handle_.get(), forward_queue.handle_.get());
+}
+
+void Queue::disable_queue_forwarding() const {
+    return rd_kafka_queue_forward(handle_.get(), nullptr);
+}
+
+void Queue::set_timeout(milliseconds timeout) {
+    timeout_ms_ = timeout;
+}
+
+milliseconds Queue::get_timeout() const {
+    return timeout_ms_;
+}
+
+Message Queue::consume() const {
+    return consume(timeout_ms_);
+}
+
+Message Queue::consume(milliseconds timeout) const {
+    return Message(rd_kafka_consume_queue(handle_.get(), static_cast<int>(timeout.count())));
+}
+
+MessageList Queue::consume_batch(size_t max_batch_size) const {
+    return consume_batch(max_batch_size, timeout_ms_);
+}
+
+MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const {
+    vector<rd_kafka_message_t*> raw_messages(max_batch_size);
+    ssize_t result = rd_kafka_consume_batch_queue(handle_.get(),
+                                                  static_cast<int>(timeout.count()),
+                                                  raw_messages.data(),
+                                                  raw_messages.size());
+    if (result == -1) {
+        rd_kafka_resp_err_t error = rd_kafka_last_error();
+        if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
+            throw QueueException(error);
+        }
+        return MessageList();
+    }
+    // Build message list
+    return MessageList(raw_messages.begin(), raw_messages.begin() + result);
+}
+
+} //cppkafka
diff --git a/src/topic.cpp b/src/topic.cpp
index 7ecdc633..9f311481 100644
--- a/src/topic.cpp
+++ b/src/topic.cpp
@@ -34,7 +34,7 @@ using std::string;

 namespace cppkafka {

-void dummy_topic_destroyer(rd_kafka_topic_t*) {
+void dummy_deleter(rd_kafka_topic_t*) {

 }

@@ -47,13 +47,13 @@ Topic::Topic()

 }

-Topic::Topic(rd_kafka_topic_t* handle)
+Topic::Topic(rd_kafka_topic_t* handle)
 : handle_(handle, &rd_kafka_topic_destroy) {

 }

 Topic::Topic(rd_kafka_topic_t* handle, NonOwningTag)
-: handle_(handle, &dummy_topic_destroyer) {
+: handle_(handle, &dummy_deleter) {

 }

diff --git
a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp new file mode 100644 index 00000000..910ae6f1 --- /dev/null +++ b/src/utils/poll_strategy_base.cpp @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "utils/poll_strategy_base.h" +#include "consumer.h" + +using std::chrono::milliseconds; + +namespace cppkafka { + +PollStrategyBase::PollStrategyBase(Consumer& consumer) +: consumer_(consumer), + consumer_queue_(QueueData{consumer.get_consumer_queue(), boost::any()}) { + // get all currently active partition assignments + TopicPartitionList assignment = consumer_.get_assignment(); + on_assignment(assignment); + + // take over the assignment callback + assignment_callback_ = consumer.get_assignment_callback(); + consumer_.set_assignment_callback([this](TopicPartitionList& partitions) { + on_assignment(partitions); + }); + // take over the revocation callback + revocation_callback_ = consumer.get_revocation_callback(); + consumer_.set_revocation_callback([this](const TopicPartitionList& partitions) { + on_revocation(partitions); + }); + // take over the rebalance error callback + rebalance_error_callback_ = consumer.get_rebalance_error_callback(); + consumer_.set_rebalance_error_callback([this](Error error) { + on_rebalance_error(error); + }); +} + +PollStrategyBase::~PollStrategyBase() { + //reset the original callbacks + consumer_.set_assignment_callback(assignment_callback_); + consumer_.set_revocation_callback(revocation_callback_); + consumer_.set_rebalance_error_callback(rebalance_error_callback_); +} + +void PollStrategyBase::set_timeout(milliseconds timeout) { + consumer_.set_timeout(timeout); +} + +milliseconds PollStrategyBase::get_timeout() { + return consumer_.get_timeout(); +} + +Consumer& PollStrategyBase::get_consumer() { + return consumer_; +} + +QueueData& PollStrategyBase::get_consumer_queue() { + return consumer_queue_; +} + +PollStrategyBase::QueueMap& PollStrategyBase::get_partition_queues() { + return partition_queues_; +} + +void PollStrategyBase::reset_state() { + +} + +void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { + // populate partition queues + for (const auto& partition 
: partitions) { + // get the queue associated with this partition + partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()}); + } + reset_state(); + // call original consumer callback if any + if (assignment_callback_) { + assignment_callback_(partitions); + } +} + +void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) { + for (const auto& partition : partitions) { + // get the queue associated with this partition + auto toppar_it = partition_queues_.find(partition); + if (toppar_it != partition_queues_.end()) { + // remove this queue from the list + partition_queues_.erase(toppar_it); + } + } + reset_state(); + // call original consumer callback if any + if (revocation_callback_) { + revocation_callback_(partitions); + } +} + +void PollStrategyBase::on_rebalance_error(Error error) { + reset_state(); + // call original consumer callback if any + if (rebalance_error_callback_) { + rebalance_error_callback_(error); + } +} + +} //cppkafka diff --git a/src/utils/roundrobin_poll_strategy.cpp b/src/utils/roundrobin_poll_strategy.cpp new file mode 100644 index 00000000..5d5fc7a6 --- /dev/null +++ b/src/utils/roundrobin_poll_strategy.cpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include "utils/roundrobin_poll_strategy.h" + +using std::string; +using std::chrono::milliseconds; +using std::make_move_iterator; + +namespace cppkafka { + +RoundRobinPollStrategy::RoundRobinPollStrategy(Consumer& consumer) +: PollStrategyBase(consumer) { + reset_state(); +} + +RoundRobinPollStrategy::~RoundRobinPollStrategy() { + restore_forwarding(); +} + + +Message RoundRobinPollStrategy::poll() { + return poll(get_consumer().get_timeout()); +} + +Message RoundRobinPollStrategy::poll(milliseconds timeout) { + // Always give priority to group and global events + Message message = get_consumer_queue().queue.consume(milliseconds(0)); + if (message) { + return message; + } + size_t num_queues = get_partition_queues().size(); + while (num_queues--) { + //consume the next partition (non-blocking) + message = get_next_queue().queue.consume(milliseconds(0)); + if (message) { + return message; + } + } + // We still don't have a valid message so we block on the event queue + return get_consumer_queue().queue.consume(timeout); +} + +MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size) { + return poll_batch(max_batch_size, get_consumer().get_timeout()); +} + +MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, milliseconds timeout) { + MessageList messages; + ssize_t count = max_batch_size; + + // batch from the group event queue first (non-blocking) + consume_batch(get_consumer_queue().queue, messages, count, milliseconds(0)); + size_t num_queues = get_partition_queues().size(); + while ((count > 0) && (num_queues--)) { + // batch from the next partition (non-blocking) + consume_batch(get_next_queue().queue, messages, count, milliseconds(0)); + } + // we still have space left in the buffer + if (count > 0) { + // wait on the event queue until timeout + consume_batch(get_consumer_queue().queue, messages, count, timeout); + } + return messages; +} + +void RoundRobinPollStrategy::consume_batch(Queue& queue, + MessageList& messages, + ssize_t& count, + milliseconds timeout) { + MessageList queue_messages = queue.consume_batch(count, timeout); + if (queue_messages.empty()) { + return; + } + // concatenate both lists + messages.insert(messages.end(), + make_move_iterator(queue_messages.begin()), + make_move_iterator(queue_messages.end())); + // reduce total batch count + count -= queue_messages.size(); +} + + +void RoundRobinPollStrategy::restore_forwarding() { + // forward all partition queues + for (const auto& toppar : get_partition_queues()) { + toppar.second.queue.forward_to_queue(get_consumer_queue().queue); + } +} + +QueueData& RoundRobinPollStrategy::get_next_queue() { + if (get_partition_queues().empty()) { + throw QueueException(RD_KAFKA_RESP_ERR__STATE); + } + if (++queue_iter_ == get_partition_queues().end()) { + queue_iter_ = get_partition_queues().begin(); + } + return queue_iter_->second; +} + +void RoundRobinPollStrategy::reset_state() { + queue_iter_ = get_partition_queues().begin(); +} + +} //cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a423739c..01e06ed6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -7,14 +7,10 @@ set(KAFKA_TEST_INSTANCE "kafka-vm:9092" add_custom_target(tests) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -add_library(cppkafka-test EXCLUDE_FROM_ALL test_utils.cpp) -target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} pthread) - add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") add_executable( cppkafka_tests - EXCLUDE_FROM_ALL buffer_test.cpp 
     compacted_topic_processor_test.cpp
     configuration_test.cpp
@@ -22,10 +18,11 @@ add_executable(
     kafka_handle_base_test.cpp
     producer_test.cpp
     consumer_test.cpp
+    roundrobin_poll_test.cpp
     # Main file
     test_main.cpp
 )
-target_link_libraries(cppkafka_tests cppkafka-test)
+target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread)
 add_dependencies(tests cppkafka_tests)
 add_test(cppkafka cppkafka_tests)
diff --git a/tests/compacted_topic_processor_test.cpp b/tests/compacted_topic_processor_test.cpp
index f72f580c..4b4c36d9 100644
--- a/tests/compacted_topic_processor_test.cpp
+++ b/tests/compacted_topic_processor_test.cpp
@@ -8,6 +8,7 @@
 #include "cppkafka/producer.h"
 #include "cppkafka/consumer.h"
 #include "cppkafka/utils/compacted_topic_processor.h"
+#include "test_utils.h"
 
 using std::string;
 using std::to_string;
@@ -29,8 +30,6 @@ using std::chrono::milliseconds;
 
 using namespace cppkafka;
 
-static const string KAFKA_TOPIC = "cppkafka_test1";
-
 static Configuration make_producer_config() {
     Configuration config;
     config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -65,7 +64,7 @@ TEST_CASE("consumption", "[consumer][compacted]") {
     compacted_consumer.set_event_handler([&](const Event& event) {
         events.push_back(event);
     });
-    consumer.subscribe({ KAFKA_TOPIC });
+    consumer.subscribe({ KAFKA_TOPICS[0] });
     consumer.poll();
     consumer.poll();
     consumer.poll();
@@ -82,13 +81,13 @@ TEST_CASE("consumption", "[consumer][compacted]") {
     };
     for (const auto& element_pair : elements) {
         const ElementType& element = element_pair.second;
-        MessageBuilder builder(KAFKA_TOPIC);
+        MessageBuilder builder(KAFKA_TOPICS[0]);
         builder.partition(element.partition).key(element_pair.first).payload(element.value);
         producer.produce(builder);
     }
     // Now erase the first element
     string deleted_key = "42";
-    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(0).key(deleted_key));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(0).key(deleted_key));
 
     for (size_t i = 0; i < 10; ++i) {
         compacted_consumer.process_event();
diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp
index 87592ae8..843eddcd 100644
--- a/tests/consumer_test.cpp
+++ b/tests/consumer_test.cpp
@@ -29,8 +29,6 @@ using std::chrono::system_clock;
 
 using namespace cppkafka;
 
-const string KAFKA_TOPIC = "cppkafka_test1";
-
 static Configuration make_producer_config() {
     Configuration config;
     config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -54,31 +52,32 @@ TEST_CASE("message consumption", "[consumer]") {
     consumer.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
         assignment = topic_partitions;
     });
-    consumer.subscribe({ KAFKA_TOPIC });
-    ConsumerRunner runner(consumer, 1, 3);
+    consumer.subscribe({ KAFKA_TOPICS[0] });
+    ConsumerRunner runner(consumer, 1, KAFKA_NUM_PARTITIONS);
 
     // Produce a message just so we stop the consumer
     Producer producer(make_producer_config());
     string payload = "Hello world!";
-    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
     runner.try_join();
 
-    // All 3 partitions should be ours
-    REQUIRE(assignment.size() == 3);
-    set<int> partitions = { 0, 1, 2 };
+    // All partitions should be ours
+    REQUIRE(assignment.size() == KAFKA_NUM_PARTITIONS);
+    set<int> partitions;
+    for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace(i++));
     for (const auto& topic_partition : assignment) {
-        CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
+        CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
         CHECK(partitions.erase(topic_partition.get_partition()) == true);
     }
     REQUIRE(runner.get_messages().size() == 1);
 
-    CHECK(consumer.get_subscription() == vector<string>{ KAFKA_TOPIC });
+    CHECK(consumer.get_subscription() == vector<string>{ KAFKA_TOPICS[0] });
 
     assignment = consumer.get_assignment();
-    CHECK(assignment.size() == 3);
+    CHECK(assignment.size() == KAFKA_NUM_PARTITIONS);
 
     int64_t low;
     int64_t high;
-    tie(low, high) = consumer.get_offsets({ KAFKA_TOPIC, partition });
+    tie(low, high) = consumer.get_offsets({ KAFKA_TOPICS[0], partition });
     CHECK(high > low);
     CHECK(runner.get_messages().back().get_offset() + 1 == high);
 }
@@ -97,15 +96,15 @@ TEST_CASE("consumer rebalance", "[consumer]") {
     consumer1.set_revocation_callback([&](const TopicPartitionList&) {
         revocation_called = true;
     });
-    consumer1.subscribe({ KAFKA_TOPIC });
-    ConsumerRunner runner1(consumer1, 1, 3);
+    consumer1.subscribe({ KAFKA_TOPICS[0] });
+    ConsumerRunner runner1(consumer1, 1, KAFKA_NUM_PARTITIONS);
 
     // Create a second consumer and subscribe to the topic
     Consumer consumer2(make_consumer_config());
     consumer2.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
         assignment2 = topic_partitions;
     });
-    consumer2.subscribe({ KAFKA_TOPIC });
+    consumer2.subscribe({ KAFKA_TOPICS[0] });
     ConsumerRunner runner2(consumer2, 1, 1);
 
     CHECK(revocation_called == true);
@@ -113,19 +112,20 @@ TEST_CASE("consumer rebalance", "[consumer]") {
     // Produce a message just so we stop the consumer
     Producer producer(make_producer_config());
     string payload = "Hello world!";
-    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
     runner1.try_join();
     runner2.try_join();
 
-    // All 3 partitions should be assigned
-    CHECK(assignment1.size() + assignment2.size() == 3);
-    set<int> partitions = { 0, 1, 2 };
+    // All partitions should be assigned
+    CHECK(assignment1.size() + assignment2.size() == KAFKA_NUM_PARTITIONS);
+    set<int> partitions;
+    for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace(i++));
     for (const auto& topic_partition : assignment1) {
-        CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
+        CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
         CHECK(partitions.erase(topic_partition.get_partition()) == true);
     }
     for (const auto& topic_partition : assignment2) {
-        CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
+        CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
         CHECK(partitions.erase(topic_partition.get_partition()) == true);
     }
     CHECK(runner1.get_messages().size() + runner2.get_messages().size() == 1);
@@ -143,18 +143,18 @@ TEST_CASE("consumer offset commit", "[consumer]") {
         offset_commit_called = true;
         CHECK(!!error == false);
         REQUIRE(topic_partitions.size() == 1);
-        CHECK(topic_partitions[0].get_topic() == KAFKA_TOPIC);
+        CHECK(topic_partitions[0].get_topic() == KAFKA_TOPICS[0]);
         CHECK(topic_partitions[0].get_partition() == 0);
         CHECK(topic_partitions[0].get_offset() == message_offset + 1);
     });
     Consumer consumer(config);
-    consumer.assign({ { KAFKA_TOPIC, 0 } });
+    consumer.assign({ { KAFKA_TOPICS[0], 0 } });
     ConsumerRunner runner(consumer, 1, 1);
 
     // Produce a message just so we stop the consumer
     Producer producer(make_producer_config());
     string payload = "Hello world!";
-    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
     runner.try_join();
 
     REQUIRE(runner.get_messages().size() == 1);
@@ -173,7 +173,7 @@ TEST_CASE("consumer throttle", "[consumer]") {
     // Create a consumer and subscribe to the topic
     Configuration config = make_consumer_config("offset_commit");
     Consumer consumer(config);
-    consumer.assign({ { KAFKA_TOPIC, 0 } });
+    consumer.assign({ { KAFKA_TOPICS[0], 0 } });
 
     {
         ConsumerRunner runner(consumer, 0, 1);
@@ -183,7 +183,7 @@ TEST_CASE("consumer throttle", "[consumer]") {
     // Produce a message just so we stop the consumer
     BufferedProducer<string> producer(make_producer_config());
     string payload = "Hello world!";
-    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
     producer.flush();
 
     size_t callback_executed_count = 0;
@@ -213,7 +213,7 @@ TEST_CASE("consume batch", "[consumer]") {
     // Create a consumer and subscribe to the topic
     Configuration config = make_consumer_config("test");
     Consumer consumer(config);
-    consumer.assign({ { KAFKA_TOPIC, 0 } });
+    consumer.assign({ { KAFKA_TOPICS[0], 0 } });
 
     {
         ConsumerRunner runner(consumer, 0, 1);
@@ -224,14 +224,14 @@ TEST_CASE("consume batch", "[consumer]") {
     BufferedProducer<string> producer(make_producer_config());
     string payload = "Hello world!";
     // Produce it twice
-    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
-    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
     producer.flush();
 
-    vector<Message> all_messages;
+    MessageList all_messages;
     int i = 0;
     while (i < 5 && all_messages.size() != 2) {
-        vector<Message> messages = consumer.poll_batch(2);
+        MessageList messages = consumer.poll_batch(2);
         all_messages.insert(all_messages.end(), make_move_iterator(messages.begin()),
                             make_move_iterator(messages.end()));
         ++i;
diff --git a/tests/kafka_handle_base_test.cpp b/tests/kafka_handle_base_test.cpp
index b33603ea..16593232 100644
--- a/tests/kafka_handle_base_test.cpp
+++ b/tests/kafka_handle_base_test.cpp
@@ -14,8 +14,6 @@ using std::string;
 
 using namespace cppkafka;
 
-static const string KAFKA_TOPIC = "cppkafka_test1";
-
 Configuration make_config() {
     Configuration config;
     config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -45,6 +43,9 @@ uint16_t get_kafka_port() {
 }
 
 TEST_CASE("metadata", "[handle_base]") {
+    if (KAFKA_TOPICS.size() < 2) {
+        return; // skip test
+    }
     Producer producer({});
     producer.add_brokers(KAFKA_TEST_INSTANCE);
     Metadata metadata = producer.get_metadata();
@@ -59,7 +60,7 @@ TEST_CASE("metadata", "[handle_base]") {
     }
 
     SECTION("topics") {
-        unordered_set<string> topic_names = { "cppkafka_test1", "cppkafka_test2" };
+        unordered_set<string> topic_names = { KAFKA_TOPICS[0], KAFKA_TOPICS[1] };
         size_t found_topics = 0;
 
         const vector<TopicMetadata>& topics = metadata.get_topics();
 
         for (const auto& topic : topics) {
             if (topic_names.count(topic.get_name()) == 1) {
                 const vector<PartitionMetadata>& partitions = topic.get_partitions();
-                REQUIRE(partitions.size() == 3);
-                set<int32_t> expected_ids = { 0, 1, 2 };
+                REQUIRE(partitions.size() == KAFKA_NUM_PARTITIONS);
+                set<int32_t> expected_ids;
+                for (int i = 0; i < KAFKA_NUM_PARTITIONS; expected_ids.emplace(i++));
                 for (const PartitionMetadata& partition : partitions) {
                     REQUIRE(expected_ids.erase(partition.get_id()) == 1);
                     for (int32_t replica : partition.get_replicas()) {
@@ -90,8 +92,8 @@ TEST_CASE("metadata", "[handle_base]") {
         CHECK(metadata.get_topics_prefixed("cppkafka_").size() == topic_names.size());
 
         // Now get the whole metadata only for this topic
-        Topic topic = producer.get_topic(KAFKA_TOPIC);
-        CHECK(producer.get_metadata(topic).get_name() == KAFKA_TOPIC);
+        Topic topic = producer.get_topic(KAFKA_TOPICS[0]);
+        CHECK(producer.get_metadata(topic).get_name() == KAFKA_TOPICS[0]);
     }
 }
 
@@ -106,7 +108,7 @@ TEST_CASE("consumer groups", "[handle_base]") {
 
     // Build consumer
     Consumer consumer(config);
-    consumer.subscribe({ KAFKA_TOPIC });
+    consumer.subscribe({ KAFKA_TOPICS[0] });
     ConsumerRunner runner(consumer, 0, 3);
     runner.try_join();
 
@@ -120,11 +122,8 @@ TEST_CASE("consumer groups", "[handle_base]") {
     MemberAssignmentInformation assignment = member.get_member_assignment();
     CHECK(assignment.get_version() == 0);
 
-    TopicPartitionList expected_topic_partitions = {
-        { KAFKA_TOPIC, 0 },
-        { KAFKA_TOPIC, 1 },
-        { KAFKA_TOPIC, 2 }
-    };
+    TopicPartitionList expected_topic_partitions;
+    for (int i = 0; i < KAFKA_NUM_PARTITIONS; expected_topic_partitions.emplace_back(KAFKA_TOPICS[0], i++));
     TopicPartitionList topic_partitions = assignment.get_topic_partitions();
     sort(topic_partitions.begin(), topic_partitions.end());
     CHECK(topic_partitions == expected_topic_partitions);
diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp
index c388bab1..007f2e67 100644
--- a/tests/producer_test.cpp
+++ b/tests/producer_test.cpp
@@ -28,8 +28,6 @@ using std::ref;
 
 using namespace cppkafka;
 
-static const string KAFKA_TOPIC = "cppkafka_test1";
-
 static Configuration make_producer_config() {
     Configuration config = {
         { "metadata.broker.list", KAFKA_TEST_INSTANCE },
@@ -54,7 +52,7 @@ void producer_run(BufferedProducer<string>& producer,
                   int& exit_flag, condition_variable& clear,
                   int num_messages, int partition) {
-    MessageBuilder builder(KAFKA_TOPIC);
+    MessageBuilder builder(KAFKA_TOPICS[0]);
     string key("wassup?");
     string payload("nothing much!");
 
@@ -93,7 +91,7 @@ TEST_CASE("simple production", "[producer]") {
 
     // Create a consumer and assign this topic/partition
     Consumer consumer(make_consumer_config());
-    consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
+    consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
     ConsumerRunner runner(consumer, 1, 1);
 
     Configuration config = make_producer_config();
@@ -101,7 +99,7 @@ TEST_CASE("simple production", "[producer]") {
     // Now create a producer and produce a message
     const string payload = "Hello world! 1";
1"; Producer producer(config); - producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload)); + producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload)); runner.try_join(); const auto& messages = runner.get_messages(); @@ -109,13 +107,13 @@ TEST_CASE("simple production", "[producer]") { const auto& message = messages[0]; CHECK(message.get_payload() == payload); CHECK(!!message.get_key() == false); - CHECK(message.get_topic() == KAFKA_TOPIC); + CHECK(message.get_topic() == KAFKA_TOPICS[0]); CHECK(message.get_partition() == partition); CHECK(!!message.get_error() == false); int64_t low; int64_t high; - tie(low, high) = producer.query_offsets({ KAFKA_TOPIC, partition }); + tie(low, high) = producer.query_offsets({ KAFKA_TOPICS[0], partition }); CHECK(high > low); } @@ -124,7 +122,7 @@ TEST_CASE("simple production", "[producer]") { const string key = "such key"; const milliseconds timestamp{15}; Producer producer(config); - producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition) + producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition) .key(key) .payload(payload) .timestamp(timestamp)); @@ -135,7 +133,7 @@ TEST_CASE("simple production", "[producer]") { const auto& message = messages[0]; CHECK(message.get_payload() == payload); CHECK(message.get_key() == key); - CHECK(message.get_topic() == KAFKA_TOPIC); + CHECK(message.get_topic() == KAFKA_TOPICS[0]); CHECK(message.get_partition() == partition); CHECK(!!message.get_error() == false); REQUIRE(!!message.get_timestamp() == true); @@ -147,7 +145,7 @@ TEST_CASE("simple production", "[producer]") { const string key = "replay key"; const milliseconds timestamp{15}; Producer producer(config); - producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition) + producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition) .key(key) .payload(payload) .timestamp(timestamp)); @@ -167,7 +165,7 @@ TEST_CASE("simple production", "[producer]") { const auto& message = messages[0]; CHECK(message.get_payload() == payload); CHECK(message.get_key() == key); - CHECK(message.get_topic() == KAFKA_TOPIC); + CHECK(message.get_topic() == KAFKA_TOPICS[0]); CHECK(message.get_partition() == partition); CHECK(!!message.get_error() == false); REQUIRE(!!message.get_timestamp() == true); @@ -188,14 +186,14 @@ TEST_CASE("simple production", "[producer]") { topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key, int32_t partition_count) { CHECK(msg_key == key); - CHECK(partition_count == 3); - CHECK(topic.get_name() == KAFKA_TOPIC); + CHECK(partition_count == KAFKA_NUM_PARTITIONS); + CHECK(topic.get_name() == KAFKA_TOPICS[0]); return 0; }); config.set_default_topic_configuration(topic_config); Producer producer(config); - producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload)); + producer.produce(MessageBuilder(KAFKA_TOPICS[0]).key(key).payload(payload)); while (producer.get_out_queue_length() > 0) { producer.poll(); } @@ -206,7 +204,7 @@ TEST_CASE("simple production", "[producer]") { const auto& message = messages[0]; CHECK(message.get_payload() == payload); CHECK(message.get_key() == key); - CHECK(message.get_topic() == KAFKA_TOPIC); + CHECK(message.get_topic() == KAFKA_TOPICS[0]); CHECK(message.get_partition() == partition); CHECK(!!message.get_error() == false); CHECK(delivery_report_called == true); @@ -222,15 +220,15 @@ TEST_CASE("simple production", "[producer]") { topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& 
                                               int32_t partition_count) {
         CHECK(msg_key == key);
-        CHECK(partition_count == 3);
-        CHECK(topic.get_name() == KAFKA_TOPIC);
+        CHECK(partition_count == KAFKA_NUM_PARTITIONS);
+        CHECK(topic.get_name() == KAFKA_TOPICS[0]);
         callback_called = true;
         return 0;
     });
     config.set_default_topic_configuration(topic_config);
 
     Producer producer(config);
-    producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).key(key).payload(payload));
     producer.poll();
     runner.try_join();
 
@@ -244,13 +242,12 @@ TEST_CASE("multiple messages", "[producer]") {
     size_t message_count = 10;
-    int partitions = 3;
     set<string> payloads;
 
     // Create a consumer and subscribe to this topic
     Consumer consumer(make_consumer_config());
-    consumer.subscribe({ KAFKA_TOPIC });
-    ConsumerRunner runner(consumer, message_count, partitions);
+    consumer.subscribe({ KAFKA_TOPICS[0] });
+    ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS);
 
     // Now create a producer and produce a message
     Producer producer(make_producer_config());
@@ -258,19 +255,19 @@ TEST_CASE("multiple messages", "[producer]") {
     for (size_t i = 0; i < message_count; ++i) {
         const string payload = payload_base + to_string(i);
         payloads.insert(payload);
-        producer.produce(MessageBuilder(KAFKA_TOPIC).payload(payload));
+        producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
     }
     runner.try_join();
 
     const auto& messages = runner.get_messages();
     REQUIRE(messages.size() == message_count);
     for (const auto& message : messages) {
-        CHECK(message.get_topic() == KAFKA_TOPIC);
+        CHECK(message.get_topic() == KAFKA_TOPICS[0]);
         CHECK(payloads.erase(message.get_payload()) == 1);
         CHECK(!!message.get_error() == false);
         CHECK(!!message.get_key() == false);
         CHECK(message.get_partition() >= 0);
-        CHECK(message.get_partition() < 3);
+        CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS);
     }
 }
 
@@ -279,22 +276,22 @@ TEST_CASE("buffered producer", "[producer][buffered_producer]") {
 
     // Create a consumer and assign this topic/partition
     Consumer consumer(make_consumer_config());
-    consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
+    consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
     ConsumerRunner runner(consumer, 3, 1);
 
     // Now create a buffered producer and produce two messages
     BufferedProducer<string> producer(make_producer_config());
     const string payload = "Hello world! 2";
     const string key = "such key";
-    producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition)
+    producer.add_message(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
                                                     .key(key)
                                                     .payload(payload));
-    producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
+    producer.add_message(producer.make_builder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
     producer.flush();
-    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
     producer.wait_for_acks();
     // Add another one but then clear it
-    producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
+    producer.add_message(producer.make_builder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
     producer.clear();
     runner.try_join();
 
@@ -302,7 +299,7 @@ TEST_CASE("buffered producer", "[producer][buffered_producer]") {
     REQUIRE(messages.size() == 3);
     const auto& message = messages[0];
     CHECK(message.get_key() == key);
-    CHECK(message.get_topic() == KAFKA_TOPIC);
+    CHECK(message.get_topic() == KAFKA_TOPICS[0]);
     CHECK(message.get_partition() == partition);
     CHECK(!!message.get_error() == false);
 
@@ -319,7 +316,7 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") {
 
     // Create a consumer and assign this topic/partition
     Consumer consumer(make_consumer_config());
-    consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
+    consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
     ConsumerRunner runner(consumer, 3, 1);
 
     // Now create a buffered producer and produce two messages
@@ -332,7 +329,7 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") {
     // Limit the size of the internal buffer
     producer.set_max_buffer_size(num_messages-1);
     while (num_messages--) {
-        producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload));
+        producer.add_message(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).key(key).payload(payload));
     }
     REQUIRE(producer.get_buffer_size() == 1);
@@ -354,7 +351,7 @@ TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") {
 
     // Create a consumer and assign this topic/partition
     Consumer consumer(make_consumer_config());
-    consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
+    consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
     ConsumerRunner runner(consumer, num_messages, 1);
 
     BufferedProducer<string> producer(make_producer_config());
diff --git a/tests/roundrobin_poll_test.cpp b/tests/roundrobin_poll_test.cpp
new file mode 100644
index 00000000..99414dd4
--- /dev/null
+++ b/tests/roundrobin_poll_test.cpp
@@ -0,0 +1,164 @@
+#include <vector>
+#include <string>
+#include <thread>
+#include <set>
+#include <mutex>
+#include <tuple>
+#include <condition_variable>
+#include <memory>
+#include <iterator>
+#include <chrono>
+#include "cppkafka/cppkafka.h"
+#include "test_utils.h"
+
+using std::vector;
+using std::move;
+using std::string;
+using std::thread;
+using std::set;
+using std::mutex;
+using std::tie;
+using std::condition_variable;
+using std::lock_guard;
+using std::unique_lock;
+using std::unique_ptr;
+using std::make_move_iterator;
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::chrono::system_clock;
+
+using namespace cppkafka;
+
+//==================================================================================
+// Helper functions
+//==================================================================================
+static Configuration make_producer_config() {
+    Configuration config;
+    config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
+    return config;
+}
+
+static Configuration make_consumer_config(const string& group_id = "rr_consumer_test") {
+    Configuration config;
+    config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
+    config.set("enable.auto.commit", true);
+    config.set("enable.auto.offset.store", true);
+    config.set("auto.commit.interval.ms", 100);
+    config.set("group.id", group_id);
+    return config;
+}
+
+static vector<int> make_roundrobin_partition_vector(int total_messages) {
+    vector<int> partition_order;
+    for (int i = 0, partition = 0; i < total_messages+1; ++i) {
+        if ((i % KAFKA_NUM_PARTITIONS) == 0) {
+            partition = 0;
+        }
+        partition_order.push_back(partition++);
+    }
+    return partition_order;
+}
+
+//========================================================================
+// TESTS
+//========================================================================
+
+TEST_CASE("serial consumer test", "[roundrobin consumer]") {
+    int messages_per_partition = 3;
+    int total_messages = KAFKA_NUM_PARTITIONS * messages_per_partition;
+
+    // Create a consumer and assign all partitions of the topic
+    Consumer consumer(make_consumer_config());
+    TopicPartitionList partitions;
+    for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace_back(KAFKA_TOPICS[0], i++));
+    consumer.assign(partitions);
+
+    // Start the runner with the original consumer
+    ConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
+
+    // Produce messages so we stop the consumer
+    Producer producer(make_producer_config());
+    string payload = "Serial";
+
+    // Push 'messages_per_partition' messages into each partition
+    for (int i = 0; i < total_messages; ++i) {
+        producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i % KAFKA_NUM_PARTITIONS).payload(payload));
+    }
+    producer.flush();
+    runner.try_join();
+
+    // Check that we have all messages
+    REQUIRE(runner.get_messages().size() == total_messages);
+
+    // Messages should arrive with identical partition ids in consecutive
+    // groups of 'messages_per_partition'
+    int expected_partition;
+    for (int i = 0; i < total_messages; ++i) {
+        if ((i % messages_per_partition) == 0) {
+            expected_partition = runner.get_messages()[i].get_partition();
+        }
+        REQUIRE(runner.get_messages()[i].get_partition() == expected_partition);
+        REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
+    }
+}
+
+TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
+    TopicPartitionList assignment;
+    int messages_per_partition = 3;
+    int total_messages = KAFKA_NUM_PARTITIONS * messages_per_partition;
+
+    // Create a consumer and subscribe to the topic
+    PollStrategyAdapter consumer(make_consumer_config());
+    consumer.subscribe({ KAFKA_TOPICS[0] });
+    consumer.add_polling_strategy(unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer)));
+
+    PollConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
+
+    // Produce messages so we stop the consumer
+    Producer producer(make_producer_config());
+    string payload = "RoundRobin";
+
+    // Push 'messages_per_partition' messages into each partition
+    for (int i = 0; i < total_messages; ++i) {
+        producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i % KAFKA_NUM_PARTITIONS).payload(payload));
+    }
+    producer.flush();
+    runner.try_join();
+
+    // Check that we have all messages
+    REQUIRE(runner.get_messages().size() == total_messages);
+
+    // Check that we have one message from each partition in desired order
+    vector<int> partition_order = make_roundrobin_partition_vector(total_messages + KAFKA_NUM_PARTITIONS);
+    int partition_idx;
+    for (int i = 0; i < total_messages; ++i) {
+        if (i == 0) {
+            // Find the first polled partition index
+            partition_idx = runner.get_messages()[i].get_partition();
+        }
+        REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i + partition_idx]);
+        REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
+    }
+
+    //============ resume original poll strategy =============//
+
+    // Validate that once the round-robin strategy is deleted, normal polling works as before
+    consumer.delete_polling_strategy();
+
+    ConsumerRunner serial_runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
+
+    payload = "SerialPolling";
+    // Push 'messages_per_partition' messages into each partition
+    for (int i = 0; i < total_messages; ++i) {
+        producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i % KAFKA_NUM_PARTITIONS).payload(payload));
+    }
+    producer.flush();
+    serial_runner.try_join();
+
+    // Check that we have all messages
+    REQUIRE(serial_runner.get_messages().size() == total_messages);
+
+    for (int i = 0; i < total_messages; ++i) {
+        REQUIRE((string)serial_runner.get_messages()[i].get_payload() == payload);
+    }
+}
+
diff --git a/tests/test_main.cpp b/tests/test_main.cpp
index 6eec4ca1..5c41d07c 100644
--- a/tests/test_main.cpp
+++ b/tests/test_main.cpp
@@ -15,6 +15,9 @@ using Catch::TestCaseStats;
 using Catch::Totals;
 using Catch::Session;
 
+std::vector<std::string> KAFKA_TOPICS = {"cppkafka_test1", "cppkafka_test2"};
+int KAFKA_NUM_PARTITIONS = 3;
+
 namespace cppkafka {
 
 class InstantTestReporter : public ConsoleReporter {
diff --git a/tests/test_utils.cpp b/tests/test_utils.cpp
deleted file mode 100644
index 8eff92a2..00000000
--- a/tests/test_utils.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-#include <thread>
-#include <mutex>
-#include <condition_variable>
-#include "test_utils.h"
-#include "cppkafka/utils/consumer_dispatcher.h"
-
-using std::vector;
-using std::move;
-using std::thread;
-using std::mutex;
-using std::lock_guard;
-using std::unique_lock;
-using std::condition_variable;
-
-using std::chrono::system_clock;
-using std::chrono::milliseconds;
-using std::chrono::seconds;
-
-using cppkafka::Consumer;
-using cppkafka::ConsumerDispatcher;
-using cppkafka::Message;
-using cppkafka::TopicPartition;
-
-ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
-: consumer_(consumer) {
-    bool booted = false;
-    mutex mtx;
-    condition_variable cond;
-    thread_ = thread([&, expected, partitions]() {
-        consumer_.set_timeout(milliseconds(500));
-        size_t number_eofs = 0;
-        auto start = system_clock::now();
-        ConsumerDispatcher dispatcher(consumer_);
-        dispatcher.run(
-            // Message callback
-            [&](Message msg) {
-                if (number_eofs == partitions) {
-                    messages_.push_back(move(msg));
-                }
-            },
-            // EOF callback
-            [&](ConsumerDispatcher::EndOfFile, const TopicPartition& topic_partition) {
-                if (number_eofs != partitions) {
-                    number_eofs++;
-                    if (number_eofs == partitions) {
-                        lock_guard<mutex> _(mtx);
-                        booted = true;
-                        cond.notify_one();
-                    }
-                }
-            },
-            // Every time there's any event callback
-            [&](ConsumerDispatcher::Event) {
-                if (expected > 0 && messages_.size() == expected) {
-                    dispatcher.stop();
-                }
-                if (expected == 0 && number_eofs >= partitions) {
-                    dispatcher.stop();
-                }
-                if (system_clock::now() - start >= seconds(20)) {
-                    dispatcher.stop();
-                }
-            }
-        );
-        if (number_eofs < partitions) {
-            lock_guard<mutex> _(mtx);
-            booted = true;
-            cond.notify_one();
-        }
-    });
-
-    unique_lock<mutex> lock(mtx);
-    while (!booted) {
-        cond.wait(lock);
-    }
-}
-
-ConsumerRunner::~ConsumerRunner() {
-    try_join();
-}
-
-const vector<Message>& ConsumerRunner::get_messages() const {
-    return messages_;
-}
-
-void ConsumerRunner::try_join() {
-    if (thread_.joinable()) {
-        thread_.join();
-    }
-}
-
diff --git a/tests/test_utils.h b/tests/test_utils.h
index 310989ef..b6943e6e 100644
--- a/tests/test_utils.h
+++ b/tests/test_utils.h
@@ -4,21 +4,62 @@
 #include <thread>
 #include <vector>
 #include "cppkafka/consumer.h"
+#include "cppkafka/utils/roundrobin_poll_strategy.h"
+#include "cppkafka/utils/consumer_dispatcher.h"
 
-class ConsumerRunner {
+extern std::vector<std::string> KAFKA_TOPICS;
+extern int KAFKA_NUM_PARTITIONS;
+
+using namespace cppkafka;
+
+//==================================================================================
+// BasicConsumerRunner
+//==================================================================================
+template <typename ConsumerType>
+class BasicConsumerRunner {
 public:
-    ConsumerRunner(cppkafka::Consumer& consumer, size_t expected, size_t partitions);
-    ConsumerRunner(const ConsumerRunner&) = delete;
-    ConsumerRunner& operator=(const ConsumerRunner&) = delete;
-    ~ConsumerRunner();
+    BasicConsumerRunner(ConsumerType& consumer,
+                        size_t expected,
+                        size_t partitions);
+    BasicConsumerRunner(const BasicConsumerRunner&) = delete;
+    BasicConsumerRunner& operator=(const BasicConsumerRunner&) = delete;
+    ~BasicConsumerRunner();
 
     const std::vector<cppkafka::Message>& get_messages() const;
     void try_join();
 private:
-    cppkafka::Consumer& consumer_;
+    ConsumerType& consumer_;
     std::thread thread_;
     std::vector<cppkafka::Message> messages_;
 };
 
+//==================================================================================
+// PollStrategyAdapter
+//==================================================================================
+/**
+ * \brief Specific implementation which can be used with other
+ *        util classes such as BasicConsumerDispatcher.
+ */
+class PollStrategyAdapter : public Consumer {
+public:
+    PollStrategyAdapter(Configuration config);
+    void add_polling_strategy(std::unique_ptr<PollInterface> poll_strategy);
+    void delete_polling_strategy();
+    Message poll();
+    Message poll(std::chrono::milliseconds timeout);
+    MessageList poll_batch(size_t max_batch_size);
+    MessageList poll_batch(size_t max_batch_size,
+                           std::chrono::milliseconds timeout);
+    void set_timeout(std::chrono::milliseconds timeout);
+    std::chrono::milliseconds get_timeout();
+private:
+    std::unique_ptr<PollInterface> strategy_;
+};
+
+using PollConsumerRunner = BasicConsumerRunner<PollStrategyAdapter>;
+using ConsumerRunner = BasicConsumerRunner<Consumer>;
+
+#include "test_utils_impl.h"
+
 #endif // CPPKAFKA_TEST_UTILS_H
diff --git a/tests/test_utils_impl.h b/tests/test_utils_impl.h
new file mode 100644
index 00000000..e978de24
--- /dev/null
+++ b/tests/test_utils_impl.h
@@ -0,0 +1,172 @@
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include "test_utils.h"
+#include "cppkafka/utils/consumer_dispatcher.h"
+
+using std::vector;
+using std::move;
+using std::thread;
+using std::mutex;
+using std::lock_guard;
+using std::unique_lock;
+using std::condition_variable;
+using std::chrono::system_clock;
+using std::chrono::milliseconds;
+using std::chrono::seconds;
+
+using cppkafka::Consumer;
+using cppkafka::BasicConsumerDispatcher;
+
+using cppkafka::Message;
+using cppkafka::MessageList;
+using cppkafka::TopicPartition;
+
+//==================================================================================
+// BasicConsumerRunner
+//==================================================================================
+template <typename ConsumerType>
+BasicConsumerRunner<ConsumerType>::BasicConsumerRunner(ConsumerType& consumer,
+                                                       size_t expected,
+                                                       size_t partitions)
+: consumer_(consumer) {
+    bool booted = false;
+    mutex mtx;
+    condition_variable cond;
+    thread_ = thread([&, expected, partitions]() {
+        consumer_.set_timeout(milliseconds(500));
+        size_t number_eofs = 0;
+        auto start = system_clock::now();
+        BasicConsumerDispatcher<ConsumerType> dispatcher(consumer_);
+        dispatcher.run(
+            // Message callback
+            [&](Message msg) {
+                if (number_eofs == partitions) {
+                    messages_.push_back(move(msg));
+                }
+            },
+            // EOF callback
+            [&](typename BasicConsumerDispatcher<ConsumerType>::EndOfFile,
+                const TopicPartition& topic_partition) {
+                if (number_eofs != partitions) {
+                    number_eofs++;
+                    if (number_eofs == partitions) {
+                        lock_guard<mutex> _(mtx);
+                        booted = true;
+                        cond.notify_one();
+                    }
+                }
+            },
+            // Every time there's any event callback
+            [&](typename BasicConsumerDispatcher<ConsumerType>::Event) {
+                if (expected > 0 && messages_.size() == expected) {
+                    dispatcher.stop();
+                }
+                if (expected == 0 && number_eofs >= partitions) {
+                    dispatcher.stop();
+                }
+                if (system_clock::now() - start >= seconds(20)) {
+                    dispatcher.stop();
+                }
+            }
+        );
+        // dispatcher has stopped
+        if (number_eofs < partitions) {
+            lock_guard<mutex> _(mtx);
+            booted = true;
+            cond.notify_one();
+        }
+    });
+
+    unique_lock<mutex> lock(mtx);
+    while (!booted) {
+        cond.wait(lock);
+    }
+}
+
+template <typename ConsumerType>
+BasicConsumerRunner<ConsumerType>::~BasicConsumerRunner() {
+    try_join();
+}
+
+template <typename ConsumerType>
+const MessageList& BasicConsumerRunner<ConsumerType>::get_messages() const {
+    return messages_;
+}
+
+template <typename ConsumerType>
+void BasicConsumerRunner<ConsumerType>::try_join() {
+    if (thread_.joinable()) {
+        thread_.join();
+    }
+}
+
+//==================================================================================
+// PollStrategyAdapter
+//==================================================================================
+inline
+PollStrategyAdapter::PollStrategyAdapter(Configuration config)
+: Consumer(config) {
+}
+
+inline
+void PollStrategyAdapter::add_polling_strategy(std::unique_ptr<PollInterface> poll_strategy) {
+    strategy_ = std::move(poll_strategy);
+}
+
+inline
+void PollStrategyAdapter::delete_polling_strategy() {
+    strategy_.reset();
+}
+
+inline
+Message PollStrategyAdapter::poll() {
+    if (strategy_) {
+        return strategy_->poll();
+    }
+    return Consumer::poll();
+}
+
+inline
+Message PollStrategyAdapter::poll(milliseconds timeout) {
+    if (strategy_) {
+        return strategy_->poll(timeout);
+    }
+    return Consumer::poll(timeout);
+}
+
+inline
+MessageList PollStrategyAdapter::poll_batch(size_t max_batch_size) {
+    if (strategy_) {
+        return strategy_->poll_batch(max_batch_size);
+    }
+    return Consumer::poll_batch(max_batch_size);
+}
+
+inline
+MessageList PollStrategyAdapter::poll_batch(size_t max_batch_size,
+                                            milliseconds timeout) {
+    if (strategy_) {
+        return strategy_->poll_batch(max_batch_size, timeout);
+    }
+    return Consumer::poll_batch(max_batch_size, timeout);
+}
+
+inline
+void PollStrategyAdapter::set_timeout(milliseconds timeout) {
+    if (strategy_) {
+        strategy_->set_timeout(timeout);
+    }
+    else {
+        Consumer::set_timeout(timeout);
+    }
+}
+
+inline
+milliseconds PollStrategyAdapter::get_timeout() {
+    if (strategy_) {
+        return strategy_->get_timeout();
+    }
+    return Consumer::get_timeout();
+}
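
Usage sketch (editor's note): the snippet below shows how the RoundRobinPollStrategy added
by this patch is meant to be driven from application code, using only the interfaces the
patch introduces (RoundRobinPollStrategy wrapping a Consumer). The broker address, group id
and topic name are placeholders, not values taken from this change.

\code
#include "cppkafka/cppkafka.h"
#include "cppkafka/utils/roundrobin_poll_strategy.h"

using namespace cppkafka;

int main() {
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }, // placeholder broker
        { "group.id", "rr_example" }                  // placeholder group id
    };
    Consumer consumer(config);
    consumer.subscribe({ "my_topic" });               // placeholder topic

    // The strategy takes over the consumer's partition queues; each poll()
    // first drains group/global events, then visits the partition queues
    // in round-robin order, and only blocks on the event queue when idle.
    RoundRobinPollStrategy strategy(consumer);
    while (true) {
        Message msg = strategy.poll();
        if (msg && !msg.get_error()) {
            // Process msg.get_payload() here
        }
    }
}
\endcode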
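
Batch variant, under the same assumptions: poll_batch() fills at most max_batch_size
messages, taking from the group event queue first and then from each partition queue once
before waiting out the timeout. The batch size and timeout below are arbitrary examples.

\code
// Collect up to 32 messages, waiting at most 100ms once all queues were visited
MessageList batch = strategy.poll_batch(32, std::chrono::milliseconds(100));
for (const Message& msg : batch) {
    // Messages interleave partitions in round-robin order
}
\endcode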