From b2219972470b916648023a1b3cba80052972fd72 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Tue, 2 Oct 2018 16:36:48 -0400 Subject: [PATCH 1/6] header support implementation --- include/cppkafka/buffer.h | 8 + include/cppkafka/clonable_ptr.h | 13 +- include/cppkafka/header.h | 154 +++++++++++++++ include/cppkafka/header_list.h | 297 +++++++++++++++++++++++++++++ include/cppkafka/message.h | 49 ++++- include/cppkafka/message_builder.h | 42 ++++ src/buffer.cpp | 19 ++ src/message.cpp | 8 + src/producer.cpp | 2 + 9 files changed, 579 insertions(+), 13 deletions(-) create mode 100644 include/cppkafka/header.h create mode 100644 include/cppkafka/header_list.h diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index 821e182f..afa61b5a 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -172,6 +172,14 @@ CPPKAFKA_API bool operator==(const Buffer& lhs, const Buffer& rhs); */ CPPKAFKA_API bool operator!=(const Buffer& lhs, const Buffer& rhs); +/** + * Compares Buffer objects lexicographically + */ +CPPKAFKA_API bool operator<(const Buffer& lhs, const Buffer& rhs); +CPPKAFKA_API bool operator<=(const Buffer& lhs, const Buffer& rhs); +CPPKAFKA_API bool operator>(const Buffer& lhs, const Buffer& rhs); +CPPKAFKA_API bool operator>=(const Buffer& lhs, const Buffer& rhs); + } // cppkafka #endif // CPPKAFKA_BUFFER_H diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 842e3088..36d7feee 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -41,7 +41,7 @@ template class ClonablePtr { public: /** - * Creates an instance + * \brief Creates an instance * * \param ptr The pointer to be wrapped * \param deleter The deleter functor @@ -65,7 +65,7 @@ class ClonablePtr { } /** - * Copies and assigns the given pointer + * \brief Copies and assigns the given pointer * * \param rhs The pointer to be copied */ @@ -79,11 +79,18 @@ class ClonablePtr { ~ClonablePtr() = default; /** - * Getter for the internal pointer + * \brief Getter for the internal pointer */ T* get() const { return handle_.get(); } + + /** + * \brief Indicates whether this clonable pointer is valid (not null) + */ + explicit operator bool() const { + return static_cast(handle_); + } private: std::unique_ptr handle_; Cloner cloner_; diff --git a/include/cppkafka/header.h b/include/cppkafka/header.h new file mode 100644 index 00000000..f3c9fb74 --- /dev/null +++ b/include/cppkafka/header.h @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_HEADER_H +#define CPPKAFKA_HEADER_H + +#include "buffer.h" +#include +#include + +namespace cppkafka { + +template +class Header { +public: + using ValueType = BufferType; + Header() = default; + + Header(const std::string& name, + const BufferType& value); + + Header(const std::string& name, + BufferType&& value); + + const std::string& get_name() const; + + const BufferType& get_value() const; + + BufferType& get_value(); + + operator bool() const; + +private: + template + T make_value(const T& other); + + Buffer make_value(const Buffer& other); + + std::string name_; + BufferType value_; +}; + +template +bool operator==(const Header& lhs, const Header& rhs) { + return std::tie(lhs.get_name(), lhs.get_value()) == std::tie(rhs.get_name(), rhs.get_value()); +} + +template +bool operator!=(const Header& lhs, const Header& rhs) { + return !(lhs == rhs); +} + +template +bool operator<(const Header& lhs, const Header& rhs) { + return std::tie(lhs.get_name(), lhs.get_value()) < std::tie(rhs.get_name(), rhs.get_value()); +} + +template +bool operator>(const Header& lhs, const Header& rhs) { + return std::tie(lhs.get_name(), lhs.get_value()) > std::tie(rhs.get_name(), rhs.get_value()); +} + +template +bool operator<=(const Header& lhs, const Header& rhs) { + return !(lhs > rhs); +} + +template +bool operator>=(const Header& lhs, const Header& rhs) { + return !(lhs < rhs); +} + +template +Header::Header(const std::string& name, + const BufferType& value) +: name_(name), + value_(make_value(value)) { + assert(!name.empty()); +} + +template +Header::Header(const std::string& name, + BufferType&& value) +: name_(name), + value_(std::move(value)) { + assert(!name.empty()); +} + +template +const std::string& Header::get_name() const { + return name_; +} + +template +const BufferType& Header::get_value() const { + return value_; +} + +template +BufferType& Header::get_value() { + return value_; +} + +template +Header::operator bool() const { + return !value_.empty(); +} + +template <> +inline +Header::operator bool() const { + return value_.get_size() > 0; +} + +template +template +T Header::make_value(const T& other) { + return other; +} + +template +Buffer Header::make_value(const Buffer& other) { + return Buffer(other.get_data(), other.get_size()); +} + +} //namespace cppkafka + +#endif //CPPKAFKA_HEADER_H diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h new file mode 100644 index 00000000..197ba77d --- /dev/null +++ b/include/cppkafka/header_list.h @@ -0,0 +1,297 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_HEADER_LIST_H +#define CPPKAFKA_HEADER_LIST_H + +#include +#include "clonable_ptr.h" +#include "header.h" +#include "exceptions.h" + +namespace cppkafka { + +template +class HeaderList; + +template +class HeaderIterator; + +template +bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs); + +template +class HeaderIterator { +public: + friend HeaderList; + using HeaderListType = HeaderList; + using BufferType = typename HeaderType::ValueType; + + friend bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs); + + HeaderIterator(HeaderListType headers, + size_t index) + : header_list_(std::move(headers)), + header_(index == -1 ? HeaderType() : headers.at(index)), //check for end() + index_(index) { + } + + // prefix increment + HeaderIterator& operator++() + { + if (*this == header_list_.end()) { + throw Exception("Over bounds"); + } + if (++index_ >= header_list_.size()) { + *this = header_list_.end(); + } + return *this; + } + + // postfix increment + HeaderIterator operator++(int) + { + HeaderIterator tmp(*this); + operator++(); + return tmp; + } + + // prefix decrement + HeaderIterator& operator--() + { + if (index_ == 0) { + throw Exception("Under bounds"); + } + if (*this == header_list_.end()) { + index_ = header_list_.size()-1; + } + else { + --index_; + } + return *this; + } + + // postfix decrement + HeaderIterator operator--(int) + { + HeaderIterator tmp(*this); + operator--(); + return tmp; + } + + HeaderType& operator*() const { + header_ = header_list_.at(index_); + return header_; + } + + HeaderType* operator->() const { + header_ = header_list_.at(index_); + return &header_; + } + +private: + HeaderListType header_list_; + HeaderType header_; + size_t index_; +}; + +template +bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs) { + return (lhs.header_list_ == rhs.header_list_) && (lhs.index_= rhs.index_); +} + +template +bool operator!=(const HeaderIterator& lhs, const HeaderIterator& rhs) { + return !(lhs == rhs); +} + +template +class HeaderList { +public: + using BufferType = typename HeaderType::ValueType; + using Iterator = HeaderIterator; + /** + * Constructs a message that won't take ownership of the given pointer + */ + static HeaderList make_non_owning(rd_kafka_headers_t* handle); + + HeaderList(); + explicit HeaderList(size_t reserve); + explicit HeaderList(const rd_kafka_headers_t* handle); + + Error add(const HeaderType& header); + Error remove(const std::string& name); + HeaderType at(size_t index) const; //throws + HeaderType front() const; + HeaderType back() const; + HeaderType back(const std::string& name) const; + size_t size() const; + bool empty() const; + Iterator begin() const; + Iterator end() const; + rd_kafka_headers_t* get_handle() const; + +private: + struct NonOwningTag { }; + static void dummy_deleter(rd_kafka_headers_t*) {} + static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast(handle); } + + using HandlePtr = ClonablePtr; + + HeaderList(rd_kafka_headers_t* handle, NonOwningTag); + + HandlePtr handle_; +}; + +template +HeaderList HeaderList::make_non_owning(rd_kafka_headers_t* handle) { + return HeaderList(handle, NonOwningTag()); +} + +template +HeaderList::HeaderList() +: handle_(nullptr, nullptr, nullptr) { + +} + +template +HeaderList::HeaderList(size_t reserve) +: handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { + +} + +template +HeaderList::HeaderList(const rd_kafka_headers_t* handle) +: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy + +} + +template +HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) +: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy. + +} + +// Methods +template +Error HeaderList::add(const HeaderType& header) { + assert(handle_); + return rd_kafka_header_add(handle_.get(), + header.get_name().c_str(), header.get_name().size(), + header.get_value().data(), header.get_value().size()); + +} + +template <> +inline +Error HeaderList>::add(const Header& header) { + assert(handle_); + return rd_kafka_header_add(handle_.get(), + header.get_name().c_str(), header.get_name().size(), + header.get_value().get_data(), header.get_value().get_size()); +} + +template +Error HeaderList::remove(const std::string& name) { + assert(handle_); + return rd_kafka_header_remove(handle_.get(), name.c_str()); +} + +template +HeaderType HeaderList::at(size_t index) const { + assert(handle_); + const char *name, *value; + size_t size; + Error error = rd_kafka_header_get_all(handle_.get(), index, &name, &value, &size); + if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw Exception(error.to_string()); + } + return HeaderType(name, BufferType(value, size)); +} + +template +HeaderType HeaderList::front() const { + return at(0); +} + +template +HeaderType HeaderList::back() const { + return at(size()-1); +} + +template +HeaderType HeaderList::back(const std::string& name) const { + assert(handle_); + const char *value; + size_t size; + Error error = rd_kafka_header_get_last(handle_.get(), name.c_str(), &value, &size); + if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw Exception(error.to_string()); + } + return HeaderType(name, BufferType(value, size)); +} + +template +size_t HeaderList::size() const { + assert(handle_); + return rd_kafka_header_cnt(handle_.get()); +} + +template +bool HeaderList::empty() const { + return size() == 0; +} + +template +typename HeaderList::Iterator +HeaderList::begin() const { + assert(handle_); + if (empty()) { + return end(); + } + return Iterator(make_non_owning(handle_.get()), 0); +} + +template +typename HeaderList::Iterator +HeaderList::end() const { + assert(handle_); + static Iterator end(make_non_owning(handle_.get()), -1); + return end; +} + +template +rd_kafka_headers_t* HeaderList::get_handle() const { + return handle_.get(); +} + +} //namespace cppkafka + +#endif //CPPKAFKA_HEADER_LIST_H diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 6faeffe0..2bb1075b 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -39,6 +39,7 @@ #include "buffer.h" #include "macros.h" #include "error.h" +#include "header_list.h" namespace cppkafka { @@ -59,6 +60,8 @@ class CPPKAFKA_API Message { public: friend class MessageInternal; using InternalPtr = std::shared_ptr; + using HeaderType = Header; + using HeaderListType = HeaderList; /** * Constructs a message that won't take ownership of the given pointer */ @@ -84,7 +87,7 @@ class CPPKAFKA_API Message { Message& operator=(Message&& rhs) = default; /** - * Gets the error attribute + * \brief Gets the error attribute */ Error get_error() const { assert(handle_); @@ -92,14 +95,14 @@ class CPPKAFKA_API Message { } /** - * Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF + * \brief Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF */ bool is_eof() const { return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF; } /** - * Gets the topic that this message belongs to + * \brief Gets the topic that this message belongs to */ std::string get_topic() const { assert(handle_); @@ -107,7 +110,7 @@ class CPPKAFKA_API Message { } /** - * Gets the partition that this message belongs to + * \brief Gets the partition that this message belongs to */ int get_partition() const { assert(handle_); @@ -115,21 +118,38 @@ class CPPKAFKA_API Message { } /** - * Gets the message's payload + * \brief Gets the message's payload */ const Buffer& get_payload() const { return payload_; } + + /** + * \brief Gets the message's header list + */ + const HeaderListType& get_header_list() const { + return header_list_; + } + + /** + * \brief Detaches the message's header list + */ + template + HeaderList detach_header_list() { + rd_kafka_headers_t* headers_handle; + Error error = rd_kafka_message_detach_headers(handle_.get(), &headers_handle); + return error ? HeaderList() : HeaderList(headers_handle); + } /** - * Gets the message's key + * \brief Gets the message's key */ const Buffer& get_key() const { return key_; } /** - * Gets the message offset + * \brief Gets the message offset */ int64_t get_offset() const { assert(handle_); @@ -152,23 +172,31 @@ class CPPKAFKA_API Message { * If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned. */ inline boost::optional get_timestamp() const; + + /** + * \brief Gets the message latency in microseconds as measured from the produce() call. + */ + int64_t get_latency() const { + assert(handle_); + return rd_kafka_message_latency(handle_.get()); + } /** - * Indicates whether this message is valid (not null) + * \brief Indicates whether this message is valid (not null) */ explicit operator bool() const { return handle_ != nullptr; } /** - * Gets the rdkafka message handle + * \brief Gets the rdkafka message handle */ rd_kafka_message_t* get_handle() const { return handle_.get(); } /** - * Internal private const data accessor (internal use only) + * \brief Internal private const data accessor (internal use only) */ InternalPtr internal() const { return internal_; @@ -185,6 +213,7 @@ class CPPKAFKA_API Message { HandlePtr handle_; Buffer payload_; Buffer key_; + HeaderListType header_list_; void* user_data_; InternalPtr internal_; }; diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index d09a6022..8bd35a2b 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -35,6 +35,7 @@ #include "topic.h" #include "macros.h" #include "message.h" +#include "header_list.h" namespace cppkafka { @@ -44,6 +45,8 @@ namespace cppkafka { template class BasicMessageBuilder { public: + using HeaderType = Header; + using HeaderListType = HeaderList; /** * Construct a BasicMessageBuilder * @@ -99,6 +102,13 @@ class BasicMessageBuilder { */ Concrete& key(BufferType&& value); + /** + * Add a header to the message + * + * \param header The header to be used + */ + Concrete& header(const HeaderType& header); + /** * Sets the message's payload * @@ -146,6 +156,16 @@ class BasicMessageBuilder { * Gets the message's key */ BufferType& key(); + + /** + * Gets the list of headers + */ + const HeaderListType& header_list() const; + + /** + * Gets the list of headers + */ + HeaderListType& header_list(); /** * Gets the message's payload @@ -180,6 +200,7 @@ class BasicMessageBuilder { std::string topic_; int partition_{-1}; BufferType key_; + HeaderListType header_list_; BufferType payload_; std::chrono::milliseconds timestamp_{0}; void* user_data_; @@ -237,6 +258,12 @@ C& BasicMessageBuilder::key(T&& value) { return get_concrete(); } +template +C& BasicMessageBuilder::header(const HeaderType& header) { + header_list_.add(header); + return get_concrete(); +} + template C& BasicMessageBuilder::payload(const T& value) { get_concrete().construct_buffer(payload_, value); @@ -281,6 +308,18 @@ T& BasicMessageBuilder::key() { return key_; } +template +const typename BasicMessageBuilder::HeaderListType& +BasicMessageBuilder::header_list() const { + return header_list_; +} + +template +typename BasicMessageBuilder::HeaderListType& +BasicMessageBuilder::header_list() { + return header_list_; +} + template const T& BasicMessageBuilder::payload() const { return payload_; @@ -338,6 +377,9 @@ C& BasicMessageBuilder::get_concrete() { */ class MessageBuilder : public BasicMessageBuilder { public: + using Base = BasicMessageBuilder; + using HeaderType = Base::HeaderType; + using HeaderListType = Base::HeaderListType; using BasicMessageBuilder::BasicMessageBuilder; void construct_buffer(Buffer& lhs, const Buffer& rhs) { diff --git a/src/buffer.cpp b/src/buffer.cpp index 164abe20..606e6dfa 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -34,6 +34,7 @@ using std::string; using std::equal; +using std::lexicographical_compare; using std::ostream; using std::hex; using std::dec; @@ -101,4 +102,22 @@ bool operator!=(const Buffer& lhs, const Buffer& rhs) { return !(lhs == rhs); } +bool operator<(const Buffer& lhs, const Buffer& rhs) { + return lexicographical_compare(lhs.get_data(), lhs.get_data() + lhs.get_size(), + rhs.get_data(), rhs.get_data() + rhs.get_size()); +} + +bool operator>(const Buffer& lhs, const Buffer& rhs) { + return lexicographical_compare(rhs.get_data(), rhs.get_data() + rhs.get_size(), + lhs.get_data(), lhs.get_data() + lhs.get_size()); +} + +bool operator<=(const Buffer& lhs, const Buffer& rhs) { + return !(lhs > rhs); +} + +bool operator>=(const Buffer& lhs, const Buffer& rhs) { + return !(lhs < rhs); +} + } // cppkafka diff --git a/src/message.cpp b/src/message.cpp index 3ac6c070..0738c6dc 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -63,6 +63,14 @@ Message::Message(HandlePtr handle) payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()), key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()), user_data_(handle_ ? handle_->_private : nullptr) { + // get the header list if any + if (handle_) { + rd_kafka_headers_t* headers_handle; + Error error = rd_kafka_message_headers(handle_.get(), &headers_handle); + if (!error) { + header_list_ = HeaderListType::make_non_owning(headers_handle); + } + } } Message& Message::load_internal() { diff --git a/src/producer.cpp b/src/producer.cpp index 4081b538..23c61270 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -74,6 +74,7 @@ void Producer::produce(const MessageBuilder& builder) { RD_KAFKA_V_MSGFLAGS(policy), RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_HEADERS(MessageBuilder::HeaderListType(builder.header_list()).get_handle()), //copy headers RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_OPAQUE(builder.user_data()), RD_KAFKA_V_END); @@ -91,6 +92,7 @@ void Producer::produce(const Message& message) { RD_KAFKA_V_MSGFLAGS(policy), RD_KAFKA_V_TIMESTAMP(duration), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_HEADERS(Message::HeaderListType(message.get_header_list()).get_handle()), //copy headers RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_OPAQUE(message.get_user_data()), RD_KAFKA_V_END); From 8c7149d3f98ddf0b8814402b2da7012dfc8ff15c Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 4 Oct 2018 11:52:47 -0400 Subject: [PATCH 2/6] Fixed issue when ptr is null and doesn't have a cloner function --- include/cppkafka/clonable_ptr.h | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 36d7feee..a870c145 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -60,7 +60,9 @@ class ClonablePtr { * \param rhs The pointer to be copied */ ClonablePtr(const ClonablePtr& rhs) - : handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) { + : handle_(rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : + std::unique_ptr(nullptr, nullptr)), + cloner_(rhs.cloner_) { } @@ -70,7 +72,15 @@ class ClonablePtr { * \param rhs The pointer to be copied */ ClonablePtr& operator=(const ClonablePtr& rhs) { - handle_.reset(cloner_(rhs.handle_.get())); + if (this == &rhs) { + return *this; + } + if (rhs.cloner_) { + handle_.reset(rhs.cloner_(rhs.handle_.get())); + } + else { + handle_.reset(); + } return *this; } From c17f3e50216ce8c35dee0241a22fb01d60c08ca7 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Wed, 10 Oct 2018 13:36:06 -0400 Subject: [PATCH 3/6] Code complete with test cases updated travis file with v0.11.5 --- .travis.yml | 2 +- include/cppkafka/clonable_ptr.h | 19 +- include/cppkafka/cppkafka.h | 3 + include/cppkafka/header.h | 54 ++++- include/cppkafka/header_list.h | 252 +++++++++++++----------- include/cppkafka/header_list_iterator.h | 189 ++++++++++++++++++ include/cppkafka/message.h | 4 +- include/cppkafka/message_builder.h | 3 + include/cppkafka/producer.h | 6 + src/producer.cpp | 60 ++++-- tests/CMakeLists.txt | 4 +- tests/consumer_test.cpp | 2 +- tests/headers_test.cpp | 222 +++++++++++++++++++++ tests/producer_test.cpp | 79 ++++++++ 14 files changed, 738 insertions(+), 161 deletions(-) create mode 100644 include/cppkafka/header_list_iterator.h create mode 100644 tests/headers_test.cpp diff --git a/.travis.yml b/.travis.yml index 1196aa0f..89f6732b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ compiler: - clang env: - - RDKAFKA_VERSION=v0.11.0 + - RDKAFKA_VERSION=v0.11.5 os: - linux diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index a870c145..78ec0f04 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -61,7 +61,7 @@ class ClonablePtr { */ ClonablePtr(const ClonablePtr& rhs) : handle_(rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : - std::unique_ptr(nullptr, nullptr)), + std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter())), cloner_(rhs.cloner_) { } @@ -75,12 +75,8 @@ class ClonablePtr { if (this == &rhs) { return *this; } - if (rhs.cloner_) { - handle_.reset(rhs.cloner_(rhs.handle_.get())); - } - else { - handle_.reset(); - } + handle_ = rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : + std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter()); return *this; } @@ -96,7 +92,14 @@ class ClonablePtr { } /** - * \brief Indicates whether this clonable pointer is valid (not null) + * \brief Releases ownership of the internal pointer + */ + T* release() { + return handle_.release(); + } + + /** + * \brief Indicates whether this ClonablePtr instance is valid (not null) */ explicit operator bool() const { return static_cast(handle_); diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index c1c68853..4f4268e5 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -39,6 +39,9 @@ #include #include #include +#include +#include +#include #include #include #include diff --git a/include/cppkafka/header.h b/include/cppkafka/header.h index f3c9fb74..8339547e 100644 --- a/include/cppkafka/header.h +++ b/include/cppkafka/header.h @@ -36,24 +36,60 @@ namespace cppkafka { +/** + * \brief Class representing a rdkafka header. + * + * The template parameter 'BufferType' can represent a cppkafka::Buffer, std::string, std::vector, etc. + * A valid header may contain an empty name as well as null data. + */ template class Header { public: using ValueType = BufferType; + + /** + * \brief Build an empty header with no data + */ Header() = default; - Header(const std::string& name, + /** + * \brief Build a header instance + * \param name The header name + * \param value The non-modifiable header data + */ + Header(const std::string name, const BufferType& value); - Header(const std::string& name, + /** + * \brief Build a header instance + * \param name The header name + * \param value The header data to be moved + */ + Header(const std::string name, BufferType&& value); + /** + * \brief Get the header name + * \return A reference to the name + */ const std::string& get_name() const; + /** + * \brief Get the header value + * \return A const reference to the underlying buffer + */ const BufferType& get_value() const; + /** + * \brief Get the header value + * \return A non-const reference to the underlying buffer + */ BufferType& get_value(); + /** + * \brief Check if this header is empty + * \return True if the header contains valid data, false otherwise. + */ operator bool() const; private: @@ -66,6 +102,7 @@ class Header { BufferType value_; }; +// Comparison operators for Header type template bool operator==(const Header& lhs, const Header& rhs) { return std::tie(lhs.get_name(), lhs.get_value()) == std::tie(rhs.get_name(), rhs.get_value()); @@ -96,20 +133,19 @@ bool operator>=(const Header& lhs, const Header& rhs) { return !(lhs < rhs); } +// Implementation template -Header::Header(const std::string& name, +Header::Header(const std::string name, const BufferType& value) -: name_(name), +: name_(std::move(name)), value_(make_value(value)) { - assert(!name.empty()); } template -Header::Header(const std::string& name, - BufferType&& value) -: name_(name), +Header::Header(const std::string name, + BufferType&& value) +: name_(std::move(name)), value_(std::move(value)) { - assert(!name.empty()); } template diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 197ba77d..97e58033 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -33,130 +33,130 @@ #include #include "clonable_ptr.h" #include "header.h" +#include "header_list_iterator.h" #include "exceptions.h" namespace cppkafka { -template -class HeaderList; - -template -class HeaderIterator; - -template -bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs); - -template -class HeaderIterator { -public: - friend HeaderList; - using HeaderListType = HeaderList; - using BufferType = typename HeaderType::ValueType; - - friend bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs); - - HeaderIterator(HeaderListType headers, - size_t index) - : header_list_(std::move(headers)), - header_(index == -1 ? HeaderType() : headers.at(index)), //check for end() - index_(index) { - } - - // prefix increment - HeaderIterator& operator++() - { - if (*this == header_list_.end()) { - throw Exception("Over bounds"); - } - if (++index_ >= header_list_.size()) { - *this = header_list_.end(); - } - return *this; - } - - // postfix increment - HeaderIterator operator++(int) - { - HeaderIterator tmp(*this); - operator++(); - return tmp; - } - - // prefix decrement - HeaderIterator& operator--() - { - if (index_ == 0) { - throw Exception("Under bounds"); - } - if (*this == header_list_.end()) { - index_ = header_list_.size()-1; - } - else { - --index_; - } - return *this; - } - - // postfix decrement - HeaderIterator operator--(int) - { - HeaderIterator tmp(*this); - operator--(); - return tmp; - } - - HeaderType& operator*() const { - header_ = header_list_.at(index_); - return header_; - } - - HeaderType* operator->() const { - header_ = header_list_.at(index_); - return &header_; - } - -private: - HeaderListType header_list_; - HeaderType header_; - size_t index_; -}; - -template -bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs) { - return (lhs.header_list_ == rhs.header_list_) && (lhs.index_= rhs.index_); -} - -template -bool operator!=(const HeaderIterator& lhs, const HeaderIterator& rhs) { - return !(lhs == rhs); -} - +/** + * \brief Thin wrapper over a rd_kafka_headers_t handle which optionally controls its lifetime. + * \tparam HeaderType The header type + * + * This is a copyable and movable class that wraps a rd_kafka_header_t*. When copying this class, + * all associated headers are also copied via rd_kafka_headers_copy(). If this list owns the underlying handle, + * its destructor will call rd_kafka_headers_destroy(). + */ template class HeaderList { public: using BufferType = typename HeaderType::ValueType; using Iterator = HeaderIterator; /** - * Constructs a message that won't take ownership of the given pointer + * Constructs a message that won't take ownership of the given pointer. */ static HeaderList make_non_owning(rd_kafka_headers_t* handle); + /** + * \brief Create an empty header list with no handle. + */ HeaderList(); + + /** + * \brief Create an empty header list. This call translates to rd_kafka_headers_new(). + * \param reserve The number of headers to reserve space for. + */ explicit HeaderList(size_t reserve); - explicit HeaderList(const rd_kafka_headers_t* handle); + /** + * \brief Create a header list and assume ownership of the handle. + * \param handle The header list handle. + */ + explicit HeaderList(rd_kafka_headers_t* handle); + + /** + * \brief Add a header to the list. This translates to rd_kafka_header_add(). + * \param header The header. + * \return An Error indicating if the operation was successful or not. + * \warning This operation shall invalidate all iterators. + */ Error add(const HeaderType& header); + + /** + * \brief Remove all headers with 'name'. This translates to rd_kafka_header_remove(). + * \param name The name of the header(s) to remove. + * \return An Error indicating if the operation was successful or not. + * \warning This operation shall invalidate all iterators. + */ Error remove(const std::string& name); + + /** + * \brief Return the header present at position 'index'. Throws on error. + * This translates to rd_kafka_header_get(index) + * \param index The header index in the list (0-based). + * \return The header at that position. + */ HeaderType at(size_t index) const; //throws - HeaderType front() const; - HeaderType back() const; - HeaderType back(const std::string& name) const; + + /** + * \brief Return the first header in the list. Throws if the list is empty. + * This translates to rd_kafka_header_get(0). + * \return The first header. + */ + HeaderType front() const; //throws + + /** + * \brief Return the first header in the list. Throws if the list is empty. + * This translates to rd_kafka_header_get(size-1). + * \return The last header. + */ + HeaderType back() const; //throws + + /** + * \brief Returns the number of headers in the list. This translates to rd_kafka_header_cnt(). + * \return The number of headers. + */ size_t size() const; + + /** + * \brief Indicates if this list is empty. + * \return True if empty, false otherwise. + */ bool empty() const; + + /** + * \brief Returns a HeaderIterator pointing to the first position if the list is not empty + * or pointing to end() otherwise. + * \return An iterator. + * \warning This iterator will be invalid if add() or remove() is called. + */ Iterator begin() const; + + /** + * \brief Returns a HeaderIterator pointing to one element past the end of the list. + * \return An iterator. + * \remark This iterator cannot be de-referenced. + */ Iterator end() const; + + /** + * \brief Get the underlying header list handle. + * \return The handle. + */ rd_kafka_headers_t* get_handle() const; + /** + * \brief Get the underlying header list handle and release its ownership. + * \return The handle. + * \warning After this call, the HeaderList becomes invalid. + */ + rd_kafka_headers_t* release_handle(); + + /** + * \brief Indicates if this list is valid (contains a non-null handle) or not. + * \return True if valid, false otherwise. + */ + explicit operator bool() const; + private: struct NonOwningTag { }; static void dummy_deleter(rd_kafka_headers_t*) {} @@ -170,6 +170,25 @@ class HeaderList { HandlePtr handle_; }; +template +bool operator==(const HeaderList& lhs, const HeaderList rhs) { + if (!lhs && !rhs) { + return true; + } + if (!lhs || !rhs) { + return false; + } + if (lhs.size() != rhs.size()) { + return false; + } + return std::equal(lhs.begin(), lhs.end(), rhs.begin()); +} + +template +bool operator!=(const HeaderList& lhs, const HeaderList rhs) { + return !(lhs == rhs); +} + template HeaderList HeaderList::make_non_owning(rd_kafka_headers_t* handle) { return HeaderList(handle, NonOwningTag()); @@ -188,7 +207,7 @@ HeaderList::HeaderList(size_t reserve) } template -HeaderList::HeaderList(const rd_kafka_headers_t* handle) +HeaderList::HeaderList(rd_kafka_headers_t* handle) : handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy } @@ -204,7 +223,7 @@ template Error HeaderList::add(const HeaderType& header) { assert(handle_); return rd_kafka_header_add(handle_.get(), - header.get_name().c_str(), header.get_name().size(), + header.get_name().data(), header.get_name().size(), header.get_value().data(), header.get_value().size()); } @@ -214,14 +233,14 @@ inline Error HeaderList>::add(const Header& header) { assert(handle_); return rd_kafka_header_add(handle_.get(), - header.get_name().c_str(), header.get_name().size(), + header.get_name().data(), header.get_name().size(), header.get_value().get_data(), header.get_value().get_size()); } template Error HeaderList::remove(const std::string& name) { assert(handle_); - return rd_kafka_header_remove(handle_.get(), name.c_str()); + return rd_kafka_header_remove(handle_.get(), name.data()); } template @@ -229,7 +248,7 @@ HeaderType HeaderList::at(size_t index) const { assert(handle_); const char *name, *value; size_t size; - Error error = rd_kafka_header_get_all(handle_.get(), index, &name, &value, &size); + Error error = rd_kafka_header_get_all(handle_.get(), index, &name, reinterpret_cast(&value), &size); if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw Exception(error.to_string()); } @@ -246,18 +265,6 @@ HeaderType HeaderList::back() const { return at(size()-1); } -template -HeaderType HeaderList::back(const std::string& name) const { - assert(handle_); - const char *value; - size_t size; - Error error = rd_kafka_header_get_last(handle_.get(), name.c_str(), &value, &size); - if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { - throw Exception(error.to_string()); - } - return HeaderType(name, BufferType(value, size)); -} - template size_t HeaderList::size() const { assert(handle_); @@ -283,8 +290,7 @@ template typename HeaderList::Iterator HeaderList::end() const { assert(handle_); - static Iterator end(make_non_owning(handle_.get()), -1); - return end; + return Iterator(make_non_owning(handle_.get()), size()); } template @@ -292,6 +298,16 @@ rd_kafka_headers_t* HeaderList::get_handle() const { return handle_.get(); } +template +rd_kafka_headers_t* HeaderList::release_handle() { + return handle_.release(); +} + +template +HeaderList::operator bool() const { + return static_cast(handle_); +} + } //namespace cppkafka #endif //CPPKAFKA_HEADER_LIST_H diff --git a/include/cppkafka/header_list_iterator.h b/include/cppkafka/header_list_iterator.h new file mode 100644 index 00000000..03c7a012 --- /dev/null +++ b/include/cppkafka/header_list_iterator.h @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_HEADER_LIST_ITERATOR_H +#define CPPKAFKA_HEADER_LIST_ITERATOR_H + +#include +#include +#include +#include "header.h" + +namespace cppkafka { + +template +class HeaderList; + +template +class HeaderIterator; + +template +bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs); + +/** + * \brief Iterator over a HeaderList object. + * \tparam HeaderType The type of header this iterator points to. + */ +template +class HeaderIterator { +public: + friend HeaderList; + using HeaderListType = HeaderList; + using BufferType = typename HeaderType::ValueType; + //std::iterator_traits + using difference_type = std::ptrdiff_t; + using value_type = HeaderType; + using pointer = value_type*; + using reference = value_type&; + using iterator_category = std::bidirectional_iterator_tag; + friend bool operator==(const HeaderIterator& lhs, + const HeaderIterator& rhs); + + HeaderIterator(const HeaderIterator& other) + : header_list_(other.header_list_), + header_(make_header(other.header_)), + index_(other.index_) { + + } + HeaderIterator& operator=(const HeaderIterator& other) { + if (this == &other) return *this; + header_list_ = other.header_list_; + header_ = make_header(other.header_); + index_ = other.index_; + } + HeaderIterator(HeaderIterator&&) = default; + HeaderIterator& operator=(HeaderIterator&&) = default; + + /** + * \brief Prefix increment of the iterator. + * \return Itself after being incremented. + */ + HeaderIterator& operator++() { + assert(index_ < header_list_.size()); + ++index_; + return *this; + } + + /** + * \brief Postfix increment of the iterator. + * \return Itself before being incremented. + */ + HeaderIterator operator++(int) { + HeaderIterator tmp(*this); + operator++(); + return tmp; + } + + /** + * \brief Prefix decrement of the iterator. + * \return Itself after being decremented. + */ + HeaderIterator& operator--() { + assert(index_ > 0); + --index_; + return *this; + } + + /** + * \brief Postfix decrement of the iterator. + * \return Itself before being decremented. + */ + HeaderIterator operator--(int) { + HeaderIterator tmp(*this); + operator--(); + return tmp; + } + + /** + * \brief Dereferences this iterator. + * \return A reference to the header the iterator points to. + * \warning Throws if invalid or if *this == end(). + */ + const HeaderType& operator*() const { + header_ = header_list_.at(index_); + return header_; + } + HeaderType& operator*() { + header_ = header_list_.at(index_); + return header_; + } + + /** + * \brief Dereferences this iterator. + * \return The address to the header the iterator points to. + * \warning Throws if invalid or if *this == end(). + */ + const HeaderType* operator->() const { + header_ = header_list_.at(index_); + return &header_; + } + HeaderType* operator->() { + header_ = header_list_.at(index_); + return &header_; + } + +private: + HeaderIterator(HeaderListType headers, + size_t index) + : header_list_(std::move(headers)), + header_(index == header_list_.size() ? HeaderType() : header_list_.at(index)), + index_(index) { + } + + template + T make_header(const T& other) { + return other; + } + + Header make_header(const Header& other) { + return Header(other.get_name(), + Buffer(other.get_value().get_data(), + other.get_value().get_size())); + } + + HeaderListType header_list_; + HeaderType header_; + size_t index_; +}; + +// Equality comparison operators +template +bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs) { + return (lhs.header_list_.get_handle() == rhs.header_list_.get_handle()) && (lhs.index_ == rhs.index_); +} + +template +bool operator!=(const HeaderIterator& lhs, const HeaderIterator& rhs) { + return !(lhs == rhs); +} + +} //namespace cppkafka + +#endif //CPPKAFKA_HEADER_LIST_ITERATOR_H + diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 2bb1075b..72f0da29 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -176,9 +176,9 @@ class CPPKAFKA_API Message { /** * \brief Gets the message latency in microseconds as measured from the produce() call. */ - int64_t get_latency() const { + std::chrono::microseconds get_latency() const { assert(handle_); - return rd_kafka_message_latency(handle_.get()); + return std::chrono::microseconds(rd_kafka_message_latency(handle_.get())); } /** diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 8bd35a2b..d1053dc9 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -260,6 +260,9 @@ C& BasicMessageBuilder::key(T&& value) { template C& BasicMessageBuilder::header(const HeaderType& header) { + if (!header_list_) { + header_list_ = HeaderListType(5); + } header_list_.add(header); return get_concrete(); } diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index c1c0e077..9b42dcfc 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -113,6 +113,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { * \param builder The builder class used to compose a message */ void produce(const MessageBuilder& builder); + void produce(MessageBuilder&& builder); /** * \brief Produces a message @@ -120,6 +121,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { * \param message The message to be produced */ void produce(const Message& message); + void produce(Message&& message); /** * \brief Polls on this handle @@ -157,6 +159,10 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { */ void flush(std::chrono::milliseconds timeout); private: + void do_produce(const MessageBuilder& builder, MessageBuilder::HeaderListType&& headers); + void do_produce(const Message& message, MessageBuilder::HeaderListType&& headers); + + // Members PayloadPolicy message_payload_policy_; }; diff --git a/src/producer.cpp b/src/producer.cpp index 23c61270..474c3be5 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -65,6 +65,42 @@ Producer::PayloadPolicy Producer::get_payload_policy() const { } void Producer::produce(const MessageBuilder& builder) { + do_produce(builder, MessageBuilder::HeaderListType(builder.header_list())); //copy headers +} + +void Producer::produce(MessageBuilder&& builder) { + MessageBuilder temp(std::move(builder)); //owns header list after the move + do_produce(temp, MessageBuilder::HeaderListType(temp.header_list().release_handle())); //move headers +} + +void Producer::produce(const Message& message) { + do_produce(message, HeaderList(message.get_header_list())); //copy headers +} + +void Producer::produce(Message&& message) { + Message temp(std::move(message)); //rdakfka still owns the header list at this point + do_produce(temp, HeaderList(temp.detach_header_list())); //move headers +} + +int Producer::poll() { + return poll(get_timeout()); +} + +int Producer::poll(milliseconds timeout) { + return rd_kafka_poll(get_handle(), static_cast(timeout.count())); +} + +void Producer::flush() { + flush(get_timeout()); +} + +void Producer::flush(milliseconds timeout) { + auto result = rd_kafka_flush(get_handle(), static_cast(timeout.count())); + check_error(result); +} + +void Producer::do_produce(const MessageBuilder& builder, + MessageBuilder::HeaderListType&& headers) { const Buffer& payload = builder.payload(); const Buffer& key = builder.key(); const int policy = static_cast(message_payload_policy_); @@ -74,14 +110,15 @@ void Producer::produce(const MessageBuilder& builder) { RD_KAFKA_V_MSGFLAGS(policy), RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), - RD_KAFKA_V_HEADERS(MessageBuilder::HeaderListType(builder.header_list()).get_handle()), //copy headers + RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_OPAQUE(builder.user_data()), RD_KAFKA_V_END); check_error(result); } -void Producer::produce(const Message& message) { +void Producer::do_produce(const Message& message, + MessageBuilder::HeaderListType&& headers) { const Buffer& payload = message.get_payload(); const Buffer& key = message.get_key(); const int policy = static_cast(message_payload_policy_); @@ -92,28 +129,11 @@ void Producer::produce(const Message& message) { RD_KAFKA_V_MSGFLAGS(policy), RD_KAFKA_V_TIMESTAMP(duration), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), - RD_KAFKA_V_HEADERS(Message::HeaderListType(message.get_header_list()).get_handle()), //copy headers + RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_OPAQUE(message.get_user_data()), RD_KAFKA_V_END); check_error(result); } -int Producer::poll() { - return poll(get_timeout()); -} - -int Producer::poll(milliseconds timeout) { - return rd_kafka_poll(get_handle(), static_cast(timeout.count())); -} - -void Producer::flush() { - flush(get_timeout()); -} - -void Producer::flush(milliseconds timeout) { - auto result = rd_kafka_flush(get_handle(), static_cast(timeout.count())); - check_error(result); -} - } // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c3234d0f..84e0e469 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -9,8 +9,7 @@ add_custom_target(tests) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") -add_executable( - cppkafka_tests +add_executable(cppkafka_tests buffer_test.cpp compacted_topic_processor_test.cpp configuration_test.cpp @@ -19,6 +18,7 @@ add_executable( producer_test.cpp consumer_test.cpp roundrobin_poll_test.cpp + headers_test.cpp # Main file test_main.cpp diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 843eddcd..f02a12bb 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -195,7 +195,7 @@ TEST_CASE("consumer throttle", "[consumer]") { if (callback_executed_count == 3) { return Message(); } - return move(msg); + return msg; }, [&](ConsumerDispatcher::Timeout) { if (callback_executed_count == 3) { diff --git a/tests/headers_test.cpp b/tests/headers_test.cpp new file mode 100644 index 00000000..47e2bdfa --- /dev/null +++ b/tests/headers_test.cpp @@ -0,0 +1,222 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "cppkafka/consumer.h" +#include "cppkafka/producer.h" +#include "cppkafka/header_list.h" +#include "test_utils.h" + +using std::vector; +using std::move; +using std::string; +using std::thread; +using std::set; +using std::mutex; +using std::tie; +using std::condition_variable; +using std::lock_guard; +using std::unique_lock; +using std::make_move_iterator; +using std::chrono::seconds; +using std::chrono::milliseconds; +using std::chrono::system_clock; + +using namespace cppkafka; +using StringHeader = Header; +using BufferHeader = Header; + +TEST_CASE("creation", "[headers]") { + SECTION("empty") { + HeaderList list; + REQUIRE(!!list == false); + } + + SECTION("default") { + HeaderList list(2); + REQUIRE(!!list == true); + REQUIRE(list.size() == 0); + REQUIRE(list.empty() == true); + REQUIRE(list.get_handle() != nullptr); + } + + SECTION("from handle") { + HeaderList list(rd_kafka_headers_new(1)); + REQUIRE(!!list == true); + REQUIRE(list.size() == 0); + REQUIRE(list.empty() == true); + REQUIRE(list.get_handle() != nullptr); + } +} + +TEST_CASE("release", "[headers]") { + HeaderList list(2); + auto handle = list.release_handle(); + REQUIRE(handle != nullptr); + REQUIRE(list.release_handle() == nullptr); //release again + REQUIRE(!!list == false); + rd_kafka_headers_destroy(handle); +} + +TEST_CASE("modify", "[headers]") { + SECTION("add") { + HeaderList list(10); + //empty header name + list.add(StringHeader(std::string(), std::string("payload1"))); + //empty payload + list.add(StringHeader(std::string("header2"), std::string())); + list.add(StringHeader(std::string("header3"), std::string("payload3"))); + //both null + list.add(StringHeader(std::string(), std::string())); + //both empty (0 length strings) + list.add(StringHeader(std::string(""), std::string(""))); + + //validate + REQUIRE(list.size() == 5); + REQUIRE_FALSE(list.empty()); + + //access a header + REQUIRE(list.at(1).get_name() == "header2"); + REQUIRE(list.at(1).get_value().empty()); + REQUIRE(list.at(2).get_value() == "payload3"); + } + + SECTION("remove") { + HeaderList list(10); + //empty header name + list.add(StringHeader(std::string(), std::string("payload1"))); + //empty payload + list.add(StringHeader(std::string("header2"), std::string())); + list.add(StringHeader(std::string("header3"), std::string("payload3"))); + //both null + list.add(StringHeader(std::string(), std::string())); + //both empty (0 length strings) + list.add(StringHeader(std::string(""), std::string(""))); + + //Remove a bogus name + Error err = list.remove(std::string("bogus")); + REQUIRE(err.get_error() == RD_KAFKA_RESP_ERR__NOENT); + //Remove header with name + list.remove(std::string("header2")); + REQUIRE(list.size() == 4); + list.remove(std::string("header3")); + REQUIRE(list.size() == 3); + //Remove headers without name + list.remove(std::string()); + REQUIRE(list.size() == 0); + } +} + +TEST_CASE("copy and move", "[headers]") { + SECTION("copy owning") { + //Create an owning header list and copy it + HeaderList list(3), list2(3); + list.add(StringHeader(std::string("header1"), std::string("payload1"))); + list.add(StringHeader(std::string("header2"), std::string("payload2"))); + list.add(StringHeader(std::string("header3"), std::string("payload3"))); + REQUIRE(list2.size() == 0); + list2 = list; + REQUIRE(list2.size() == 3); + REQUIRE(list2.size() == list.size()); + //make sure the handles are different + CHECK(list.get_handle() != list2.get_handle()); + CHECK(list.at(0) == list2.at(0)); + CHECK(list.at(1) == list2.at(1)); + CHECK(list.at(2) == list2.at(2)); + CHECK(list == list2); + } + + SECTION("copy owning with buffers") { + //Create an owning header list and copy it + HeaderList list(3), list2(3); + string payload1 = "payload1", payload2 = "payload2", payload3 = "payload3"; + list.add(BufferHeader(std::string("header1"), payload1)); + list.add(BufferHeader(std::string("header2"), payload2)); + list.add(BufferHeader(std::string("header3"), payload3)); + REQUIRE(list2.size() == 0); + list2 = list; + REQUIRE(list2.size() == 3); + REQUIRE(list2.size() == list.size()); + //make sure the handles are different + CHECK(list.get_handle() != list2.get_handle()); + CHECK(list.at(0) == list2.at(0)); + CHECK(list.at(1) == list2.at(1)); + CHECK(list.at(2) == list2.at(2)); + CHECK(list == list2); + } + + SECTION("copy non-owning") { + //Create an owning header list and copy it + HeaderList list(3), list2(3), list3(HeaderList::make_non_owning(list.get_handle())); + list.add(StringHeader(std::string("header1"), std::string("payload1"))); + list.add(StringHeader(std::string("header2"), std::string("payload2"))); + list.add(StringHeader(std::string("header3"), std::string("payload3"))); + list2 = list3; //copy non-owning list + REQUIRE(list.size() == 3); + REQUIRE(list3.size() == list.size()); + REQUIRE(list2.size() == list.size()); + //make sure the handles are the same + CHECK(list2.get_handle() == list3.get_handle()); + CHECK(list2.at(0) == list3.at(0)); + CHECK(list2.at(1) == list3.at(1)); + CHECK(list2.at(2) == list3.at(2)); + CHECK(list2 == list3); + } + + SECTION("move") { + HeaderList list(3), list2; + list.add(StringHeader(std::string("header1"), std::string("payload1"))); + list.add(StringHeader(std::string("header2"), std::string("payload2"))); + list.add(StringHeader(std::string("header3"), std::string("payload3"))); + auto handle = list.get_handle(); + list2 = std::move(list); + CHECK_FALSE(!!list); + CHECK(!!list2); + CHECK(list2.size() == 3); + CHECK(handle == list2.get_handle()); + } +} + +TEST_CASE("access", "[headers]") { + HeaderList list(3); + list.add(StringHeader(std::string("header1"), std::string("payload1"))); + list.add(StringHeader(std::string("header2"), std::string("payload2"))); + list.add(StringHeader(std::string("header3"), std::string("payload3"))); + CHECK(list.at(0).get_value() == "payload1"); + CHECK(list.at(1).get_value() == "payload2"); + CHECK(list.at(2).get_value() == "payload3"); + CHECK_THROWS_AS(list.at(3), Exception); + CHECK(list.front() == list.at(0)); + CHECK(list.back() == list.at(2)); +} + +TEST_CASE("iterate", "[headers]") { + HeaderList list(3); + REQUIRE(list.begin() == list.end()); + list.add(StringHeader(std::string("header1"), std::string("payload1"))); + REQUIRE(list.begin() != list.end()); + CHECK(++list.begin() == list.end()); + list.add(StringHeader(std::string("header2"), std::string("payload2"))); + list.add(StringHeader(std::string("header3"), std::string("payload3"))); + int i = 0; + for (auto it = list.begin(); it != list.end(); ++it, ++i) { + CHECK(it->get_name().length() == 7); + if (i == 0) { + CHECK(it->get_name() == "header1"); + } + else if (i == 1) { + CHECK(it->get_name() == "header2"); + } + else if (i == 2) { + CHECK(it->get_name() == "header3"); + } + } + //rewind end() iterator + CHECK((--list.end())->get_name() == "header3"); +} + + diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 841c98a4..4033e580 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -184,6 +184,41 @@ TEST_CASE("simple production", "[producer]") { CHECK(message.get_timestamp()->get_timestamp() == timestamp); } + SECTION("message with key and move-able headers") { + using Hdr = MessageBuilder::HeaderType; + const string payload = "Hello world! 2"; + const string key = "such key"; + const string header1, header2 = "", header3 = "header3"; + + const milliseconds timestamp{15}; + Producer producer(config); + producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition) + .key(key) + .payload(payload) + .timestamp(timestamp) + .header(Hdr()) + .header(Hdr(std::string(""), header2)) + .header(Hdr(std::string("header3"), header3))); + runner.try_join(); + + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == 1); + const auto& message = messages[0]; + CHECK(message.get_payload() == payload); + CHECK(message.get_key() == key); + CHECK(message.get_topic() == KAFKA_TOPICS[0]); + CHECK(message.get_partition() == partition); + CHECK(!!message.get_error() == false); + REQUIRE(!!message.get_timestamp() == true); + CHECK(message.get_timestamp()->get_timestamp() == timestamp); + //validate headers + REQUIRE(!!message.get_header_list()); + REQUIRE(message.get_header_list().size() == 3); + CHECK(message.get_header_list().front() == Hdr()); + CHECK(message.get_header_list().at(1) == Hdr(std::string(""), header2)); + CHECK(message.get_header_list().back() == Hdr(std::string("header3"), header3)); + } + SECTION("message without message builder") { const string payload = "Goodbye cruel world!"; const string key = "replay key"; @@ -315,6 +350,50 @@ TEST_CASE("multiple messages", "[producer]") { } } +TEST_CASE("multiple messages with copy-able headers", "[producer][headers]") { + using Hdr = MessageBuilder::HeaderType; + size_t message_count = 2; + string payload = "Hello world with headers"; + const string header1, header2 = "", header3 = "header3"; + + // Create a consumer and subscribe to this topic + Consumer consumer(make_consumer_config()); + consumer.subscribe({ KAFKA_TOPICS[0] }); + ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS); + + // Now create a producer and produce a message + Producer producer(make_producer_config()); + MessageBuilder builder(KAFKA_TOPICS[0]); + builder.payload(payload) + .header(Hdr()) + .header(Hdr(std::string(""), header2)) + .header(Hdr(std::string("header3"), header3)); + producer.produce(builder); + producer.produce(builder); + + //Check we still have the messages after production + CHECK(!!builder.header_list()); + CHECK(builder.header_list().size() == 3); + + runner.try_join(); + + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == message_count); + const auto& message = messages[0]; + CHECK(message.get_payload() == payload); + CHECK(!!message.get_error() == false); + //validate headers + REQUIRE(!!message.get_header_list()); + REQUIRE(message.get_header_list().size() == 3); + CHECK(message.get_header_list().front() == Hdr()); + CHECK(message.get_header_list().at(1) == Hdr(std::string(""), header2)); + CHECK(message.get_header_list().back() == Hdr(std::string("header3"), header3)); + + //validate second message + CHECK(messages[0].get_header_list() == messages[1].get_header_list()); + CHECK(messages[0].get_header_list().get_handle() != messages[1].get_header_list().get_handle()); +} + TEST_CASE("multiple sync messages", "[producer][buffered_producer][sync]") { size_t message_count = 10; set payloads; From ac5d41ebd759566f5cb133086a997cf9e7e8fb8b Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Wed, 10 Oct 2018 18:07:32 -0400 Subject: [PATCH 4/6] Added compile time check for rdkafka header support version --- README.md | 2 + include/cppkafka/header.h | 5 +++ include/cppkafka/header_list.h | 4 ++ include/cppkafka/header_list_iterator.h | 4 ++ include/cppkafka/macros.h | 3 ++ include/cppkafka/message.h | 6 +++ include/cppkafka/message_builder.h | 18 +++++++- include/cppkafka/producer.h | 5 +++ src/consumer.cpp | 3 +- src/message.cpp | 4 +- src/producer.cpp | 59 +++++++++++++++++++++++++ tests/headers_test.cpp | 4 ++ tests/producer_test.cpp | 4 ++ 13 files changed, 117 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 2b0ecdac..f4e6bf9f 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ only supported via the high level consumer API. _cppkafka_ requires **rdkafka >= order to use it. Other wrapped functionalities are also provided, like fetching metadata, offsets, etc. +* _cppkafka_ provides message header support. This feature requires **rdkafka >= 0.11.4**. + * _cppkafka_ tries to add minimal overhead over _librdkafka_. A very thin wrapper for _librdkafka_ messages is used for consumption so there's virtually no overhead at all. diff --git a/include/cppkafka/header.h b/include/cppkafka/header.h index 8339547e..ca34b4ec 100644 --- a/include/cppkafka/header.h +++ b/include/cppkafka/header.h @@ -30,10 +30,13 @@ #ifndef CPPKAFKA_HEADER_H #define CPPKAFKA_HEADER_H +#include "macros.h" #include "buffer.h" #include #include +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + namespace cppkafka { /** @@ -187,4 +190,6 @@ Buffer Header::make_value(const Buffer& other) { } //namespace cppkafka +#endif //v0.11.4 + #endif //CPPKAFKA_HEADER_H diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 97e58033..fffa67fe 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -36,6 +36,8 @@ #include "header_list_iterator.h" #include "exceptions.h" +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + namespace cppkafka { /** @@ -310,4 +312,6 @@ HeaderList::operator bool() const { } //namespace cppkafka +#endif //v0.11.4 + #endif //CPPKAFKA_HEADER_LIST_H diff --git a/include/cppkafka/header_list_iterator.h b/include/cppkafka/header_list_iterator.h index 03c7a012..7608cf14 100644 --- a/include/cppkafka/header_list_iterator.h +++ b/include/cppkafka/header_list_iterator.h @@ -35,6 +35,8 @@ #include #include "header.h" +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + namespace cppkafka { template @@ -185,5 +187,7 @@ bool operator!=(const HeaderIterator& lhs, const HeaderIterator; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) using HeaderType = Header; using HeaderListType = HeaderList; +#endif /** * Constructs a message that won't take ownership of the given pointer */ @@ -124,6 +126,7 @@ class CPPKAFKA_API Message { return payload_; } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) /** * \brief Gets the message's header list */ @@ -140,6 +143,7 @@ class CPPKAFKA_API Message { Error error = rd_kafka_message_detach_headers(handle_.get(), &headers_handle); return error ? HeaderList() : HeaderList(headers_handle); } +#endif /** * \brief Gets the message's key @@ -213,7 +217,9 @@ class CPPKAFKA_API Message { HandlePtr handle_; Buffer payload_; Buffer key_; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) HeaderListType header_list_; +#endif void* user_data_; InternalPtr internal_; }; diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index d1053dc9..6f8f1234 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -45,8 +45,10 @@ namespace cppkafka { template class BasicMessageBuilder { public: +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) using HeaderType = Header; using HeaderListType = HeaderList; +#endif /** * Construct a BasicMessageBuilder * @@ -102,12 +104,14 @@ class BasicMessageBuilder { */ Concrete& key(BufferType&& value); +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) /** * Add a header to the message * * \param header The header to be used */ Concrete& header(const HeaderType& header); +#endif /** * Sets the message's payload @@ -157,6 +161,7 @@ class BasicMessageBuilder { */ BufferType& key(); +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) /** * Gets the list of headers */ @@ -166,7 +171,8 @@ class BasicMessageBuilder { * Gets the list of headers */ HeaderListType& header_list(); - +#endif + /** * Gets the message's payload */ @@ -200,7 +206,9 @@ class BasicMessageBuilder { std::string topic_; int partition_{-1}; BufferType key_; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) HeaderListType header_list_; +#endif BufferType payload_; std::chrono::milliseconds timestamp_{0}; void* user_data_; @@ -258,6 +266,7 @@ C& BasicMessageBuilder::key(T&& value) { return get_concrete(); } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) template C& BasicMessageBuilder::header(const HeaderType& header) { if (!header_list_) { @@ -266,6 +275,7 @@ C& BasicMessageBuilder::header(const HeaderType& header) { header_list_.add(header); return get_concrete(); } +#endif template C& BasicMessageBuilder::payload(const T& value) { @@ -311,6 +321,7 @@ T& BasicMessageBuilder::key() { return key_; } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) template const typename BasicMessageBuilder::HeaderListType& BasicMessageBuilder::header_list() const { @@ -322,6 +333,7 @@ typename BasicMessageBuilder::HeaderListType& BasicMessageBuilder::header_list() { return header_list_; } +#endif template const T& BasicMessageBuilder::payload() const { @@ -381,9 +393,11 @@ C& BasicMessageBuilder::get_concrete() { class MessageBuilder : public BasicMessageBuilder { public: using Base = BasicMessageBuilder; + using BasicMessageBuilder::BasicMessageBuilder; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) using HeaderType = Base::HeaderType; using HeaderListType = Base::HeaderListType; - using BasicMessageBuilder::BasicMessageBuilder; +#endif void construct_buffer(Buffer& lhs, const Buffer& rhs) { lhs = Buffer(rhs.get_data(), rhs.get_size()); diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 9b42dcfc..6877acf1 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -159,8 +159,13 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { */ void flush(std::chrono::milliseconds timeout); private: +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) void do_produce(const MessageBuilder& builder, MessageBuilder::HeaderListType&& headers); void do_produce(const Message& message, MessageBuilder::HeaderListType&& headers); +#else + void do_produce(const MessageBuilder& builder); + void do_produce(const Message& message); +#endif // Members PayloadPolicy message_payload_policy_; diff --git a/src/consumer.cpp b/src/consumer.cpp index 1c2c6b80..b6eecf1d 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -29,6 +29,7 @@ #include #include #include +#include "macros.h" #include "consumer.h" #include "exceptions.h" #include "logging.h" @@ -48,7 +49,7 @@ using std::equal; namespace cppkafka { // See: https://github.com/edenhill/librdkafka/issues/1792 -const int rd_kafka_queue_refcount_bug_version = 0x000b0500; +const int rd_kafka_queue_refcount_bug_version = RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION; Queue get_queue(rd_kafka_queue_t* handle) { if (rd_kafka_version() <= rd_kafka_queue_refcount_bug_version) { return Queue::make_non_owning(handle); diff --git a/src/message.cpp b/src/message.cpp index 0738c6dc..798642da 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -26,7 +26,7 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - + #include "message.h" #include "message_internal.h" @@ -63,6 +63,7 @@ Message::Message(HandlePtr handle) payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()), key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()), user_data_(handle_ ? handle_->_private : nullptr) { +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) // get the header list if any if (handle_) { rd_kafka_headers_t* headers_handle; @@ -71,6 +72,7 @@ Message::Message(HandlePtr handle) header_list_ = HeaderListType::make_non_owning(headers_handle); } } +#endif } Message& Message::load_internal() { diff --git a/src/producer.cpp b/src/producer.cpp index 474c3be5..d0297474 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -64,6 +64,7 @@ Producer::PayloadPolicy Producer::get_payload_policy() const { return message_payload_policy_; } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) void Producer::produce(const MessageBuilder& builder) { do_produce(builder, MessageBuilder::HeaderListType(builder.header_list())); //copy headers } @@ -81,6 +82,25 @@ void Producer::produce(Message&& message) { Message temp(std::move(message)); //rdakfka still owns the header list at this point do_produce(temp, HeaderList(temp.detach_header_list())); //move headers } +#else +void Producer::produce(const MessageBuilder& builder) { + do_produce(builder); +} + +void Producer::produce(MessageBuilder&& builder) { + MessageBuilder temp(std::move(builder)); + do_produce(temp); +} + +void Producer::produce(const Message& message) { + do_produce(message); +} + +void Producer::produce(Message&& message) { + Message temp(std::move(message)); + do_produce(temp); +} +#endif int Producer::poll() { return poll(get_timeout()); @@ -99,6 +119,8 @@ void Producer::flush(milliseconds timeout) { check_error(result); } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + void Producer::do_produce(const MessageBuilder& builder, MessageBuilder::HeaderListType&& headers) { const Buffer& payload = builder.payload(); @@ -136,4 +158,41 @@ void Producer::do_produce(const Message& message, check_error(result); } +#else + +void Producer::do_produce(const MessageBuilder& builder) { + const Buffer& payload = builder.payload(); + const Buffer& key = builder.key(); + const int policy = static_cast(message_payload_policy_); + auto result = rd_kafka_producev(get_handle(), + RD_KAFKA_V_TOPIC(builder.topic().data()), + RD_KAFKA_V_PARTITION(builder.partition()), + RD_KAFKA_V_MSGFLAGS(policy), + RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), + RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), + RD_KAFKA_V_OPAQUE(builder.user_data()), + RD_KAFKA_V_END); + check_error(result); +} + +void Producer::do_produce(const Message& message) { + const Buffer& payload = message.get_payload(); + const Buffer& key = message.get_key(); + const int policy = static_cast(message_payload_policy_); + int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; + auto result = rd_kafka_producev(get_handle(), + RD_KAFKA_V_TOPIC(message.get_topic().data()), + RD_KAFKA_V_PARTITION(message.get_partition()), + RD_KAFKA_V_MSGFLAGS(policy), + RD_KAFKA_V_TIMESTAMP(duration), + RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), + RD_KAFKA_V_OPAQUE(message.get_user_data()), + RD_KAFKA_V_END); + check_error(result); +} + +#endif //v0.11.4 + } // cppkafka diff --git a/tests/headers_test.cpp b/tests/headers_test.cpp index 47e2bdfa..6fb21473 100644 --- a/tests/headers_test.cpp +++ b/tests/headers_test.cpp @@ -26,6 +26,8 @@ using std::chrono::seconds; using std::chrono::milliseconds; using std::chrono::system_clock; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + using namespace cppkafka; using StringHeader = Header; using BufferHeader = Header; @@ -219,4 +221,6 @@ TEST_CASE("iterate", "[headers]") { CHECK((--list.end())->get_name() == "header3"); } +#endif //v0.11.4 + diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 4033e580..4e0762d1 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -184,6 +184,7 @@ TEST_CASE("simple production", "[producer]") { CHECK(message.get_timestamp()->get_timestamp() == timestamp); } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) SECTION("message with key and move-able headers") { using Hdr = MessageBuilder::HeaderType; const string payload = "Hello world! 2"; @@ -218,6 +219,7 @@ TEST_CASE("simple production", "[producer]") { CHECK(message.get_header_list().at(1) == Hdr(std::string(""), header2)); CHECK(message.get_header_list().back() == Hdr(std::string("header3"), header3)); } +#endif //v0.11.4 SECTION("message without message builder") { const string payload = "Goodbye cruel world!"; @@ -350,6 +352,7 @@ TEST_CASE("multiple messages", "[producer]") { } } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) TEST_CASE("multiple messages with copy-able headers", "[producer][headers]") { using Hdr = MessageBuilder::HeaderType; size_t message_count = 2; @@ -393,6 +396,7 @@ TEST_CASE("multiple messages with copy-able headers", "[producer][headers]") { CHECK(messages[0].get_header_list() == messages[1].get_header_list()); CHECK(messages[0].get_header_list().get_handle() != messages[1].get_header_list().get_handle()); } +#endif //v0.11.4 TEST_CASE("multiple sync messages", "[producer][buffered_producer][sync]") { size_t message_count = 10; From 9dbf515bd0bd9589b8322f664817355e5187a9b0 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 11 Oct 2018 09:12:28 -0400 Subject: [PATCH 5/6] Changes per last code review --- include/cppkafka/header.h | 10 +++++----- include/cppkafka/header_list.h | 2 +- include/cppkafka/header_list_iterator.h | 2 +- src/producer.cpp | 18 +++++++++--------- tests/headers_test.cpp | 2 +- tests/producer_test.cpp | 4 ++-- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/include/cppkafka/header.h b/include/cppkafka/header.h index ca34b4ec..8ff92693 100644 --- a/include/cppkafka/header.h +++ b/include/cppkafka/header.h @@ -60,7 +60,7 @@ class Header { * \param name The header name * \param value The non-modifiable header data */ - Header(const std::string name, + Header(std::string name, const BufferType& value); /** @@ -68,7 +68,7 @@ class Header { * \param name The header name * \param value The header data to be moved */ - Header(const std::string name, + Header(std::string name, BufferType&& value); /** @@ -138,14 +138,14 @@ bool operator>=(const Header& lhs, const Header& rhs) { // Implementation template -Header::Header(const std::string name, +Header::Header(std::string name, const BufferType& value) : name_(std::move(name)), value_(make_value(value)) { } template -Header::Header(const std::string name, +Header::Header(std::string name, BufferType&& value) : name_(std::move(name)), value_(std::move(value)) { @@ -190,6 +190,6 @@ Buffer Header::make_value(const Buffer& other) { } //namespace cppkafka -#endif //v0.11.4 +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION #endif //CPPKAFKA_HEADER_H diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index fffa67fe..3400b944 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -312,6 +312,6 @@ HeaderList::operator bool() const { } //namespace cppkafka -#endif //v0.11.4 +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION #endif //CPPKAFKA_HEADER_LIST_H diff --git a/include/cppkafka/header_list_iterator.h b/include/cppkafka/header_list_iterator.h index 7608cf14..226c3e2f 100644 --- a/include/cppkafka/header_list_iterator.h +++ b/include/cppkafka/header_list_iterator.h @@ -187,7 +187,7 @@ bool operator!=(const HeaderIterator& lhs, const HeaderIterator= RD_KAFKA_HEADERS_SUPPORT_VERSION) + void Producer::produce(const MessageBuilder& builder) { do_produce(builder, MessageBuilder::HeaderListType(builder.header_list())); //copy headers } void Producer::produce(MessageBuilder&& builder) { - MessageBuilder temp(std::move(builder)); //owns header list after the move - do_produce(temp, MessageBuilder::HeaderListType(temp.header_list().release_handle())); //move headers + do_produce(builder, MessageBuilder::HeaderListType(builder.header_list().release_handle())); //move headers } void Producer::produce(const Message& message) { @@ -79,17 +79,17 @@ void Producer::produce(const Message& message) { } void Producer::produce(Message&& message) { - Message temp(std::move(message)); //rdakfka still owns the header list at this point - do_produce(temp, HeaderList(temp.detach_header_list())); //move headers + do_produce(message, message.detach_header_list()); //move headers } + #else + void Producer::produce(const MessageBuilder& builder) { do_produce(builder); } void Producer::produce(MessageBuilder&& builder) { - MessageBuilder temp(std::move(builder)); - do_produce(temp); + do_produce(builder); } void Producer::produce(const Message& message) { @@ -97,9 +97,9 @@ void Producer::produce(const Message& message) { } void Producer::produce(Message&& message) { - Message temp(std::move(message)); - do_produce(temp); + do_produce(message); } + #endif int Producer::poll() { @@ -193,6 +193,6 @@ void Producer::do_produce(const Message& message) { check_error(result); } -#endif //v0.11.4 +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION } // cppkafka diff --git a/tests/headers_test.cpp b/tests/headers_test.cpp index 6fb21473..1ec12c9d 100644 --- a/tests/headers_test.cpp +++ b/tests/headers_test.cpp @@ -221,6 +221,6 @@ TEST_CASE("iterate", "[headers]") { CHECK((--list.end())->get_name() == "header3"); } -#endif //v0.11.4 +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 4e0762d1..57436c5f 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -219,7 +219,7 @@ TEST_CASE("simple production", "[producer]") { CHECK(message.get_header_list().at(1) == Hdr(std::string(""), header2)); CHECK(message.get_header_list().back() == Hdr(std::string("header3"), header3)); } -#endif //v0.11.4 +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION SECTION("message without message builder") { const string payload = "Goodbye cruel world!"; @@ -396,7 +396,7 @@ TEST_CASE("multiple messages with copy-able headers", "[producer][headers]") { CHECK(messages[0].get_header_list() == messages[1].get_header_list()); CHECK(messages[0].get_header_list().get_handle() != messages[1].get_header_list().get_handle()); } -#endif //v0.11.4 +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION TEST_CASE("multiple sync messages", "[producer][buffered_producer][sync]") { size_t message_count = 10; From 06938a166f5d084bacf05664de00086c635b412c Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Tue, 16 Oct 2018 13:25:38 -0400 Subject: [PATCH 6/6] Using brace list initializers --- tests/headers_test.cpp | 66 ++++++++++++++++++++--------------------- tests/producer_test.cpp | 24 +++++++-------- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/tests/headers_test.cpp b/tests/headers_test.cpp index 1ec12c9d..f860143d 100644 --- a/tests/headers_test.cpp +++ b/tests/headers_test.cpp @@ -68,14 +68,14 @@ TEST_CASE("modify", "[headers]") { SECTION("add") { HeaderList list(10); //empty header name - list.add(StringHeader(std::string(), std::string("payload1"))); + list.add({{}, "payload1"}); //empty payload - list.add(StringHeader(std::string("header2"), std::string())); - list.add(StringHeader(std::string("header3"), std::string("payload3"))); + list.add({"header2", {}}); + list.add({"header3", "payload3"}); //both null - list.add(StringHeader(std::string(), std::string())); - //both empty (0 length strings) - list.add(StringHeader(std::string(""), std::string(""))); + list.add({{}, {}}); + //both empty (0-length strings) + list.add({"", ""}); //validate REQUIRE(list.size() == 5); @@ -90,25 +90,25 @@ TEST_CASE("modify", "[headers]") { SECTION("remove") { HeaderList list(10); //empty header name - list.add(StringHeader(std::string(), std::string("payload1"))); + list.add({{}, "payload1"}); //empty payload - list.add(StringHeader(std::string("header2"), std::string())); - list.add(StringHeader(std::string("header3"), std::string("payload3"))); + list.add({"header2", {}}); + list.add({"header3", "payload3"}); //both null - list.add(StringHeader(std::string(), std::string())); + list.add({{}, {}}); //both empty (0 length strings) - list.add(StringHeader(std::string(""), std::string(""))); + list.add({"", ""}); //Remove a bogus name - Error err = list.remove(std::string("bogus")); + Error err = list.remove("bogus"); REQUIRE(err.get_error() == RD_KAFKA_RESP_ERR__NOENT); //Remove header with name - list.remove(std::string("header2")); + list.remove("header2"); REQUIRE(list.size() == 4); - list.remove(std::string("header3")); + list.remove("header3"); REQUIRE(list.size() == 3); //Remove headers without name - list.remove(std::string()); + list.remove({}); REQUIRE(list.size() == 0); } } @@ -117,9 +117,9 @@ TEST_CASE("copy and move", "[headers]") { SECTION("copy owning") { //Create an owning header list and copy it HeaderList list(3), list2(3); - list.add(StringHeader(std::string("header1"), std::string("payload1"))); - list.add(StringHeader(std::string("header2"), std::string("payload2"))); - list.add(StringHeader(std::string("header3"), std::string("payload3"))); + list.add({"header1", "payload1"}); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); REQUIRE(list2.size() == 0); list2 = list; REQUIRE(list2.size() == 3); @@ -136,9 +136,9 @@ TEST_CASE("copy and move", "[headers]") { //Create an owning header list and copy it HeaderList list(3), list2(3); string payload1 = "payload1", payload2 = "payload2", payload3 = "payload3"; - list.add(BufferHeader(std::string("header1"), payload1)); - list.add(BufferHeader(std::string("header2"), payload2)); - list.add(BufferHeader(std::string("header3"), payload3)); + list.add({"header1", payload1}); + list.add({"header2", payload2}); + list.add({"header3", payload3}); REQUIRE(list2.size() == 0); list2 = list; REQUIRE(list2.size() == 3); @@ -154,9 +154,9 @@ TEST_CASE("copy and move", "[headers]") { SECTION("copy non-owning") { //Create an owning header list and copy it HeaderList list(3), list2(3), list3(HeaderList::make_non_owning(list.get_handle())); - list.add(StringHeader(std::string("header1"), std::string("payload1"))); - list.add(StringHeader(std::string("header2"), std::string("payload2"))); - list.add(StringHeader(std::string("header3"), std::string("payload3"))); + list.add({"header1", "payload1"}); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); list2 = list3; //copy non-owning list REQUIRE(list.size() == 3); REQUIRE(list3.size() == list.size()); @@ -171,9 +171,9 @@ TEST_CASE("copy and move", "[headers]") { SECTION("move") { HeaderList list(3), list2; - list.add(StringHeader(std::string("header1"), std::string("payload1"))); - list.add(StringHeader(std::string("header2"), std::string("payload2"))); - list.add(StringHeader(std::string("header3"), std::string("payload3"))); + list.add({"header1", "payload1"}); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); auto handle = list.get_handle(); list2 = std::move(list); CHECK_FALSE(!!list); @@ -185,9 +185,9 @@ TEST_CASE("copy and move", "[headers]") { TEST_CASE("access", "[headers]") { HeaderList list(3); - list.add(StringHeader(std::string("header1"), std::string("payload1"))); - list.add(StringHeader(std::string("header2"), std::string("payload2"))); - list.add(StringHeader(std::string("header3"), std::string("payload3"))); + list.add({"header1", "payload1"}); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); CHECK(list.at(0).get_value() == "payload1"); CHECK(list.at(1).get_value() == "payload2"); CHECK(list.at(2).get_value() == "payload3"); @@ -199,11 +199,11 @@ TEST_CASE("access", "[headers]") { TEST_CASE("iterate", "[headers]") { HeaderList list(3); REQUIRE(list.begin() == list.end()); - list.add(StringHeader(std::string("header1"), std::string("payload1"))); + list.add({"header1", "payload1"}); REQUIRE(list.begin() != list.end()); CHECK(++list.begin() == list.end()); - list.add(StringHeader(std::string("header2"), std::string("payload2"))); - list.add(StringHeader(std::string("header3"), std::string("payload3"))); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); int i = 0; for (auto it = list.begin(); it != list.end(); ++it, ++i) { CHECK(it->get_name().length() == 7); diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 57436c5f..bf4130cb 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -197,9 +197,9 @@ TEST_CASE("simple production", "[producer]") { .key(key) .payload(payload) .timestamp(timestamp) - .header(Hdr()) - .header(Hdr(std::string(""), header2)) - .header(Hdr(std::string("header3"), header3))); + .header(Hdr{}) + .header(Hdr{"", header2}) + .header(Hdr{"header3", header3})); runner.try_join(); const auto& messages = runner.get_messages(); @@ -215,9 +215,9 @@ TEST_CASE("simple production", "[producer]") { //validate headers REQUIRE(!!message.get_header_list()); REQUIRE(message.get_header_list().size() == 3); - CHECK(message.get_header_list().front() == Hdr()); - CHECK(message.get_header_list().at(1) == Hdr(std::string(""), header2)); - CHECK(message.get_header_list().back() == Hdr(std::string("header3"), header3)); + CHECK(message.get_header_list().front() == Hdr{}); + CHECK(message.get_header_list().at(1) == Hdr{"", header2}); + CHECK(message.get_header_list().back() == Hdr{"header3", header3}); } #endif //RD_KAFKA_HEADERS_SUPPORT_VERSION @@ -368,9 +368,9 @@ TEST_CASE("multiple messages with copy-able headers", "[producer][headers]") { Producer producer(make_producer_config()); MessageBuilder builder(KAFKA_TOPICS[0]); builder.payload(payload) - .header(Hdr()) - .header(Hdr(std::string(""), header2)) - .header(Hdr(std::string("header3"), header3)); + .header(Hdr{}) + .header(Hdr{"", header2}) + .header(Hdr{"header3", header3}); producer.produce(builder); producer.produce(builder); @@ -388,9 +388,9 @@ TEST_CASE("multiple messages with copy-able headers", "[producer][headers]") { //validate headers REQUIRE(!!message.get_header_list()); REQUIRE(message.get_header_list().size() == 3); - CHECK(message.get_header_list().front() == Hdr()); - CHECK(message.get_header_list().at(1) == Hdr(std::string(""), header2)); - CHECK(message.get_header_list().back() == Hdr(std::string("header3"), header3)); + CHECK(message.get_header_list().front() == Hdr{}); + CHECK(message.get_header_list().at(1) == Hdr{"", header2}); + CHECK(message.get_header_list().back() == Hdr{"header3", header3}); //validate second message CHECK(messages[0].get_header_list() == messages[1].get_header_list());