Skip to content

header support implementation #115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ compiler:
- clang

env:
- RDKAFKA_VERSION=v0.11.0
- RDKAFKA_VERSION=v0.11.5

os:
- linux
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ only supported via the high level consumer API. _cppkafka_ requires **rdkafka >=
order to use it. Other wrapped functionalities are also provided, like fetching metadata,
offsets, etc.

* _cppkafka_ provides message header support. This feature requires **rdkafka >= 0.11.4**.

* _cppkafka_ tries to add minimal overhead over _librdkafka_. A very thin wrapper for _librdkafka_
messages is used for consumption so there's virtually no overhead at all.

Expand Down
8 changes: 8 additions & 0 deletions include/cppkafka/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ CPPKAFKA_API bool operator==(const Buffer& lhs, const Buffer& rhs);
*/
CPPKAFKA_API bool operator!=(const Buffer& lhs, const Buffer& rhs);

/**
* Compares Buffer objects lexicographically
*/
CPPKAFKA_API bool operator<(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator<=(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator>(const Buffer& lhs, const Buffer& rhs);
CPPKAFKA_API bool operator>=(const Buffer& lhs, const Buffer& rhs);

} // cppkafka

#endif // CPPKAFKA_BUFFER_H
30 changes: 25 additions & 5 deletions include/cppkafka/clonable_ptr.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ template <typename T, typename Deleter, typename Cloner>
class ClonablePtr {
public:
/**
* Creates an instance
* \brief Creates an instance
*
* \param ptr The pointer to be wrapped
* \param deleter The deleter functor
Expand All @@ -60,17 +60,23 @@ class ClonablePtr {
* \param rhs The pointer to be copied
*/
ClonablePtr(const ClonablePtr& rhs)
: handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) {
: handle_(rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter())),
cloner_(rhs.cloner_) {

}

/**
* Copies and assigns the given pointer
* \brief Copies and assigns the given pointer
*
* \param rhs The pointer to be copied
*/
ClonablePtr& operator=(const ClonablePtr& rhs) {
handle_.reset(cloner_(rhs.handle_.get()));
if (this == &rhs) {
return *this;
}
handle_ = rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter());
return *this;
}

Expand All @@ -79,11 +85,25 @@ class ClonablePtr {
~ClonablePtr() = default;

/**
* Getter for the internal pointer
* \brief Getter for the internal pointer
*/
T* get() const {
return handle_.get();
}

/**
* \brief Releases ownership of the internal pointer
*/
T* release() {
return handle_.release();
}

/**
* \brief Indicates whether this ClonablePtr instance is valid (not null)
*/
explicit operator bool() const {
return static_cast<bool>(handle_);
}
private:
std::unique_ptr<T, Deleter> handle_;
Cloner cloner_;
Expand Down
3 changes: 3 additions & 0 deletions include/cppkafka/cppkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
#include <cppkafka/error.h>
#include <cppkafka/exceptions.h>
#include <cppkafka/group_information.h>
#include <cppkafka/header.h>
#include <cppkafka/header_list.h>
#include <cppkafka/header_list_iterator.h>
#include <cppkafka/kafka_handle_base.h>
#include <cppkafka/logging.h>
#include <cppkafka/macros.h>
Expand Down
195 changes: 195 additions & 0 deletions include/cppkafka/header.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef CPPKAFKA_HEADER_H
#define CPPKAFKA_HEADER_H

#include "macros.h"
#include "buffer.h"
#include <string>
#include <assert.h>

#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)

namespace cppkafka {

/**
* \brief Class representing a rdkafka header.
*
* The template parameter 'BufferType' can represent a cppkafka::Buffer, std::string, std::vector, etc.
* A valid header may contain an empty name as well as null data.
*/
template <typename BufferType>
class Header {
public:
using ValueType = BufferType;

/**
* \brief Build an empty header with no data
*/
Header() = default;

/**
* \brief Build a header instance
* \param name The header name
* \param value The non-modifiable header data
*/
Header(std::string name,
const BufferType& value);

/**
* \brief Build a header instance
* \param name The header name
* \param value The header data to be moved
*/
Header(std::string name,
BufferType&& value);

/**
* \brief Get the header name
* \return A reference to the name
*/
const std::string& get_name() const;

/**
* \brief Get the header value
* \return A const reference to the underlying buffer
*/
const BufferType& get_value() const;

/**
* \brief Get the header value
* \return A non-const reference to the underlying buffer
*/
BufferType& get_value();

/**
* \brief Check if this header is empty
* \return True if the header contains valid data, false otherwise.
*/
operator bool() const;

private:
template <typename T>
T make_value(const T& other);

Buffer make_value(const Buffer& other);

std::string name_;
BufferType value_;
};

// Comparison operators for Header type
template <typename BufferType>
bool operator==(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) == std::tie(rhs.get_name(), rhs.get_value());
}

template <typename BufferType>
bool operator!=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs == rhs);
}

template <typename BufferType>
bool operator<(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) < std::tie(rhs.get_name(), rhs.get_value());
}

template <typename BufferType>
bool operator>(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return std::tie(lhs.get_name(), lhs.get_value()) > std::tie(rhs.get_name(), rhs.get_value());
}

template <typename BufferType>
bool operator<=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs > rhs);
}

template <typename BufferType>
bool operator>=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
return !(lhs < rhs);
}

// Implementation
template <typename BufferType>
Header<BufferType>::Header(std::string name,
const BufferType& value)
: name_(std::move(name)),
value_(make_value(value)) {
}

template <typename BufferType>
Header<BufferType>::Header(std::string name,
BufferType&& value)
: name_(std::move(name)),
value_(std::move(value)) {
}

template <typename BufferType>
const std::string& Header<BufferType>::get_name() const {
return name_;
}

template <typename BufferType>
const BufferType& Header<BufferType>::get_value() const {
return value_;
}

template <typename BufferType>
BufferType& Header<BufferType>::get_value() {
return value_;
}

template <typename BufferType>
Header<BufferType>::operator bool() const {
return !value_.empty();
}

template <>
inline
Header<Buffer>::operator bool() const {
return value_.get_size() > 0;
}

template <typename BufferType>
template <typename T>
T Header<BufferType>::make_value(const T& other) {
return other;
}

template <typename BufferType>
Buffer Header<BufferType>::make_value(const Buffer& other) {
return Buffer(other.get_data(), other.get_size());
}

} //namespace cppkafka

#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION

#endif //CPPKAFKA_HEADER_H
Loading