diff --git a/.travis.yml b/.travis.yml index 1196aa0f..89f6732b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ compiler: - clang env: - - RDKAFKA_VERSION=v0.11.0 + - RDKAFKA_VERSION=v0.11.5 os: - linux diff --git a/README.md b/README.md index 2b0ecdac..f4e6bf9f 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ only supported via the high level consumer API. _cppkafka_ requires **rdkafka >= order to use it. Other wrapped functionalities are also provided, like fetching metadata, offsets, etc. +* _cppkafka_ provides message header support. This feature requires **rdkafka >= 0.11.4**. + * _cppkafka_ tries to add minimal overhead over _librdkafka_. A very thin wrapper for _librdkafka_ messages is used for consumption so there's virtually no overhead at all. diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index 821e182f..afa61b5a 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -172,6 +172,14 @@ CPPKAFKA_API bool operator==(const Buffer& lhs, const Buffer& rhs); */ CPPKAFKA_API bool operator!=(const Buffer& lhs, const Buffer& rhs); +/** + * Compares Buffer objects lexicographically + */ +CPPKAFKA_API bool operator<(const Buffer& lhs, const Buffer& rhs); +CPPKAFKA_API bool operator<=(const Buffer& lhs, const Buffer& rhs); +CPPKAFKA_API bool operator>(const Buffer& lhs, const Buffer& rhs); +CPPKAFKA_API bool operator>=(const Buffer& lhs, const Buffer& rhs); + } // cppkafka #endif // CPPKAFKA_BUFFER_H diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 842e3088..78ec0f04 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -41,7 +41,7 @@ template class ClonablePtr { public: /** - * Creates an instance + * \brief Creates an instance * * \param ptr The pointer to be wrapped * \param deleter The deleter functor @@ -60,17 +60,23 @@ class ClonablePtr { * \param rhs The pointer to be copied */ ClonablePtr(const ClonablePtr& rhs) - : handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) { + : handle_(rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : + std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter())), + cloner_(rhs.cloner_) { } /** - * Copies and assigns the given pointer + * \brief Copies and assigns the given pointer * * \param rhs The pointer to be copied */ ClonablePtr& operator=(const ClonablePtr& rhs) { - handle_.reset(cloner_(rhs.handle_.get())); + if (this == &rhs) { + return *this; + } + handle_ = rhs.cloner_ ? 
std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : + std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter()); return *this; } @@ -79,11 +85,25 @@ class ClonablePtr { ~ClonablePtr() = default; /** - * Getter for the internal pointer + * \brief Getter for the internal pointer */ T* get() const { return handle_.get(); } + + /** + * \brief Releases ownership of the internal pointer + */ + T* release() { + return handle_.release(); + } + + /** + * \brief Indicates whether this ClonablePtr instance is valid (not null) + */ + explicit operator bool() const { + return static_cast(handle_); + } private: std::unique_ptr handle_; Cloner cloner_; diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index c1c68853..4f4268e5 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -39,6 +39,9 @@ #include #include #include +#include +#include +#include #include #include #include diff --git a/include/cppkafka/header.h b/include/cppkafka/header.h new file mode 100644 index 00000000..8ff92693 --- /dev/null +++ b/include/cppkafka/header.h @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_HEADER_H +#define CPPKAFKA_HEADER_H + +#include "macros.h" +#include "buffer.h" +#include +#include + +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + +namespace cppkafka { + +/** + * \brief Class representing a rdkafka header. + * + * The template parameter 'BufferType' can represent a cppkafka::Buffer, std::string, std::vector, etc. + * A valid header may contain an empty name as well as null data. 
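+ *
+ * As an illustrative sketch (the header name and value below are arbitrary and
+ * assume BufferType = std::string):
+ *
+ * \code
+ * Header<std::string> header("trace-id", "abc123");
+ * const std::string& name = header.get_name();   // "trace-id"
+ * const std::string& value = header.get_value(); // "abc123"
+ * if (header) {
+ *     // reached: the header value is non-empty
+ * }
+ * \endcode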
+ */ +template +class Header { +public: + using ValueType = BufferType; + + /** + * \brief Build an empty header with no data + */ + Header() = default; + + /** + * \brief Build a header instance + * \param name The header name + * \param value The non-modifiable header data + */ + Header(std::string name, + const BufferType& value); + + /** + * \brief Build a header instance + * \param name The header name + * \param value The header data to be moved + */ + Header(std::string name, + BufferType&& value); + + /** + * \brief Get the header name + * \return A reference to the name + */ + const std::string& get_name() const; + + /** + * \brief Get the header value + * \return A const reference to the underlying buffer + */ + const BufferType& get_value() const; + + /** + * \brief Get the header value + * \return A non-const reference to the underlying buffer + */ + BufferType& get_value(); + + /** + * \brief Check if this header is empty + * \return True if the header contains valid data, false otherwise. + */ + operator bool() const; + +private: + template + T make_value(const T& other); + + Buffer make_value(const Buffer& other); + + std::string name_; + BufferType value_; +}; + +// Comparison operators for Header type +template +bool operator==(const Header& lhs, const Header& rhs) { + return std::tie(lhs.get_name(), lhs.get_value()) == std::tie(rhs.get_name(), rhs.get_value()); +} + +template +bool operator!=(const Header& lhs, const Header& rhs) { + return !(lhs == rhs); +} + +template +bool operator<(const Header& lhs, const Header& rhs) { + return std::tie(lhs.get_name(), lhs.get_value()) < std::tie(rhs.get_name(), rhs.get_value()); +} + +template +bool operator>(const Header& lhs, const Header& rhs) { + return std::tie(lhs.get_name(), lhs.get_value()) > std::tie(rhs.get_name(), rhs.get_value()); +} + +template +bool operator<=(const Header& lhs, const Header& rhs) { + return !(lhs > rhs); +} + +template +bool operator>=(const Header& lhs, const Header& rhs) { + return !(lhs < rhs); +} + +// Implementation +template +Header::Header(std::string name, + const BufferType& value) +: name_(std::move(name)), + value_(make_value(value)) { +} + +template +Header::Header(std::string name, + BufferType&& value) +: name_(std::move(name)), + value_(std::move(value)) { +} + +template +const std::string& Header::get_name() const { + return name_; +} + +template +const BufferType& Header::get_value() const { + return value_; +} + +template +BufferType& Header::get_value() { + return value_; +} + +template +Header::operator bool() const { + return !value_.empty(); +} + +template <> +inline +Header::operator bool() const { + return value_.get_size() > 0; +} + +template +template +T Header::make_value(const T& other) { + return other; +} + +template +Buffer Header::make_value(const Buffer& other) { + return Buffer(other.get_data(), other.get_size()); +} + +} //namespace cppkafka + +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION + +#endif //CPPKAFKA_HEADER_H diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h new file mode 100644 index 00000000..3400b944 --- /dev/null +++ b/include/cppkafka/header_list.h @@ -0,0 +1,317 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_HEADER_LIST_H +#define CPPKAFKA_HEADER_LIST_H + +#include +#include "clonable_ptr.h" +#include "header.h" +#include "header_list_iterator.h" +#include "exceptions.h" + +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + +namespace cppkafka { + +/** + * \brief Thin wrapper over a rd_kafka_headers_t handle which optionally controls its lifetime. + * \tparam HeaderType The header type + * + * This is a copyable and movable class that wraps a rd_kafka_header_t*. When copying this class, + * all associated headers are also copied via rd_kafka_headers_copy(). If this list owns the underlying handle, + * its destructor will call rd_kafka_headers_destroy(). + */ +template +class HeaderList { +public: + using BufferType = typename HeaderType::ValueType; + using Iterator = HeaderIterator; + /** + * Constructs a message that won't take ownership of the given pointer. + */ + static HeaderList make_non_owning(rd_kafka_headers_t* handle); + + /** + * \brief Create an empty header list with no handle. + */ + HeaderList(); + + /** + * \brief Create an empty header list. This call translates to rd_kafka_headers_new(). + * \param reserve The number of headers to reserve space for. + */ + explicit HeaderList(size_t reserve); + + /** + * \brief Create a header list and assume ownership of the handle. + * \param handle The header list handle. + */ + explicit HeaderList(rd_kafka_headers_t* handle); + + /** + * \brief Add a header to the list. This translates to rd_kafka_header_add(). + * \param header The header. + * \return An Error indicating if the operation was successful or not. + * \warning This operation shall invalidate all iterators. + */ + Error add(const HeaderType& header); + + /** + * \brief Remove all headers with 'name'. This translates to rd_kafka_header_remove(). + * \param name The name of the header(s) to remove. + * \return An Error indicating if the operation was successful or not. + * \warning This operation shall invalidate all iterators. + */ + Error remove(const std::string& name); + + /** + * \brief Return the header present at position 'index'. Throws on error. + * This translates to rd_kafka_header_get(index) + * \param index The header index in the list (0-based). + * \return The header at that position. + */ + HeaderType at(size_t index) const; //throws + + /** + * \brief Return the first header in the list. Throws if the list is empty. + * This translates to rd_kafka_header_get(0). + * \return The first header. 
+ */ + HeaderType front() const; //throws + + /** + * \brief Return the first header in the list. Throws if the list is empty. + * This translates to rd_kafka_header_get(size-1). + * \return The last header. + */ + HeaderType back() const; //throws + + /** + * \brief Returns the number of headers in the list. This translates to rd_kafka_header_cnt(). + * \return The number of headers. + */ + size_t size() const; + + /** + * \brief Indicates if this list is empty. + * \return True if empty, false otherwise. + */ + bool empty() const; + + /** + * \brief Returns a HeaderIterator pointing to the first position if the list is not empty + * or pointing to end() otherwise. + * \return An iterator. + * \warning This iterator will be invalid if add() or remove() is called. + */ + Iterator begin() const; + + /** + * \brief Returns a HeaderIterator pointing to one element past the end of the list. + * \return An iterator. + * \remark This iterator cannot be de-referenced. + */ + Iterator end() const; + + /** + * \brief Get the underlying header list handle. + * \return The handle. + */ + rd_kafka_headers_t* get_handle() const; + + /** + * \brief Get the underlying header list handle and release its ownership. + * \return The handle. + * \warning After this call, the HeaderList becomes invalid. + */ + rd_kafka_headers_t* release_handle(); + + /** + * \brief Indicates if this list is valid (contains a non-null handle) or not. + * \return True if valid, false otherwise. + */ + explicit operator bool() const; + +private: + struct NonOwningTag { }; + static void dummy_deleter(rd_kafka_headers_t*) {} + static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast(handle); } + + using HandlePtr = ClonablePtr; + + HeaderList(rd_kafka_headers_t* handle, NonOwningTag); + + HandlePtr handle_; +}; + +template +bool operator==(const HeaderList& lhs, const HeaderList rhs) { + if (!lhs && !rhs) { + return true; + } + if (!lhs || !rhs) { + return false; + } + if (lhs.size() != rhs.size()) { + return false; + } + return std::equal(lhs.begin(), lhs.end(), rhs.begin()); +} + +template +bool operator!=(const HeaderList& lhs, const HeaderList rhs) { + return !(lhs == rhs); +} + +template +HeaderList HeaderList::make_non_owning(rd_kafka_headers_t* handle) { + return HeaderList(handle, NonOwningTag()); +} + +template +HeaderList::HeaderList() +: handle_(nullptr, nullptr, nullptr) { + +} + +template +HeaderList::HeaderList(size_t reserve) +: handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { + +} + +template +HeaderList::HeaderList(rd_kafka_headers_t* handle) +: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy + +} + +template +HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) +: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy. 
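+    // Note: because dummy_deleter() and dummy_cloner() neither destroy nor deep-copy
+    // the handle, every copy of a non-owning list aliases the same rd_kafka_headers_t,
+    // whose lifetime is managed elsewhere (e.g. by the rdkafka message that owns it).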
+ +} + +// Methods +template +Error HeaderList::add(const HeaderType& header) { + assert(handle_); + return rd_kafka_header_add(handle_.get(), + header.get_name().data(), header.get_name().size(), + header.get_value().data(), header.get_value().size()); + +} + +template <> +inline +Error HeaderList>::add(const Header& header) { + assert(handle_); + return rd_kafka_header_add(handle_.get(), + header.get_name().data(), header.get_name().size(), + header.get_value().get_data(), header.get_value().get_size()); +} + +template +Error HeaderList::remove(const std::string& name) { + assert(handle_); + return rd_kafka_header_remove(handle_.get(), name.data()); +} + +template +HeaderType HeaderList::at(size_t index) const { + assert(handle_); + const char *name, *value; + size_t size; + Error error = rd_kafka_header_get_all(handle_.get(), index, &name, reinterpret_cast(&value), &size); + if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw Exception(error.to_string()); + } + return HeaderType(name, BufferType(value, size)); +} + +template +HeaderType HeaderList::front() const { + return at(0); +} + +template +HeaderType HeaderList::back() const { + return at(size()-1); +} + +template +size_t HeaderList::size() const { + assert(handle_); + return rd_kafka_header_cnt(handle_.get()); +} + +template +bool HeaderList::empty() const { + return size() == 0; +} + +template +typename HeaderList::Iterator +HeaderList::begin() const { + assert(handle_); + if (empty()) { + return end(); + } + return Iterator(make_non_owning(handle_.get()), 0); +} + +template +typename HeaderList::Iterator +HeaderList::end() const { + assert(handle_); + return Iterator(make_non_owning(handle_.get()), size()); +} + +template +rd_kafka_headers_t* HeaderList::get_handle() const { + return handle_.get(); +} + +template +rd_kafka_headers_t* HeaderList::release_handle() { + return handle_.release(); +} + +template +HeaderList::operator bool() const { + return static_cast(handle_); +} + +} //namespace cppkafka + +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION + +#endif //CPPKAFKA_HEADER_LIST_H diff --git a/include/cppkafka/header_list_iterator.h b/include/cppkafka/header_list_iterator.h new file mode 100644 index 00000000..226c3e2f --- /dev/null +++ b/include/cppkafka/header_list_iterator.h @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_HEADER_LIST_ITERATOR_H +#define CPPKAFKA_HEADER_LIST_ITERATOR_H + +#include +#include +#include +#include "header.h" + +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + +namespace cppkafka { + +template +class HeaderList; + +template +class HeaderIterator; + +template +bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs); + +/** + * \brief Iterator over a HeaderList object. + * \tparam HeaderType The type of header this iterator points to. + */ +template +class HeaderIterator { +public: + friend HeaderList; + using HeaderListType = HeaderList; + using BufferType = typename HeaderType::ValueType; + //std::iterator_traits + using difference_type = std::ptrdiff_t; + using value_type = HeaderType; + using pointer = value_type*; + using reference = value_type&; + using iterator_category = std::bidirectional_iterator_tag; + friend bool operator==(const HeaderIterator& lhs, + const HeaderIterator& rhs); + + HeaderIterator(const HeaderIterator& other) + : header_list_(other.header_list_), + header_(make_header(other.header_)), + index_(other.index_) { + + } + HeaderIterator& operator=(const HeaderIterator& other) { + if (this == &other) return *this; + header_list_ = other.header_list_; + header_ = make_header(other.header_); + index_ = other.index_; + } + HeaderIterator(HeaderIterator&&) = default; + HeaderIterator& operator=(HeaderIterator&&) = default; + + /** + * \brief Prefix increment of the iterator. + * \return Itself after being incremented. + */ + HeaderIterator& operator++() { + assert(index_ < header_list_.size()); + ++index_; + return *this; + } + + /** + * \brief Postfix increment of the iterator. + * \return Itself before being incremented. + */ + HeaderIterator operator++(int) { + HeaderIterator tmp(*this); + operator++(); + return tmp; + } + + /** + * \brief Prefix decrement of the iterator. + * \return Itself after being decremented. + */ + HeaderIterator& operator--() { + assert(index_ > 0); + --index_; + return *this; + } + + /** + * \brief Postfix decrement of the iterator. + * \return Itself before being decremented. + */ + HeaderIterator operator--(int) { + HeaderIterator tmp(*this); + operator--(); + return tmp; + } + + /** + * \brief Dereferences this iterator. + * \return A reference to the header the iterator points to. + * \warning Throws if invalid or if *this == end(). + */ + const HeaderType& operator*() const { + header_ = header_list_.at(index_); + return header_; + } + HeaderType& operator*() { + header_ = header_list_.at(index_); + return header_; + } + + /** + * \brief Dereferences this iterator. + * \return The address to the header the iterator points to. + * \warning Throws if invalid or if *this == end(). 
+ */ + const HeaderType* operator->() const { + header_ = header_list_.at(index_); + return &header_; + } + HeaderType* operator->() { + header_ = header_list_.at(index_); + return &header_; + } + +private: + HeaderIterator(HeaderListType headers, + size_t index) + : header_list_(std::move(headers)), + header_(index == header_list_.size() ? HeaderType() : header_list_.at(index)), + index_(index) { + } + + template + T make_header(const T& other) { + return other; + } + + Header make_header(const Header& other) { + return Header(other.get_name(), + Buffer(other.get_value().get_data(), + other.get_value().get_size())); + } + + HeaderListType header_list_; + HeaderType header_; + size_t index_; +}; + +// Equality comparison operators +template +bool operator==(const HeaderIterator& lhs, const HeaderIterator& rhs) { + return (lhs.header_list_.get_handle() == rhs.header_list_.get_handle()) && (lhs.index_ == rhs.index_); +} + +template +bool operator!=(const HeaderIterator& lhs, const HeaderIterator& rhs) { + return !(lhs == rhs); +} + +} //namespace cppkafka + +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION + +#endif //CPPKAFKA_HEADER_LIST_ITERATOR_H + diff --git a/include/cppkafka/macros.h b/include/cppkafka/macros.h index d69e259c..a3fc392a 100644 --- a/include/cppkafka/macros.h +++ b/include/cppkafka/macros.h @@ -43,4 +43,8 @@ #define CPPKAFKA_API #endif // _WIN32 && !CPPKAFKA_STATIC +// See: https://github.com/edenhill/librdkafka/issues/1792 +#define RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION 0x000b0500 //v0.11.5.00 +#define RD_KAFKA_HEADERS_SUPPORT_VERSION 0x000b0402 //v0.11.4.02 + #endif // CPPKAFKA_MACROS_H diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 6faeffe0..75d053ec 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -39,6 +39,7 @@ #include "buffer.h" #include "macros.h" #include "error.h" +#include "header_list.h" namespace cppkafka { @@ -59,6 +60,10 @@ class CPPKAFKA_API Message { public: friend class MessageInternal; using InternalPtr = std::shared_ptr; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + using HeaderType = Header; + using HeaderListType = HeaderList; +#endif /** * Constructs a message that won't take ownership of the given pointer */ @@ -84,7 +89,7 @@ class CPPKAFKA_API Message { Message& operator=(Message&& rhs) = default; /** - * Gets the error attribute + * \brief Gets the error attribute */ Error get_error() const { assert(handle_); @@ -92,14 +97,14 @@ class CPPKAFKA_API Message { } /** - * Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF + * \brief Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF */ bool is_eof() const { return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF; } /** - * Gets the topic that this message belongs to + * \brief Gets the topic that this message belongs to */ std::string get_topic() const { assert(handle_); @@ -107,7 +112,7 @@ class CPPKAFKA_API Message { } /** - * Gets the partition that this message belongs to + * \brief Gets the partition that this message belongs to */ int get_partition() const { assert(handle_); @@ -115,21 +120,40 @@ class CPPKAFKA_API Message { } /** - * Gets the message's payload + * \brief Gets the message's payload */ const Buffer& get_payload() const { return payload_; } + +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + /** + * \brief Gets the message's header list + */ + const HeaderListType& get_header_list() const { + return header_list_; + } + + /** + * \brief Detaches the 
message's header list + */ + template + HeaderList detach_header_list() { + rd_kafka_headers_t* headers_handle; + Error error = rd_kafka_message_detach_headers(handle_.get(), &headers_handle); + return error ? HeaderList() : HeaderList(headers_handle); + } +#endif /** - * Gets the message's key + * \brief Gets the message's key */ const Buffer& get_key() const { return key_; } /** - * Gets the message offset + * \brief Gets the message offset */ int64_t get_offset() const { assert(handle_); @@ -152,23 +176,31 @@ class CPPKAFKA_API Message { * If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned. */ inline boost::optional get_timestamp() const; + + /** + * \brief Gets the message latency in microseconds as measured from the produce() call. + */ + std::chrono::microseconds get_latency() const { + assert(handle_); + return std::chrono::microseconds(rd_kafka_message_latency(handle_.get())); + } /** - * Indicates whether this message is valid (not null) + * \brief Indicates whether this message is valid (not null) */ explicit operator bool() const { return handle_ != nullptr; } /** - * Gets the rdkafka message handle + * \brief Gets the rdkafka message handle */ rd_kafka_message_t* get_handle() const { return handle_.get(); } /** - * Internal private const data accessor (internal use only) + * \brief Internal private const data accessor (internal use only) */ InternalPtr internal() const { return internal_; @@ -185,6 +217,9 @@ class CPPKAFKA_API Message { HandlePtr handle_; Buffer payload_; Buffer key_; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + HeaderListType header_list_; +#endif void* user_data_; InternalPtr internal_; }; diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index d09a6022..6f8f1234 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -35,6 +35,7 @@ #include "topic.h" #include "macros.h" #include "message.h" +#include "header_list.h" namespace cppkafka { @@ -44,6 +45,10 @@ namespace cppkafka { template class BasicMessageBuilder { public: +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + using HeaderType = Header; + using HeaderListType = HeaderList; +#endif /** * Construct a BasicMessageBuilder * @@ -99,6 +104,15 @@ class BasicMessageBuilder { */ Concrete& key(BufferType&& value); +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + /** + * Add a header to the message + * + * \param header The header to be used + */ + Concrete& header(const HeaderType& header); +#endif + /** * Sets the message's payload * @@ -146,7 +160,19 @@ class BasicMessageBuilder { * Gets the message's key */ BufferType& key(); - + +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + /** + * Gets the list of headers + */ + const HeaderListType& header_list() const; + + /** + * Gets the list of headers + */ + HeaderListType& header_list(); +#endif + /** * Gets the message's payload */ @@ -180,6 +206,9 @@ class BasicMessageBuilder { std::string topic_; int partition_{-1}; BufferType key_; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + HeaderListType header_list_; +#endif BufferType payload_; std::chrono::milliseconds timestamp_{0}; void* user_data_; @@ -237,6 +266,17 @@ C& BasicMessageBuilder::key(T&& value) { return get_concrete(); } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) +template +C& BasicMessageBuilder::header(const HeaderType& header) { + if (!header_list_) { + header_list_ = HeaderListType(5); + } + 
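+        // The header list is created lazily on the first header() call; the 5 passed to
+        // HeaderListType is only an initial capacity hint forwarded to
+        // rd_kafka_headers_new(), and more headers can be added beyond it.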
header_list_.add(header); + return get_concrete(); +} +#endif + template C& BasicMessageBuilder::payload(const T& value) { get_concrete().construct_buffer(payload_, value); @@ -281,6 +321,20 @@ T& BasicMessageBuilder::key() { return key_; } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) +template +const typename BasicMessageBuilder::HeaderListType& +BasicMessageBuilder::header_list() const { + return header_list_; +} + +template +typename BasicMessageBuilder::HeaderListType& +BasicMessageBuilder::header_list() { + return header_list_; +} +#endif + template const T& BasicMessageBuilder::payload() const { return payload_; @@ -338,7 +392,12 @@ C& BasicMessageBuilder::get_concrete() { */ class MessageBuilder : public BasicMessageBuilder { public: + using Base = BasicMessageBuilder; using BasicMessageBuilder::BasicMessageBuilder; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + using HeaderType = Base::HeaderType; + using HeaderListType = Base::HeaderListType; +#endif void construct_buffer(Buffer& lhs, const Buffer& rhs) { lhs = Buffer(rhs.get_data(), rhs.get_size()); diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index c1c0e077..6877acf1 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -113,6 +113,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { * \param builder The builder class used to compose a message */ void produce(const MessageBuilder& builder); + void produce(MessageBuilder&& builder); /** * \brief Produces a message @@ -120,6 +121,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { * \param message The message to be produced */ void produce(const Message& message); + void produce(Message&& message); /** * \brief Polls on this handle @@ -157,6 +159,15 @@ class CPPKAFKA_API Producer : public KafkaHandleBase { */ void flush(std::chrono::milliseconds timeout); private: +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + void do_produce(const MessageBuilder& builder, MessageBuilder::HeaderListType&& headers); + void do_produce(const Message& message, MessageBuilder::HeaderListType&& headers); +#else + void do_produce(const MessageBuilder& builder); + void do_produce(const Message& message); +#endif + + // Members PayloadPolicy message_payload_policy_; }; diff --git a/src/buffer.cpp b/src/buffer.cpp index 164abe20..606e6dfa 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -34,6 +34,7 @@ using std::string; using std::equal; +using std::lexicographical_compare; using std::ostream; using std::hex; using std::dec; @@ -101,4 +102,22 @@ bool operator!=(const Buffer& lhs, const Buffer& rhs) { return !(lhs == rhs); } +bool operator<(const Buffer& lhs, const Buffer& rhs) { + return lexicographical_compare(lhs.get_data(), lhs.get_data() + lhs.get_size(), + rhs.get_data(), rhs.get_data() + rhs.get_size()); +} + +bool operator>(const Buffer& lhs, const Buffer& rhs) { + return lexicographical_compare(rhs.get_data(), rhs.get_data() + rhs.get_size(), + lhs.get_data(), lhs.get_data() + lhs.get_size()); +} + +bool operator<=(const Buffer& lhs, const Buffer& rhs) { + return !(lhs > rhs); +} + +bool operator>=(const Buffer& lhs, const Buffer& rhs) { + return !(lhs < rhs); +} + } // cppkafka diff --git a/src/consumer.cpp b/src/consumer.cpp index 89e1a474..52e50b20 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -29,6 +29,7 @@ #include #include #include +#include "macros.h" #include "consumer.h" #include "exceptions.h" #include "logging.h" @@ -48,10 +49,8 @@ using std::allocator; namespace 
cppkafka { -// See: https://github.com/edenhill/librdkafka/issues/1792 -const int rd_kafka_queue_refcount_bug_version = 0x000b0500; Queue Consumer::get_queue(rd_kafka_queue_t* handle) { - if (rd_kafka_version() <= rd_kafka_queue_refcount_bug_version) { + if (rd_kafka_version() <= RD_KAFKA_QUEUE_REFCOUNT_BUG_VERSION) { return Queue::make_non_owning(handle); } else { diff --git a/src/message.cpp b/src/message.cpp index 3ac6c070..798642da 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -26,7 +26,7 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - + #include "message.h" #include "message_internal.h" @@ -63,6 +63,16 @@ Message::Message(HandlePtr handle) payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()), key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()), user_data_(handle_ ? handle_->_private : nullptr) { +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + // get the header list if any + if (handle_) { + rd_kafka_headers_t* headers_handle; + Error error = rd_kafka_message_headers(handle_.get(), &headers_handle); + if (!error) { + header_list_ = HeaderListType::make_non_owning(headers_handle); + } + } +#endif } Message& Message::load_internal() { diff --git a/src/producer.cpp b/src/producer.cpp index 4081b538..dd1c1c29 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -64,7 +64,65 @@ Producer::PayloadPolicy Producer::get_payload_policy() const { return message_payload_policy_; } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + +void Producer::produce(const MessageBuilder& builder) { + do_produce(builder, MessageBuilder::HeaderListType(builder.header_list())); //copy headers +} + +void Producer::produce(MessageBuilder&& builder) { + do_produce(builder, MessageBuilder::HeaderListType(builder.header_list().release_handle())); //move headers +} + +void Producer::produce(const Message& message) { + do_produce(message, HeaderList(message.get_header_list())); //copy headers +} + +void Producer::produce(Message&& message) { + do_produce(message, message.detach_header_list()); //move headers +} + +#else + void Producer::produce(const MessageBuilder& builder) { + do_produce(builder); +} + +void Producer::produce(MessageBuilder&& builder) { + do_produce(builder); +} + +void Producer::produce(const Message& message) { + do_produce(message); +} + +void Producer::produce(Message&& message) { + do_produce(message); +} + +#endif + +int Producer::poll() { + return poll(get_timeout()); +} + +int Producer::poll(milliseconds timeout) { + return rd_kafka_poll(get_handle(), static_cast(timeout.count())); +} + +void Producer::flush() { + flush(get_timeout()); +} + +void Producer::flush(milliseconds timeout) { + auto result = rd_kafka_flush(get_handle(), static_cast(timeout.count())); + check_error(result); +} + +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + +void Producer::do_produce(const MessageBuilder& builder, + MessageBuilder::HeaderListType&& headers) { const Buffer& payload = builder.payload(); const Buffer& key = builder.key(); const int policy = static_cast(message_payload_policy_); @@ -74,13 +132,15 @@ void Producer::produce(const MessageBuilder& builder) { RD_KAFKA_V_MSGFLAGS(policy), RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), 
RD_KAFKA_V_OPAQUE(builder.user_data()), RD_KAFKA_V_END); check_error(result); } -void Producer::produce(const Message& message) { +void Producer::do_produce(const Message& message, + MessageBuilder::HeaderListType&& headers) { const Buffer& payload = message.get_payload(); const Buffer& key = message.get_key(); const int policy = static_cast(message_payload_policy_); @@ -91,27 +151,48 @@ void Producer::produce(const Message& message) { RD_KAFKA_V_MSGFLAGS(policy), RD_KAFKA_V_TIMESTAMP(duration), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_OPAQUE(message.get_user_data()), RD_KAFKA_V_END); check_error(result); } -int Producer::poll() { - return poll(get_timeout()); -} - -int Producer::poll(milliseconds timeout) { - return rd_kafka_poll(get_handle(), static_cast(timeout.count())); -} +#else -void Producer::flush() { - flush(get_timeout()); +void Producer::do_produce(const MessageBuilder& builder) { + const Buffer& payload = builder.payload(); + const Buffer& key = builder.key(); + const int policy = static_cast(message_payload_policy_); + auto result = rd_kafka_producev(get_handle(), + RD_KAFKA_V_TOPIC(builder.topic().data()), + RD_KAFKA_V_PARTITION(builder.partition()), + RD_KAFKA_V_MSGFLAGS(policy), + RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), + RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), + RD_KAFKA_V_OPAQUE(builder.user_data()), + RD_KAFKA_V_END); + check_error(result); } -void Producer::flush(milliseconds timeout) { - auto result = rd_kafka_flush(get_handle(), static_cast(timeout.count())); +void Producer::do_produce(const Message& message) { + const Buffer& payload = message.get_payload(); + const Buffer& key = message.get_key(); + const int policy = static_cast(message_payload_policy_); + int64_t duration = message.get_timestamp() ? 
message.get_timestamp().get().get_timestamp().count() : 0; + auto result = rd_kafka_producev(get_handle(), + RD_KAFKA_V_TOPIC(message.get_topic().data()), + RD_KAFKA_V_PARTITION(message.get_partition()), + RD_KAFKA_V_MSGFLAGS(policy), + RD_KAFKA_V_TIMESTAMP(duration), + RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), + RD_KAFKA_V_OPAQUE(message.get_user_data()), + RD_KAFKA_V_END); check_error(result); } +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION + } // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c3234d0f..84e0e469 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -9,8 +9,7 @@ add_custom_target(tests) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") -add_executable( - cppkafka_tests +add_executable(cppkafka_tests buffer_test.cpp compacted_topic_processor_test.cpp configuration_test.cpp @@ -19,6 +18,7 @@ add_executable( producer_test.cpp consumer_test.cpp roundrobin_poll_test.cpp + headers_test.cpp # Main file test_main.cpp diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 843eddcd..f02a12bb 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -195,7 +195,7 @@ TEST_CASE("consumer throttle", "[consumer]") { if (callback_executed_count == 3) { return Message(); } - return move(msg); + return msg; }, [&](ConsumerDispatcher::Timeout) { if (callback_executed_count == 3) { diff --git a/tests/headers_test.cpp b/tests/headers_test.cpp new file mode 100644 index 00000000..f860143d --- /dev/null +++ b/tests/headers_test.cpp @@ -0,0 +1,226 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "cppkafka/consumer.h" +#include "cppkafka/producer.h" +#include "cppkafka/header_list.h" +#include "test_utils.h" + +using std::vector; +using std::move; +using std::string; +using std::thread; +using std::set; +using std::mutex; +using std::tie; +using std::condition_variable; +using std::lock_guard; +using std::unique_lock; +using std::make_move_iterator; +using std::chrono::seconds; +using std::chrono::milliseconds; +using std::chrono::system_clock; + +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + +using namespace cppkafka; +using StringHeader = Header; +using BufferHeader = Header; + +TEST_CASE("creation", "[headers]") { + SECTION("empty") { + HeaderList list; + REQUIRE(!!list == false); + } + + SECTION("default") { + HeaderList list(2); + REQUIRE(!!list == true); + REQUIRE(list.size() == 0); + REQUIRE(list.empty() == true); + REQUIRE(list.get_handle() != nullptr); + } + + SECTION("from handle") { + HeaderList list(rd_kafka_headers_new(1)); + REQUIRE(!!list == true); + REQUIRE(list.size() == 0); + REQUIRE(list.empty() == true); + REQUIRE(list.get_handle() != nullptr); + } +} + +TEST_CASE("release", "[headers]") { + HeaderList list(2); + auto handle = list.release_handle(); + REQUIRE(handle != nullptr); + REQUIRE(list.release_handle() == nullptr); //release again + REQUIRE(!!list == false); + rd_kafka_headers_destroy(handle); +} + +TEST_CASE("modify", "[headers]") { + SECTION("add") { + HeaderList list(10); + //empty header name + list.add({{}, "payload1"}); + //empty payload + list.add({"header2", {}}); + list.add({"header3", "payload3"}); + //both null + list.add({{}, {}}); + //both empty (0-length strings) + list.add({"", ""}); + + //validate + REQUIRE(list.size() == 5); + REQUIRE_FALSE(list.empty()); + + //access a header + 
REQUIRE(list.at(1).get_name() == "header2"); + REQUIRE(list.at(1).get_value().empty()); + REQUIRE(list.at(2).get_value() == "payload3"); + } + + SECTION("remove") { + HeaderList list(10); + //empty header name + list.add({{}, "payload1"}); + //empty payload + list.add({"header2", {}}); + list.add({"header3", "payload3"}); + //both null + list.add({{}, {}}); + //both empty (0 length strings) + list.add({"", ""}); + + //Remove a bogus name + Error err = list.remove("bogus"); + REQUIRE(err.get_error() == RD_KAFKA_RESP_ERR__NOENT); + //Remove header with name + list.remove("header2"); + REQUIRE(list.size() == 4); + list.remove("header3"); + REQUIRE(list.size() == 3); + //Remove headers without name + list.remove({}); + REQUIRE(list.size() == 0); + } +} + +TEST_CASE("copy and move", "[headers]") { + SECTION("copy owning") { + //Create an owning header list and copy it + HeaderList list(3), list2(3); + list.add({"header1", "payload1"}); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); + REQUIRE(list2.size() == 0); + list2 = list; + REQUIRE(list2.size() == 3); + REQUIRE(list2.size() == list.size()); + //make sure the handles are different + CHECK(list.get_handle() != list2.get_handle()); + CHECK(list.at(0) == list2.at(0)); + CHECK(list.at(1) == list2.at(1)); + CHECK(list.at(2) == list2.at(2)); + CHECK(list == list2); + } + + SECTION("copy owning with buffers") { + //Create an owning header list and copy it + HeaderList list(3), list2(3); + string payload1 = "payload1", payload2 = "payload2", payload3 = "payload3"; + list.add({"header1", payload1}); + list.add({"header2", payload2}); + list.add({"header3", payload3}); + REQUIRE(list2.size() == 0); + list2 = list; + REQUIRE(list2.size() == 3); + REQUIRE(list2.size() == list.size()); + //make sure the handles are different + CHECK(list.get_handle() != list2.get_handle()); + CHECK(list.at(0) == list2.at(0)); + CHECK(list.at(1) == list2.at(1)); + CHECK(list.at(2) == list2.at(2)); + CHECK(list == list2); + } + + SECTION("copy non-owning") { + //Create an owning header list and copy it + HeaderList list(3), list2(3), list3(HeaderList::make_non_owning(list.get_handle())); + list.add({"header1", "payload1"}); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); + list2 = list3; //copy non-owning list + REQUIRE(list.size() == 3); + REQUIRE(list3.size() == list.size()); + REQUIRE(list2.size() == list.size()); + //make sure the handles are the same + CHECK(list2.get_handle() == list3.get_handle()); + CHECK(list2.at(0) == list3.at(0)); + CHECK(list2.at(1) == list3.at(1)); + CHECK(list2.at(2) == list3.at(2)); + CHECK(list2 == list3); + } + + SECTION("move") { + HeaderList list(3), list2; + list.add({"header1", "payload1"}); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); + auto handle = list.get_handle(); + list2 = std::move(list); + CHECK_FALSE(!!list); + CHECK(!!list2); + CHECK(list2.size() == 3); + CHECK(handle == list2.get_handle()); + } +} + +TEST_CASE("access", "[headers]") { + HeaderList list(3); + list.add({"header1", "payload1"}); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); + CHECK(list.at(0).get_value() == "payload1"); + CHECK(list.at(1).get_value() == "payload2"); + CHECK(list.at(2).get_value() == "payload3"); + CHECK_THROWS_AS(list.at(3), Exception); + CHECK(list.front() == list.at(0)); + CHECK(list.back() == list.at(2)); +} + +TEST_CASE("iterate", "[headers]") { + HeaderList list(3); + REQUIRE(list.begin() == list.end()); + list.add({"header1", 
"payload1"}); + REQUIRE(list.begin() != list.end()); + CHECK(++list.begin() == list.end()); + list.add({"header2", "payload2"}); + list.add({"header3", "payload3"}); + int i = 0; + for (auto it = list.begin(); it != list.end(); ++it, ++i) { + CHECK(it->get_name().length() == 7); + if (i == 0) { + CHECK(it->get_name() == "header1"); + } + else if (i == 1) { + CHECK(it->get_name() == "header2"); + } + else if (i == 2) { + CHECK(it->get_name() == "header3"); + } + } + //rewind end() iterator + CHECK((--list.end())->get_name() == "header3"); +} + +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION + + diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 841c98a4..bf4130cb 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -184,6 +184,43 @@ TEST_CASE("simple production", "[producer]") { CHECK(message.get_timestamp()->get_timestamp() == timestamp); } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + SECTION("message with key and move-able headers") { + using Hdr = MessageBuilder::HeaderType; + const string payload = "Hello world! 2"; + const string key = "such key"; + const string header1, header2 = "", header3 = "header3"; + + const milliseconds timestamp{15}; + Producer producer(config); + producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition) + .key(key) + .payload(payload) + .timestamp(timestamp) + .header(Hdr{}) + .header(Hdr{"", header2}) + .header(Hdr{"header3", header3})); + runner.try_join(); + + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == 1); + const auto& message = messages[0]; + CHECK(message.get_payload() == payload); + CHECK(message.get_key() == key); + CHECK(message.get_topic() == KAFKA_TOPICS[0]); + CHECK(message.get_partition() == partition); + CHECK(!!message.get_error() == false); + REQUIRE(!!message.get_timestamp() == true); + CHECK(message.get_timestamp()->get_timestamp() == timestamp); + //validate headers + REQUIRE(!!message.get_header_list()); + REQUIRE(message.get_header_list().size() == 3); + CHECK(message.get_header_list().front() == Hdr{}); + CHECK(message.get_header_list().at(1) == Hdr{"", header2}); + CHECK(message.get_header_list().back() == Hdr{"header3", header3}); + } +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION + SECTION("message without message builder") { const string payload = "Goodbye cruel world!"; const string key = "replay key"; @@ -315,6 +352,52 @@ TEST_CASE("multiple messages", "[producer]") { } } +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) +TEST_CASE("multiple messages with copy-able headers", "[producer][headers]") { + using Hdr = MessageBuilder::HeaderType; + size_t message_count = 2; + string payload = "Hello world with headers"; + const string header1, header2 = "", header3 = "header3"; + + // Create a consumer and subscribe to this topic + Consumer consumer(make_consumer_config()); + consumer.subscribe({ KAFKA_TOPICS[0] }); + ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS); + + // Now create a producer and produce a message + Producer producer(make_producer_config()); + MessageBuilder builder(KAFKA_TOPICS[0]); + builder.payload(payload) + .header(Hdr{}) + .header(Hdr{"", header2}) + .header(Hdr{"header3", header3}); + producer.produce(builder); + producer.produce(builder); + + //Check we still have the messages after production + CHECK(!!builder.header_list()); + CHECK(builder.header_list().size() == 3); + + runner.try_join(); + + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == message_count); + const 
auto& message = messages[0]; + CHECK(message.get_payload() == payload); + CHECK(!!message.get_error() == false); + //validate headers + REQUIRE(!!message.get_header_list()); + REQUIRE(message.get_header_list().size() == 3); + CHECK(message.get_header_list().front() == Hdr{}); + CHECK(message.get_header_list().at(1) == Hdr{"", header2}); + CHECK(message.get_header_list().back() == Hdr{"header3", header3}); + + //validate second message + CHECK(messages[0].get_header_list() == messages[1].get_header_list()); + CHECK(messages[0].get_header_list().get_handle() != messages[1].get_header_list().get_handle()); +} +#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION + TEST_CASE("multiple sync messages", "[producer][buffered_producer][sync]") { size_t message_count = 10; set payloads;