header support implementation #115
Conversation
Sweet! Thanks for the PR, this is great. I'll have a look at it in more detail later, but on a first pass it seems alright.
I presume, given point 1, that there can be multiple headers with the same name?
Yes, any number. The rdkafka list is a vector. Any add or remove operations will invalidate iterators.
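For the record, a small sketch against the raw rdkafka C API (headers require librdkafka >= 0.11.4) showing that duplicate names coexist in one list; this is illustrative, not code from the PR:

#include <librdkafka/rdkafka.h>

int main() {
    // Build a header list and add two values under the same name; both
    // entries are kept, since rdkafka allows duplicate header names.
    rd_kafka_headers_t* hdrs = rd_kafka_headers_new(2);
    rd_kafka_header_add(hdrs, "trace", -1, "a", 1); // -1: name is nul-terminated
    rd_kafka_header_add(hdrs, "trace", -1, "b", 1); // duplicate name is allowed
    rd_kafka_headers_destroy(hdrs);
    return 0;
}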
Alright, I managed to get back to this and have a look at the code. It looks good, just a few comments. Regarding your third comment about the produce performance, I think that's a good idea. Maybe having something like this would work?
void produce(MessageBuilder&& b) {
    // move the headers out of the builder
    do_produce(b, std::move(b.header_list()));
}

void produce(const MessageBuilder& b) {
    // copy the headers
    do_produce(b, MessageBuilder::HeaderListType(b.header_list()));
}
include/cppkafka/header.h (outdated):

using ValueType = BufferType;

Header() = default;

Header(const std::string& name,
Since name is being stored here, I would normally take it by value and then std::move it.
BufferType is also stored... shouldn't it also be passed by value and moved in this case?
Can you please comment on this... I'm not sure whether you also want the BufferType to be passed by value instead of by lvalue/rvalue reference?
Given a BufferType can't be moved automatically, I don't think this one should take a value. Probably just have both constructors take a string by value.
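A minimal sketch of that suggestion, with the class shape simplified relative to the PR (the PR's make_value helper is omitted, so treat this as illustrative only):

#include <string>
#include <utility>

// Illustrative only: the stored name is taken by value and moved, while the
// generic BufferType is copied since it can't be moved automatically.
template <typename BufferType>
class Header {
public:
    Header(std::string name, const BufferType& value)
    : name_(std::move(name)), // stored, so pass by value and move
      value_(value) {
    }
private:
    std::string name_;
    BufferType value_;
};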
include/cppkafka/header.h (outdated):

           const BufferType& value)
    : name_(name),
      value_(make_value(value)) {
        assert(!name.empty());
I haven't played around with headers but just in case: is it impossible for a header to have an empty name? It looks like the header is defined as just a string in the protocol so I don't know if the brokers validate the length before accepting them. I know this wouldn't make sense but I'd rather be sure this won't backfire.
OK, I will remove the assert and add a null-name test to see if Kafka accepts it.
include/cppkafka/header_list.h (outdated):

// prefix increment
HeaderIterator& operator++()
{
Minor: brace on line above please (same for all methods below)
include/cppkafka/message.h (outdated):

/**
 * \brief Gets the message latency in microseconds as measured from the produce() call.
 */
int64_t get_latency() const {
This should probably return std::chrono::microseconds.
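A sketch of what that could look like, assuming the Message class keeps its rdkafka handle in a handle_ smart pointer as in the rest of the PR (rd_kafka_message_latency is the underlying rdkafka call and reports microseconds):

#include <chrono>

// Sketch: wrap rdkafka's microsecond latency in a typed duration. Assumes a
// handle_ member holding the rd_kafka_message_t*, as elsewhere in Message.
std::chrono::microseconds get_latency() const {
    return std::chrono::microseconds(rd_kafka_message_latency(handle_.get()));
}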
include/cppkafka/header_list.h (outdated):

// prefix decrement
HeaderIterator& operator--()
{
    if (index_ == 0) {
I think these types of checks should be asserts. If people don't know how to use an iterator, it's their problem. I'd rather take the undefined-behavior route, like the STL does.
include/cppkafka/header_list.h (outdated):

// prefix increment
HeaderIterator& operator++()
{
    if (*this == header_list_.end()) {
Same comment as below regarding the use of asserts for this.
include/cppkafka/header_list.h (outdated):

        throw Exception("Over bounds");
    }
    if (++index_ >= header_list_.size()) {
        *this = header_list_.end();
The mixed comparisons against header_list_.size() and header_list_.end() are kind of confusing. Why can't this just use the index for everything rather than assigning end to itself? E.g., this method's implementation could simply be:

HeaderIterator& operator++() {
    assert(index_ < header_list_.size());
    ++index_;
    return *this;
}
I think the reason why you currently can't do this is that your implementation of HeaderList::end returns Iterator{make_non_owning(handle_.get()), -1} (by the way, I think that -1 is wrong given the index is a size_t; that should at least be throwing a warning). If that method instead returned Iterator{make_non_owning(handle_.get()), size()}, then you could just use the index to identify the iterator and you wouldn't have to compare against header_list_.end() all the time. Plus, this is in line with what e.g. std::vector does, where end returns a one-past-the-end iterator.
Am I missing something as to why it couldn't be implemented that way? It just seems like there are too many checks here and the methods could be a lot simpler. Right now the logic for operator-- seems overly complicated just because when you're at end, your index is not actually pointing at size() but at -1 (which I presume means numeric_limits<size_t>::max() in this context).
OK, so there's a bit of a story behind this particular implementation. I know it can easily be achieved with the last index (and I will change the code to reflect your simpler approach); however, my reasons initially were the following:
- I assumed that the rdkafka header list implementation was a list (turns out it's a vector), and because we don't have access to the underlying container I still wanted to keep the list iterator behavior where:

auto endIt = list.end();
list.emplace_back(...);       // add one or more elements, or delete them
assert(list.end() == endIt);  // end() is immutable no matter how many operations you do
// obviously this is not valid for a vector, and since the underlying
// container is a vector, we can just put a warning that any operation
// on the list will invalidate all iterators (including end())
In order to achieve immutability, I had to create this -1 (max size_t) trick and assign it to the iterator if the rdkafka getter returns a ...NO_ENT error.
- I initially had two more overloads for doing name-based iteration (begin(name) and end(name)) to support that API as well... and when you iterate by name, the last element is not necessarily the one at the highest index in the list. Even if I use rd_kafka_header_get_last to find the last element, it does not return the index it's at, so I have no clue what the next position is... hence both of these overloads had to have a way to indicate end().
Since we now know it's a vector, and we don't care about the name-based iterator, using your simplified logic is better! Just thought I'd give you my reasoning so you don't think I wrote overly complicated code for no reason :).
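For illustration, a self-contained sketch of the index-based scheme settled on here, where end() is simply index == size(); the names are stand-ins, not the PR's actual types:

#include <cassert>
#include <cstddef>

// Stand-in for the PR's HeaderIterator: end() is represented by
// index == list.size(), matching std::vector's one-past-the-end convention,
// so ++/-- only touch the index and need no end() sentinel.
template <typename List>
class IndexIterator {
public:
    IndexIterator(const List& list, std::size_t index)
    : list_(&list), index_(index) {
    }

    IndexIterator& operator++() {
        assert(index_ < list_->size()); // incrementing end() is undefined
        ++index_;
        return *this;
    }

    IndexIterator& operator--() {
        assert(index_ > 0); // decrementing begin() is undefined
        --index_;
        return *this;
    }

    bool operator==(const IndexIterator& other) const {
        return list_ == other.list_ && index_ == other.index_;
    }

    bool operator!=(const IndexIterator& other) const {
        return !(*this == other);
    }

private:
    const List* list_;
    std::size_t index_;
};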
include/cppkafka/header_list.h (outdated):

HeaderType at(size_t index) const; // throws
HeaderType front() const;
HeaderType back() const;
HeaderType back(const std::string& name) const;
For some reason I feel like we shouldn't even wrap rd_kafka_header_get_last, or at least it shouldn't be called back. I think the semantics are a bit confusing and I don't really see the point of it. Why would someone want to find the last occurrence of a header with a particular name? You also can't call this function find because it doesn't really find the header, it gets the last one with that name. So I think it may be less confusing to just remove it altogether.
Yeah, I just had it for completeness, but I agree with you; it seems like a totally useless function. I'll remove it.
include/cppkafka/header_list.h (outdated):

Error HeaderList<HeaderType>::add(const HeaderType& header) {
    assert(handle_);
    return rd_kafka_header_add(handle_.get(),
                               header.get_name().c_str(), header.get_name().size(),
Minor: I'd rather do header.get_name().data() for consistency (same in the next 2 methods).
 * \brief Detaches the message's header list
 */
template <typename HeaderType>
HeaderList<HeaderType> detach_header_list() {
Following up on your comment, I think this is fine. I'd rather be explicit about wanting to detach them than having to do move(message).detach_header_list().
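For context, a self-contained sketch of the &&-qualified alternative floated in the PR description (the PR went with the explicit detach_header_list() instead); the types here are simplified stand-ins:

#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins: the const& overload observes, while the && overload,
// callable only on rvalues, transfers ownership of the headers.
struct Headers {
    std::vector<std::string> items;
};

class Message {
public:
    const Headers& get_header_list() const & { return headers_; }
    Headers get_header_list() && { return std::move(headers_); }
private:
    Headers headers_;
};

// Usage: std::move(msg).get_header_list() steals the header list.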
Regarding ... no, this will not work. What needs to be done for the ... In the case of ... For the const-ref methods you have to clone; there's no option there. PS: regarding detaching the handle from the ...
Not sure what you prefer. Please indicate and I will implement it.
I see your point, makes sense. I think I would go the ...
updated travis file with v0.11.5
Not really sure why all these tests are failing; everything passes in my environment. Also, the consumer tests haven't changed at all. Could it be some timing issue on the build server where Travis runs?
Hmm, it's weird, because the last two (#1 and #2) builds failed on exactly the same tests. If this were a timing issue I'd expect it to be more unpredictable. Unrelated to the failing tests, but I forgot to mention: it should be possible to disable header support via some compilation flag. Otherwise nobody would be able to use this with rdkafka < whatever version header support was introduced in.
Yeah, makes sense... we can do something similar to the Queue memory leak issue. I'll fix it. The producer code will look somewhat uglier, as that's the entry point of use...
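A hypothetical sketch of such a gate; the macro name is illustrative rather than what the PR ultimately used, and pinning the cutoff at rdkafka v0.11.4 (where the headers API appeared) is an assumption:

#include <librdkafka/rdkafka.h>

// Hypothetical gate: only compile header support when the rdkafka headers
// API is available. RD_KAFKA_VERSION encodes the version as 0xMMmmrrpp.
#if RD_KAFKA_VERSION >= 0x000b0400 // assumed cutoff: 0.11.4
    #define CPPKAFKA_HAVE_HEADERS 1
#endif

#ifdef CPPKAFKA_HAVE_HEADERS
// header-related wrappers compiled in here
#endif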
Good to go!
It might well be that the rdkafka changes from 0.11.0 to 0.11.5 make a difference in timing. They made many perf improvements, which may be why the same tests failed. Perhaps your consumer runner class needs a tweak?
include/cppkafka/header.h (outdated):

 * \param name The header name
 * \param value The header data to be moved
 */
Header(const std::string name,
This shouldn't be const, otherwise it won't be moved. Same with the overload above.
Silly me. Of course...good catch!
src/producer.cpp (outdated):

void Producer::produce(MessageBuilder&& builder) {
    MessageBuilder temp(std::move(builder)); // owns header list after the move
    do_produce(temp, MessageBuilder::HeaderListType(temp.header_list().release_handle())); // move headers
Am I missing something as to why you can't just use builder without moving it into a temporary? Normally, after moving, the moved-from object is left in an unspecified state, so if your worries were along the lines of "we should still move it away", it's fine: since it's unspecified, I think moving only what's necessary (the headers in this case) is alright.
Yeah, that was the reason; I wanted to make sure a moved object would be empty afterwards (not just the headers), regardless of whether it's called on a temporary rvalue or a moved lvalue. OK, I'll remove it.
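A sketch of the agreed simplification, based on the overload quoted above (illustrative, not the final committed code):

// Illustrative: move only the headers out of the builder; the builder itself
// is not moved into a temporary, since its moved-from state is unspecified anyway.
void Producer::produce(MessageBuilder&& builder) {
    do_produce(builder,
               MessageBuilder::HeaderListType(builder.header_list().release_handle()));
}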
src/producer.cpp (outdated):

void Producer::produce(Message&& message) {
    Message temp(std::move(message)); // rdkafka still owns the header list at this point
    do_produce(temp, HeaderList<Message::HeaderType>(temp.detach_header_list<Message::HeaderType>())); // move headers
I think the HeaderList<Message::HeaderType> is unnecessary, unless I'm missing something.
Yes you're right, I'll remove it.
CHECK(messages[0].get_header_list() == messages[1].get_header_list());
CHECK(messages[0].get_header_list().get_handle() != messages[1].get_header_list().get_handle());
}
Good job on the tests!
I think this looks good! I'm going to clone it over the weekend, play around a bit and see if I can fix the test issues, and we can probably merge it after that.
This looks good. It looks like tests started failing sporadically when I upgraded my kafka broker to 2.0.0 (it was ~0.10 before). As far as I can see, when a consumer unsubscribes and another subscribes right after (such is the case when running tests) kafka doesn't immediately trigger a partition assignment. I presume this is to give it some grace period or something like that, maybe to reduce the number of rebalances.
It seems like if you increase the timeout in the BasicConsumerRunner from 20 seconds to something > 30 (35?), fewer tests fail. I still can't find a way to make it work every time, though.
tests/headers_test.cpp (outdated):

// Create an owning header list and copy it
HeaderList<BufferHeader> list(3), list2(3);
string payload1 = "payload1", payload2 = "payload2", payload3 = "payload3";
list.add(BufferHeader(std::string("header1"), payload1));
Just for the record, this can simply be:
list.add({ "header1", payload1 });
No need to change it; just pointing out there's no need to be that explicit.
Sure, but how will the compiler know to instantiate a BufferHeader vs. a StringHeader with that initializer list? Both can be created with the same arguments.
HeaderList::add takes a HeaderType, which for that particular instantiation (list) is BufferHeader, so there's no ambiguity about which one you're referring to.
Instead of using a consumer runner, can't you produce, say, N messages, then start the consumer with relative offsets (last-N), since there are no local offsets cached at this point, read N messages till EOF, and then compare? This way you eliminate the runner altogether.
Well, some tests require simply subscribing, and there could be some uncommitted messages in the topic you subscribe to. Also, you need to make sure that you get the messages at some point; if there's some bug and you don't produce them, you don't want the runner to be stuck forever. It's a pretty hacky solution, but it kinda worked, until it didn't.
The consumer block can be solved easily by only polling a set number of times or with a max timer. I'm not sure what happens if you have uncommitted messages in the topic and you open the consumer with relative offset "latest". Is it latest committed + 1, or just latest time-wise, i.e. the top of the queue?
Creating 500 topics will still be an issue if a previous run on the same topic fails. Plus, that's not very maintainable :P. If you keep using the same consumer group id, the auto offset reset policy is irrelevant, because that's a fallback for when the consumer doesn't have any already-committed offset for a topic/partition. Since there have been previous runs of the tests and offsets were committed, it doesn't apply. Now, if you used a random consumer group id on each run with reset policy "latest", that would work. I'm not a fan of this approach, but if it's the only way to make it work reliably then that's fine.
Yeah, I see what you mean. Ideally a consumer group id made from a GUID would be unique for each test, especially a v1 GUID, which is time-based. Alternatively, maybe blowing away the topics and recreating them on startup ensures everything is empty. I was assuming the Travis sandbox would be a blank slate each time you run Kafka, meaning no topic or partition leftovers from previous runs.
Travis does run in a clean state, but the issue is that all tests share the same topics and consumer groups. I think using a random string as the consumer group is enough here. Something like 16 alphanumeric characters generated via ...
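The exact generator mentioned was lost from this transcript; as one hedged possibility, a 16-character alphanumeric id could be produced like this:

#include <algorithm>
#include <cstddef>
#include <random>
#include <string>

// One way to build a random consumer group id: 16 characters drawn
// uniformly from [0-9a-zA-Z] using a seeded Mersenne Twister.
std::string random_group_id(std::size_t length = 16) {
    static const std::string chars =
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<std::size_t> dist(0, chars.size() - 1);
    std::string id(length, '\0');
    std::generate(id.begin(), id.end(), [&] { return chars[dist(rng)]; });
    return id;
}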
Right, each test gets a new producer/consumer pair. One other possibility is to implement the admin API and create/tear down topics dynamically? Not too familiar with this API, though...
Yeah, I think we can merge this. I wouldn't go the admin API way, because that only works in some recent version of rdkafka, so you wouldn't be able to test it locally. Can you just rebase this one? I should have some time in the afternoon today or tomorrow so I can give it a try.
Thanks!
Hi @mfontanini, please take a look. This is just the first pass of the implementation for review purposes only. I haven't written any tests yet and also have not decorated methods with doxygen comments as things may change.
Some considerations:
1. I did not wrap rd_kafka_header_get, the reason being that the more generic rd_kafka_header_get_all can be used as a forward/reverse iterator and can actually have an end() function, which can be used for bounds checking when iterating and matches the STL container API well. With the name-based getter there is no way of guaranteeing what end() will be, hence iterating can only be implemented as a forward iterator with a while loop. I felt that in any case the number of headers won't be very large, and anyone using the generic iterator over all elements can simply check for the header name they want, so there's no loss in functionality and the code is less complicated. The rdkafka iterator API is strange because it lets you iterate by index, but there's no API to delete a header at a certain position; instead it deletes by name, which could have duplicates, in which case one delete operation can remove entire portions of the underlying vector (I checked the rdkafka code and the header list is indeed a vector).
2. The HeaderList<HeaderType> Message::detach_header_list() function could be implemented in a more C++ way as HeaderList<HeaderType> Message::get_header_list() &&, which would be an overload of the const get_header_list() but would apply only to rvalue messages. The effect would be the same. Not sure what your preference is.
3. Regarding the Producer::produce(const MessageBuilder) and Producer::produce(const Message) functions: the problem is that rd_kafka_producev() takes ownership of the header list, and because both MessageBuilder and Message are const, you can't make the underlying handle non-owning (which would be the fastest). Also, a user might produce multiple messages with the same builder object, so you have to maintain ownership in the const case, which forces you to clone the header list each time. Perhaps we can add two more produce functions with move semantics, which would take MessageBuilder&& and Message&&?
4. For the HeaderList class, I made the following assumptions: if the list object is non-owning, a copy does not invoke clone; if on the other hand you own the handle, then a copy operation clones the headers. You could also have an owning object which on copy simply returns a non-owning object, but I don't think that's the proper logic.
5. I added the Message::get_latency() API, which was missing.
6. Message has a HeaderList<Buffer> type, which is non-owning, whereas MessageBuilder has a HeaderList<T> where T matches the BufferType template parameter.