Skip to content

Commit fbe3759

Browse files
acceleratedmfontanini
authored andcommitted
Header support implementation (#115)
* header support implementation * Fixed issue when ptr is null and doesn't have a cloner function * Code complete with test cases updated travis file with v0.11.5 * Added compile time check for rdkafka header support version * Changes per last code review * Using brace list initializers
1 parent 9af4330 commit fbe3759

20 files changed

+1301
-36
lines changed

Diff for: .travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ compiler:
77
- clang
88

99
env:
10-
- RDKAFKA_VERSION=v0.11.0
10+
- RDKAFKA_VERSION=v0.11.5
1111

1212
os:
1313
- linux

Diff for: README.md

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ only supported via the high level consumer API. _cppkafka_ requires **rdkafka >=
1717
order to use it. Other wrapped functionalities are also provided, like fetching metadata,
1818
offsets, etc.
1919

20+
* _cppkafka_ provides message header support. This feature requires **rdkafka >= 0.11.4**.
21+
2022
* _cppkafka_ tries to add minimal overhead over _librdkafka_. A very thin wrapper for _librdkafka_
2123
messages is used for consumption so there's virtually no overhead at all.
2224

Diff for: include/cppkafka/buffer.h

+8
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,14 @@ CPPKAFKA_API bool operator==(const Buffer& lhs, const Buffer& rhs);
172172
*/
173173
CPPKAFKA_API bool operator!=(const Buffer& lhs, const Buffer& rhs);
174174

175+
/**
176+
* Compares Buffer objects lexicographically
177+
*/
178+
CPPKAFKA_API bool operator<(const Buffer& lhs, const Buffer& rhs);
179+
CPPKAFKA_API bool operator<=(const Buffer& lhs, const Buffer& rhs);
180+
CPPKAFKA_API bool operator>(const Buffer& lhs, const Buffer& rhs);
181+
CPPKAFKA_API bool operator>=(const Buffer& lhs, const Buffer& rhs);
182+
175183
} // cppkafka
176184

177185
#endif // CPPKAFKA_BUFFER_H

Diff for: include/cppkafka/clonable_ptr.h

+25-5
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ template <typename T, typename Deleter, typename Cloner>
4141
class ClonablePtr {
4242
public:
4343
/**
44-
* Creates an instance
44+
* \brief Creates an instance
4545
*
4646
* \param ptr The pointer to be wrapped
4747
* \param deleter The deleter functor
@@ -60,17 +60,23 @@ class ClonablePtr {
6060
* \param rhs The pointer to be copied
6161
*/
6262
ClonablePtr(const ClonablePtr& rhs)
63-
: handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) {
63+
: handle_(rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
64+
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter())),
65+
cloner_(rhs.cloner_) {
6466

6567
}
6668

6769
/**
68-
* Copies and assigns the given pointer
70+
* \brief Copies and assigns the given pointer
6971
*
7072
* \param rhs The pointer to be copied
7173
*/
7274
ClonablePtr& operator=(const ClonablePtr& rhs) {
73-
handle_.reset(cloner_(rhs.handle_.get()));
75+
if (this == &rhs) {
76+
return *this;
77+
}
78+
handle_ = rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
79+
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter());
7480
return *this;
7581
}
7682

