Skip to content

round robin polling for assigned partitions #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 30, 2018
Merged
104 changes: 79 additions & 25 deletions include/cppkafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include <chrono>
#include <functional>
#include "kafka_handle_base.h"
#include "message.h"
#include "queue.h"
#include "macros.h"
#include "error.h"

Expand All @@ -54,7 +54,7 @@ class TopicConfiguration;
* Semi-simple code showing how to use this class
*
* \code
* // Create a configuration and set the group.id and broker list fields
* // Create a configuration and set the group.id and broker list fields
* Configuration config = {
* { "metadata.broker.list", "127.0.0.1:9092" },
* { "group.id", "foo" }
Expand All @@ -74,13 +74,13 @@ class TopicConfiguration;
* consumer.set_revocation_callback([&](const TopicPartitionList& topic_partitions) {
* cout << topic_partitions.size() << " partitions revoked!" << endl;
* });
*
* // Subscribe
*
* // Subscribe
* consumer.subscribe({ "my_topic" });
* while (true) {
* // Poll. This will optionally return a message. It's necessary to check if it's a valid
* // one before using it
* Message msg = consumer.poll();
* Message msg = consumer.poll();
* if (msg) {
* if (!msg.get_error()) {
* // It's an actual message. Get the payload and print it to stdout
Expand All @@ -103,28 +103,28 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {

/**
* \brief Creates an instance of a consumer.
*
* Note that the configuration *must contain* the group.id attribute set or this
*
* Note that the configuration *must contain* the group.id attribute set or this
* will throw.
*
* \param config The configuration to be used
*/
*/
Consumer(Configuration config);
Consumer(const Consumer&) = delete;
Consumer(Consumer&&) = delete;
Consumer& operator=(const Consumer&) = delete;
Consumer& operator=(Consumer&&) = delete;

/**
* \brief Closes and estroys the rdkafka handle
* \brief Closes and destroys the rdkafka handle
*
* This will call Consumer::close before destroying the handle
*/
~Consumer();

/**
* \brief Sets the topic/partition assignment callback
*
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
Expand All @@ -138,7 +138,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {

/**
* \brief Sets the topic/partition revocation callback
*
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
Expand All @@ -153,7 +153,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {

/**
* \brief Sets the rebalance error callback
*
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
Expand Down Expand Up @@ -188,9 +188,9 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
/**
* \brief Unassigns the current topic/partition assignment
*
* This translates into a call to rd_kafka_assign using a null as the topic partition list
* This translates into a call to rd_kafka_assign using a null as the topic partition list
* parameter
*/
*/
void unassign();

/**
Expand Down Expand Up @@ -262,7 +262,9 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
*
* This translates into a call to rd_kafka_get_watermark_offsets
*
* \param topic_partition The topic/partition to get the offsets from
* \param topic_partition The topic/partition to get the offsets from
*
* \return A pair of offsets {low, high}
*/
OffsetTuple get_offsets(const TopicPartition& topic_partition) const;

Expand All @@ -272,6 +274,8 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
* This translates into a call to rd_kafka_committed
*
* \param topic_partitions The topic/partition list to be queried
*
* \return The topic partition list
*/
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;

Expand All @@ -281,6 +285,8 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
* This translates into a call to rd_kafka_position
*
* \param topic_partitions The topic/partition list to be queried
*
* \return The topic partition list
*/
TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const;

Expand All @@ -295,28 +301,38 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
* \brief Gets the current topic/partition list assignment
*
* This translates to a call to rd_kafka_assignment
*
* \return The topic partition list
*/
TopicPartitionList get_assignment() const;

/**
* \brief Gets the group member id
*
* This translates to a call to rd_kafka_memberid
*
* \return The id
*/
std::string get_member_id() const;

/**
* Gets the partition assignment callback.
* \brief Gets the partition assignment callback.
*
* \return The callback reference
*/
const AssignmentCallback& get_assignment_callback() const;

/**
* Gets the partition revocation callback.
* \brief Gets the partition revocation callback.
*
* \return The callback reference
*/
const RevocationCallback& get_revocation_callback() const;

/**
* Gets the rebalance error callback.
* \brief Gets the rebalance error callback.
*
* \return The callback reference
*/
const RebalanceErrorCallback& get_rebalance_error_callback() const;

Expand All @@ -326,16 +342,16 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
* This will call rd_kafka_consumer_poll.
*
* Note that you need to call poll periodically as a keep alive mechanism, otherwise the broker
* will think this consumer is down and will trigger a rebalance (if using dynamic
* will think this consumer is down and will trigger a rebalance (if using dynamic
* subscription).
*
* The timeout used on this call will be the one configured via Consumer::set_timeout.
*
* The returned message *might* be empty. If's necessary to check that it's a valid one before
* using it:
*
* \return A message. The returned message *might* be empty. It's necessary to check
* that it's valid before using it:
*
* \code
* Message msg = consumer.poll();
* Message msg = consumer.poll();
* if (msg) {
* // It's a valid message!
* }
Expand All @@ -350,6 +366,8 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
* instead of the one configured on this Consumer.
*
* \param timeout The timeout to be used on this call
*
* \return A message
*/
Message poll(std::chrono::milliseconds timeout);

Expand All @@ -359,8 +377,10 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
* This can return one or more messages
*
* \param max_batch_size The maximum amount of messages expected
*
* \return A list of messages
*/
std::vector<Message> poll_batch(size_t max_batch_size);
MessageList poll_batch(size_t max_batch_size);

/**
* \brief Polls for a batch of messages
Expand All @@ -369,8 +389,42 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
*
* \param max_batch_size The maximum amount of messages expected
* \param timeout The timeout for this operation
*
* \return A list of messages
*/
MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);

/**
* \brief Get the global event queue servicing this consumer corresponding to
* rd_kafka_queue_get_main and which is polled via rd_kafka_poll
*
* \return A Queue object
*
* \remark Note that this call will disable forwarding to the consumer_queue.
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
*/
Queue get_main_queue() const;

/**
* \brief Get the consumer group queue servicing corresponding to
* rd_kafka_queue_get_consumer and which is polled via rd_kafka_consumer_poll
*
* \return A Queue object
*/
Queue get_consumer_queue() const;

/**
* \brief Get the queue belonging to this partition. If the consumer is not assigned to this
* partition, an empty queue will be returned
*
* \param partition The partition object
*
* \return A Queue object
*
* \remark Note that this call will disable forwarding to the consumer_queue.
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
*/
std::vector<Message> poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
Queue get_partition_queue(const TopicPartition& partition) const;
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);
Expand Down
4 changes: 4 additions & 0 deletions include/cppkafka/cppkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <cppkafka/message_builder.h>
#include <cppkafka/metadata.h>
#include <cppkafka/producer.h>
#include <cppkafka/queue.h>
#include <cppkafka/topic.h>
#include <cppkafka/topic_configuration.h>
#include <cppkafka/topic_partition.h>
Expand All @@ -55,5 +56,8 @@
#include <cppkafka/utils/buffered_producer.h>
#include <cppkafka/utils/compacted_topic_processor.h>
#include <cppkafka/utils/consumer_dispatcher.h>
#include <cppkafka/utils/poll_interface.h>
#include <cppkafka/utils/poll_strategy_base.h>
#include <cppkafka/utils/roundrobin_poll_strategy.h>

#endif
12 changes: 12 additions & 0 deletions include/cppkafka/exceptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ class CPPKAFKA_API ConsumerException : public Exception {
Error error_;
};

/**
* Queue exception for rd_kafka_queue_t errors
*/
class CPPKAFKA_API QueueException : public Exception {
public:
QueueException(Error error);

Error get_error() const;
private:
Error error_;
};

} // cppkafka

#endif // CPPKAFKA_EXCEPTIONS_H
2 changes: 2 additions & 0 deletions include/cppkafka/group_information.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class CPPKAFKA_API GroupInformation {
std::vector<GroupMemberInformation> members_;
};

using GroupInformationList = std::vector<GroupInformation>;

} // cppkafka

#endif // CPPKAFKA_GROUP_INFORMATION_H
Loading