Skip to content

Allow metadata object to be non-owning #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion include/cppkafka/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,24 @@ class CPPKAFKA_API BrokerMetadata {
*/
class CPPKAFKA_API Metadata {
public:
Metadata(const rd_kafka_metadata_t* ptr);
/**
* \brief Creates a Metadata object that doesn't take ownership of the handle
*
* \param handle The handle to be used
*/
static Metadata make_non_owning(const rd_kafka_metadata_t* handle);

/**
* \brief Constructs an empty metadata object
*
* \remark Using any methods except Metadata::get_handle on an empty metadata is undefined behavior
*/
Metadata();

/**
* Constructor
*/
Metadata(const rd_kafka_metadata_t* handle);

/**
* Gets the brokers' metadata
Expand All @@ -165,8 +182,22 @@ class CPPKAFKA_API Metadata {
* \param prefix The prefix to be looked up
*/
std::vector<TopicMetadata> get_topics_prefixed(const std::string& prefix) const;

/**
* Indicates whether this metadata is valid (not null)
*/
explicit operator bool() const;

/**
* Returns the rdkakfa handle
*/
const rd_kafka_metadata_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<const rd_kafka_metadata_t, decltype(&rd_kafka_metadata_destroy)>;

struct NonOwningTag { };

Metadata(const rd_kafka_metadata_t* handle, NonOwningTag);

HandlePtr handle_;
};
Expand Down
7 changes: 7 additions & 0 deletions include/cppkafka/topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class CPPKAFKA_API Topic {
* \param partition The partition to check
*/
bool is_partition_available(int partition) const;

/**
* Indicates whether this topic is valid (not null)
*/
explicit operator bool() const {
return handle_ != nullptr;
}

/**
* Returns the rdkakfa handle
Expand Down
36 changes: 34 additions & 2 deletions src/metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
*
*/

#include <assert.h>
#include "metadata.h"
#include "error.h"

Expand Down Expand Up @@ -110,12 +111,31 @@ uint16_t BrokerMetadata::get_port() const {

// Metadata

Metadata::Metadata(const rd_kafka_metadata_t* ptr)
: handle_(ptr, &rd_kafka_metadata_destroy) {
void dummy_metadata_destroyer(const rd_kafka_metadata_t*) {

}

Metadata Metadata::make_non_owning(const rd_kafka_metadata_t* handle) {
return Metadata(handle, NonOwningTag{});
}

Metadata::Metadata()
: handle_(nullptr, nullptr) {

}

Metadata::Metadata(const rd_kafka_metadata_t* handle)
: handle_(handle, &rd_kafka_metadata_destroy) {

}

Metadata::Metadata(const rd_kafka_metadata_t* handle, NonOwningTag)
: handle_(handle, &dummy_metadata_destroyer) {

}

vector<BrokerMetadata> Metadata::get_brokers() const {
assert(handle_);
vector<BrokerMetadata> output;
for (int i = 0; i < handle_->broker_cnt; ++i) {
const rd_kafka_metadata_broker_t& broker = handle_->brokers[i];
Expand All @@ -125,6 +145,7 @@ vector<BrokerMetadata> Metadata::get_brokers() const {
}

vector<TopicMetadata> Metadata::get_topics() const {
assert(handle_);
vector<TopicMetadata> output;
for (int i = 0; i < handle_->topic_cnt; ++i) {
const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
Expand All @@ -134,6 +155,7 @@ vector<TopicMetadata> Metadata::get_topics() const {
}

vector<TopicMetadata> Metadata::get_topics(const unordered_set<string>& topics) const {
assert(handle_);
vector<TopicMetadata> output;
for (int i = 0; i < handle_->topic_cnt; ++i) {
const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
Expand All @@ -145,6 +167,7 @@ vector<TopicMetadata> Metadata::get_topics(const unordered_set<string>& topics)
}

vector<TopicMetadata> Metadata::get_topics_prefixed(const string& prefix) const {
assert(handle_);
vector<TopicMetadata> output;
for (int i = 0; i < handle_->topic_cnt; ++i) {
const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
Expand All @@ -156,4 +179,13 @@ vector<TopicMetadata> Metadata::get_topics_prefixed(const string& prefix) const
return output;
}


Metadata::operator bool() const {
return handle_ != nullptr;
}

const rd_kafka_metadata_t* Metadata::get_handle() const {
return handle_.get();
}

} // cppkafka