@@ -79,11 +85,25 @@ class ClonablePtr {
7985
~ClonablePtr() = default;
8086

8187
/**
82-
* Getter for the internal pointer
88+
* \brief Getter for the internal pointer
8389
*/
8490
T* get() const {
8591
return handle_.get();
8692
}
93+
94+
/**
95+
* \brief Releases ownership of the internal pointer
96+
*/
97+
T* release() {
98+
return handle_.release();
99+
}
100+
101+
/**
102+
* \brief Indicates whether this ClonablePtr instance is valid (not null)
103+
*/
104+
explicit operator bool() const {
105+
return static_cast<bool>(handle_);
106+
}
87107
private:
88108
std::unique_ptr<T, Deleter> handle_;
89109
Cloner cloner_;

Diff for: include/cppkafka/cppkafka.h

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
#include <cppkafka/error.h>
4040
#include <cppkafka/exceptions.h>
4141
#include <cppkafka/group_information.h>
42+
#include <cppkafka/header.h>
43+
#include <cppkafka/header_list.h>
44+
#include <cppkafka/header_list_iterator.h>
4245
#include <cppkafka/kafka_handle_base.h>
4346
#include <cppkafka/logging.h>
4447
#include <cppkafka/macros.h>

Diff for: include/cppkafka/header.h

+195
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_HEADER_H
31+
#define CPPKAFKA_HEADER_H
32+
33+
#include "macros.h"
34+
#include "buffer.h"
35+
#include <string>
36+
#include <assert.h>
37+
38+
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
39+
40+
namespace cppkafka {
41+
42+
/**
43+
* \brief Class representing a rdkafka header.
44+
*
45+
* The template parameter 'BufferType' can represent a cppkafka::Buffer, std::string, std::vector, etc.
46+
* A valid header may contain an empty name as well as null data.
47+
*/
48+
template <typename BufferType>
49+
class Header {
50+
public:
51+
using ValueType = BufferType;
52+
53+
/**
54+
* \brief Build an empty header with no data
55+
*/
56+
Header() = default;
57+
58+
/**
59+
* \brief Build a header instance
60+
* \param name The header name
61+
* \param value The non-modifiable header data
62+
*/
63+
Header(std::string name,
64+
const BufferType& value);
65+
66+
/**
67+
* \brief Build a header instance
68+
* \param name The header name
69+
* \param value The header data to be moved
70+
*/
71+
Header(std::string name,
72+
BufferType&& value);
73+
74+
/**
75+
* \brief Get the header name
76+
* \return A reference to the name
77+
*/
78+
const std::string& get_name() const;
79+
80+
/**
81+
* \brief Get the header value
82+
* \return A const reference to the underlying buffer
83+
*/
84+
const BufferType& get_value() const;
85+
86+
/**
87+
* \brief Get the header value
88+
* \return A non-const reference to the underlying buffer
89+
*/
90+
BufferType& get_value();
91+
92+
/**
93+
* \brief Check if this header is empty
94+
* \return True if the header contains valid data, false otherwise.
95+
*/
96+
operator bool() const;
97+
98+
private:
99+
template <typename T>
100+
T make_value(const T& other);
101+
102+
Buffer make_value(const Buffer& other);
103+
104+
std::string name_;
105+
BufferType value_;
106+
};
107+
108+
// Comparison operators for Header type
109+
template <typename BufferType>
110+
bool operator==(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
111+
return std::tie(lhs.get_name(), lhs.get_value()) == std::tie(rhs.get_name(), rhs.get_value());
112+
}
113+
114+
template <typename BufferType>
115+
bool operator!=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
116+
return !(lhs == rhs);
117+
}
118+
119+
template <typename BufferType>
120+
bool operator<(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
121+
return std::tie(lhs.get_name(), lhs.get_value()) < std::tie(rhs.get_name(), rhs.get_value());
122+
}
123+
124+
template <typename BufferType>
125+
bool operator>(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
126+
return std::tie(lhs.get_name(), lhs.get_value()) > std::tie(rhs.get_name(), rhs.get_value());
127+
}
128+
129+
template <typename BufferType>
130+
bool operator<=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
131+
return !(lhs > rhs);
132+
}
133+
134+
template <typename BufferType>
135+
bool operator>=(const Header<BufferType>& lhs, const Header<BufferType>& rhs) {
136+
return !(lhs < rhs);
137+
}
138+
139+
// Implementation
140+
template <typename BufferType>
141+
Header<BufferType>::Header(std::string name,
142+
const BufferType& value)
143+
: name_(std::move(name)),
144+
value_(make_value(value)) {
145+
}
146+
147+
template <typename BufferType>
148+
Header<BufferType>::Header(std::string name,
149+
BufferType&& value)
150+
: name_(std::move(name)),
151+
value_(std::move(value)) {
152+
}
153+
154+
template <typename BufferType>
155+
const std::string& Header<BufferType>::get_name() const {
156+
return name_;
157+
}
158+
159+
template <typename BufferType>
160+
const BufferType& Header<BufferType>::get_value() const {
161+
return value_;
162+
}
163+
164+
template <typename BufferType>
165+
BufferType& Header<BufferType>::get_value() {
166+
return value_;
167+
}
168+
169+
template <typename BufferType>
170+
Header<BufferType>::operator bool() const {
171+
return !value_.empty();
172+
}
173+
174+
template <>
175+
inline
176+
Header<Buffer>::operator bool() const {
177+
return value_.get_size() > 0;
178+
}
179+
180+
template <typename BufferType>
181+
template <typename T>
182+
T Header<BufferType>::make_value(const T& other) {
183+
return other;
184+
}
185+
186+
template <typename BufferType>
187+
Buffer Header<BufferType>::make_value(const Buffer& other) {
188+
return Buffer(other.get_data(), other.get_size());
189+
}
190+
191+
} //namespace cppkafka
192+
193+
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION
194+
195+
#endif //CPPKAFKA_HEADER_H

0 commit comments

Comments
 (0)