diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 9e009324..5c909853 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -101,6 +101,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { using AssignmentCallback = std::function; using RevocationCallback = std::function; using RebalanceErrorCallback = std::function; + using KafkaHandleBase::pause; /** * \brief Creates an instance of a consumer. @@ -202,7 +203,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { /** * \brief Resumes all consumption */ - void resume(); + void resume(); /** * \brief Commits the current partition assignment diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 30b679e9..334c6be4 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -76,6 +76,13 @@ class CPPKAFKA_API KafkaHandleBase { */ void pause_partitions(const TopicPartitionList& topic_partitions); + /** + * \brief Pauses consumption/production for this topic + * + * \param topic The topic name + */ + void pause(const std::string& topic); + /** * \brief Resumes consumption/production from the given topic/partition list * @@ -84,6 +91,13 @@ class CPPKAFKA_API KafkaHandleBase { * \param topic_partitions The topic/partition list to resume consuming/producing from/to */ void resume_partitions(const TopicPartitionList& topic_partitions); + + /** + * \brief Resumes consumption/production for this topic + * + * \param topic The topic name + */ + void resume(const std::string& topic); /** * \brief Sets the timeout for operations that require a timeout diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 358a0fc6..c1c0e077 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -78,6 +78,7 @@ class Message; */ class CPPKAFKA_API Producer : public KafkaHandleBase { public: + using KafkaHandleBase::pause; /** * The policy to use for the payload. The default policy is COPY_PAYLOAD */ diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h index d07bb6ee..19e90626 100644 --- a/include/cppkafka/topic_partition_list.h +++ b/include/cppkafka/topic_partition_list.h @@ -41,6 +41,7 @@ namespace cppkafka { class TopicPartition; +class PartitionMetadata; using TopicPartitionsListPtr = std::unique_ptr; @@ -53,6 +54,8 @@ using TopicPartitionList = std::vector; CPPKAFKA_API TopicPartitionsListPtr convert(const TopicPartitionList& topic_partitions); CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_partitions); CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions); +CPPKAFKA_API TopicPartitionList convert(const std::string& topic, + const std::vector& partition_metadata); CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle); // Extracts a partition list subset belonging to the provided topics (case-insensitive) diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 9d470395..c3daba10 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -64,6 +64,10 @@ void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partition check_error(error); } +void KafkaHandleBase::pause(const std::string& topic) { + pause_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions())); +} + void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) { TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(), @@ -71,6 +75,10 @@ void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitio check_error(error); } +void KafkaHandleBase::resume(const std::string& topic) { + resume_partitions(convert(topic, get_metadata(get_topic(topic)).get_partitions())); +} + void KafkaHandleBase::set_timeout(milliseconds timeout) { timeout_ms_ = timeout; } diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp index 029fce9b..90d65347 100644 --- a/src/topic_partition_list.cpp +++ b/src/topic_partition_list.cpp @@ -32,6 +32,7 @@ #include "topic_partition_list.h" #include "topic_partition.h" #include "exceptions.h" +#include "metadata.h" using std::vector; using std::set; @@ -66,6 +67,16 @@ TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions) { return output; } +TopicPartitionList convert(const std::string& topic, + const std::vector& partition_metadata) +{ + TopicPartitionList output; + for (const auto& meta : partition_metadata) { + output.emplace_back(topic, meta.get_id()); + } + return output; +} + TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) { return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy); }