
round robin polling for assigned partitions #63

Merged: 10 commits merged into mfontanini:master from partition_poll on May 30, 2018

Conversation

@accelerated (Contributor):

Added two polling strategies to the consumer class - the default one preserves the current behavior. The other strategy (partition round-robin) allows individual consumption of each assigned partition instead of the batched approach (the current implementation with a common forwarding queue).
Also created a Queue helper class to better separate polling logic and to generalize the polling mechanism at the Consumer layer, i.e. the Consumer now consumes from a list of queues (which may contain a single queue in the common forwarding case) irrespective of the polling strategy, and is thus decoupled from the underlying queue logic.

@accelerated (Contributor Author):

Initially, in my Consumer::poll() method, I was thinking of polling all queues until I got a valid message (see below), but the issue with this is the blocking call to rd_kafka_poll. Since I have to respect the timeout argument, I can either wait in the event poll or in the message consume functions. The problem with spending the whole timeout in the rd_kafka_poll call is that events make up a small percentage of total traffic, the majority being messages, so it's much better to block on consume. I could have moved the rd_kafka_poll call to the end with a timeout of 0, but then the problem is how to split the timeout across N queues. So I finally opted to poll a single partition per Consumer::poll() call.

Message Consumer::poll(milliseconds timeout) {
    // Poll the general event queue first, spending the timeout here
    rd_kafka_poll(get_handle(), static_cast<int>(timeout.count()));

    Message message;
    size_t polls = partition_queues_.size();
    while (polls--) {
        message = partition_queues_[queue_index_++].consume(milliseconds(0));
        // wrap the queue index before it can run past the end
        if (queue_index_ >= partition_queues_.size()) {
            queue_index_ = 0;
        }
        if (message) {
            break; // got a valid message
        }
    }
    return message;
}

A similar problem arises in the batch poll case. How do I split a batch between N partitions? Do I get batch_size/N messages from each queue? What if some queues are empty? Then I could return an incomplete batch while other queues still contain messages. The poll timeout issue applies here as well. So once again I decided to batch from a single partition on each call... it makes the logic much simpler, and I also leave the rd_kafka_poll at the end so that most of the blocking is spent waiting for messages (sketched below).
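A minimal sketch of that per-partition batching (editor's illustration, not the PR's final code): it reuses the partition_queues_/queue_index_ members from the snippet above and assumes a consume_batch helper on the Queue wrapper.

MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
    // Batch from a single partition queue per call, advancing round-robin style
    MessageList messages = partition_queues_[queue_index_++].consume_batch(max_batch_size, timeout);
    if (queue_index_ >= partition_queues_.size()) {
        queue_index_ = 0;
    }
    // Serve any pending events last, without eating into the consume timeout
    rd_kafka_poll(get_handle(), 0);
    return messages;
}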

Thoughts on this?

@accelerated force-pushed the partition_poll branch 2 times, most recently from 2e46e87 to 4c7c98e on April 27, 2018 19:02
@accelerated (Contributor Author):

This final version works well. I fixed all the issues related to rd_kafka_poll not working.

@accelerated force-pushed the partition_poll branch 2 times, most recently from c729a1d to 5a35ff4 on April 27, 2018 19:54
@mfontanini (Owner):

As I mentioned in #62, other users may want to consume using other strategies. Instead of tying this to the Consumer class, I think this should be an external utility (e.g. RoundRobinConsumerHelper or something like that).

The reason I created different utility classes (e.g. ConsumerHelper, BackoffCommitter, BufferedProducer) is that I think the Consumer/Producer classes should be kept simple: they're basically just a wrapper over the kafka handle. I think behavior like this belongs in a separate class, maybe even a hierarchy that allows plugging in a poll strategy. If tomorrow I want to implement some other sort of consumption (e.g. sorted by time), it would be nice if I could do it by simply inheriting from a helper class and defining some function that chooses which queue to consume from next, with the rest done automatically. Tying a "round robin" strategy to Consumer could get chaotic: if 10 different strategies are implemented in the future, the Consumer class will become really bloated and complex.
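A rough sketch of what such a hierarchy could look like (editor's illustration; the class names match those later used in this PR, but the bodies are not the actual implementation - Queue and Message are cppkafka's classes):

#include <chrono>
#include <cstddef>
#include <vector>
#include <cppkafka/message.h> // assumed cppkafka headers
#include <cppkafka/queue.h>   // Queue is added by this PR

class PollStrategyBase {
public:
    virtual ~PollStrategyBase() = default;

    Message poll(std::chrono::milliseconds timeout) {
        // common polling logic lives here
        return get_next_queue().consume(timeout);
    }

protected:
    // derived strategies only decide which queue to consume from next
    virtual Queue& get_next_queue() = 0;

    std::vector<Queue> queues_;
};

class RoundRobinPollStrategy : public PollStrategyBase {
protected:
    Queue& get_next_queue() override {
        // assumes at least one assigned partition
        Queue& queue = queues_[index_];
        index_ = (index_ + 1) % queues_.size();
        return queue;
    }

private:
    size_t index_{0};
};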

@accelerated (Contributor Author) commented Apr 27, 2018:

Ok, I can easily do that. I'll expose a Queue get_queue(const TopicPartition&) method in the consumer class so that users can access any queue they want (this would be non-owning). Then the helper class could take a reference to a consumer and internally get queues, poll, etc.
I am not sure how to manage these queues externally for the assign/revoke events, since those callbacks would now have to be hijacked by the helper class. Unless you would be ok with registering a series of callbacks instead of just one?
Alternatively, I could take the 3 rebalance callbacks already registered, replace them with the helper's own, and have the helper's callbacks invoke the originals (see the sketch below). Then, when the helper class is destructed, I could restore the original callbacks on the Consumer.
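A rough sketch of that callback-wrapping idea (editor's illustration; the helper class name, the queues_ map and the assignment-callback accessors are hypothetical, while get_queue() is the accessor proposed above):

void RoundRobinConsumerHelper::install_callbacks() {
    // Keep the consumer's original assignment callback and wrap it in a proxy
    auto original_assignment = consumer_.get_assignment_callback();
    consumer_.set_assignment_callback(
        [this, original_assignment](TopicPartitionList& partitions) {
            // Fetch a (non-owning) queue for every newly assigned partition;
            // queues_ is a map keyed by (topic, partition)
            for (const TopicPartition& partition : partitions) {
                queues_.emplace(std::make_pair(partition.get_topic(), partition.get_partition()),
                                consumer_.get_queue(partition));
            }
            // Forward to the originally registered callback, if any
            if (original_assignment) {
                original_assignment(partitions);
            }
        });
}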

If you can review the rest of the code (minus the Consumer class changes) that would be a good start.

@accelerated (Contributor Author):

Would you lean more towards inheritance, i.e. a modified consumer-derived class that hides the poll methods of the parent class, or a helper class where you pass in a consumer object and it then queries queues on it and polls them? I would lean more towards the latter.

@accelerated (Contributor Author):

Or, thirdly, you could inject a polling strategy object into the consumer, which would alter its poll behavior at runtime.

@mfontanini (Owner):

I meant just like the ConsumerHelper class works: get a Consumer and work with it. The Consumer class shouldn't be changed unless absolutely required. The CompactedTopicProcessor class currently wraps whatever assignment/revocation callbacks the consumer has and injects a proxy that adds some extra logic.

@accelerated (Contributor Author) commented Apr 28, 2018:

Ok thanks, will submit the final PR soon. Btw, the CompactedTopicProcessor is a neat helper class; too bad the poll functions are not virtual, otherwise I could derive from Consumer, override them, and still be able to use the CompactedTopicProcessor as well.

@accelerated (Contributor Author):

@mfontanini Code complete, please review. Will fix the failing assertion...

@accelerated accelerated force-pushed the partition_poll branch 4 times, most recently from 5a661dd to f2a5cb9 Compare April 30, 2018 21:42
@mfontanini (Owner):

Please don't use git reset and git push -f. It's really hard to see the deltas of your changes if there's always just a single commit that contains all the changes, especially if you push while I'm reviewing the code (but also for any changes you have to apply after the review).

@accelerated (Contributor Author):

Hi, yes will do. I'm actually making one more change, so stay tuned.

@accelerated (Contributor Author):

Ok done. I missed the scenario where the topic partitions list could contain multiple (different) topics during assignment/revocation.

@mfontanini (Owner) left a comment:

Some comments, I'll give it another round soon.

* The returned message *might* be empty. If's necessary to check that it's a valid one before
* using it:
*
* \return A message. The returned message *might* be empty. If's necessary to check
@mfontanini (Owner):

Typo: If's

/**
* Gets the configured timeout.
*
* \sa Queue::set_timeout
@mfontanini (Owner):

This seems to be set_consume_timeout although I think I prefer set_timeout for consistency with the rest of the code.

* \brief This adapter changes the default polling strategy of the Consumer into a fair round-robin
* polling mechanism.
*
* The default librdkafka (and cppkafka) poll() and poll_batch() behavior is to consume batches of
@mfontanini (Owner):

I think this comment explains way too much. I don't think there's a need to explain the internals of rdkafka here. I would just remove all of this and keep the last paragraph, the "This adapter allows fair..." one.

utils/backoff_performer.cpp
utils/backoff_committer.cpp
)
file(GLOB SOURCES *.cpp utils/*.cpp)
@mfontanini (Owner):

I prefer not using glob for source files as it can have some bad side effects. Also please keep your changes self-contained: you could have just added one line here instead of removing an entire block of code and replacing it with something else.

@accelerated (Contributor Author):

Could you be more specific as to the bad side effects? I've been using this as long as I can remember and never had problems. Also we're using it for generating the global header file which is why I wanted to add it here. I also don't like having to edit the CMakeLists.txt file every time a cpp file gets added.

@mfontanini (Owner):

See https://stackoverflow.com/questions/32411963/why-is-cmake-file-glob-evil. This is not a massive project with hundreds of files and it's not like new files get added every day anyway.

return messages;
}
// concatenate both lists
messages.reserve(messages.size() + partition_messages.size());
@mfontanini (Owner):

There's no need to reserve here.

@accelerated (Contributor Author):

I use reserve for performance reasons and I think we should keep it, as the batch size could be quite large. That's the reason I added the return line above the reserve: in case it's empty there's no need to go forward.

@mfontanini (Owner):

Insert already expands the vector (once), so this call is useless. insert knows how many elements there are between the two iterators you provide, so it knows how much to enlarge the vector.
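A small standalone illustration of that point (editor's example, not the PR's code): insert() computes the distance between the two iterators and grows the destination once.

#include <vector>

int main() {
    std::vector<int> messages = {1, 2, 3};
    std::vector<int> partition_messages = {4, 5, 6};
    // No reserve() needed: insert() enlarges `messages` in a single step
    messages.insert(messages.end(),
                    partition_messages.begin(),
                    partition_messages.end());
    return 0;
}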

qlist& ref() { return queues_; }
// typedefs
using toppar_t = std::pair<std::string, int>; //<topic, partition>
using qmap_t = std::map<toppar_t, Queue>;
@mfontanini (Owner):

I would call this QueueMap, the name is short enough that it's not worth abbreviating.

using qmap_t = std::map<toppar_t, Queue>;
using qiter_t = qmap_t::iterator;

qmap_t& ref() {
@mfontanini (Owner):

I would call this get_queues or something like that. Again, no need for extreme abbreviations for names this short.

qmap_t& ref() {
return queues_;
}

Queue& next() {
@mfontanini (Owner):

get_next_queue here maybe?

on_rebalance_error(error);
});
// make sure we don't have any active subscriptions
if (!consumer_.get_subscription().empty()) {
@mfontanini (Owner):

This should be checked before setting all the callbacks, otherwise you'll leave the Consumer dirty.

@accelerated (Contributor Author):

I changed the logic... instead of rejecting, I now get the current assignments, which is the desired behavior.

@mfontanini (Owner):

I don't understand. Are you referring to the current code or some uncommitted fix?

if (messages.empty()) {
return partition_messages;
}
if (partition_messages.empty()) {
@mfontanini (Owner):

This can be removed. If partition_messages is empty, the lines below won't do anything anyway.

@accelerated (Contributor Author) commented May 1, 2018:

This was added to prevent the reserve(). Ideally MessageList would not be a vector but a list, as vectors don't scale well for large numbers of elements: the memory allocation has to be contiguous, so new() can fail.

@mfontanini (Owner):

See my comment below.

@accelerated (Contributor Author):

Ok fixed.

@accelerated (Contributor Author) commented May 1, 2018:

So I removed that non-owning builder when creating a queue and tested with and without it. The queues are still there; however, if I create a second adapter (after the first one is destroyed), even though the queue handles are the same (now I'm decrementing the ref count, i.e. they are "owning"), I don't get any messages whatsoever, with both the batch and non-batch versions. It's quite strange...
I can confirm another bug though... calling commit() with NULL as the partition list definitely does not commit the current assignments (as the library documentation says it should).
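For reference, a sketch of the call being described (editor's illustration; the handle parameter stands for the consumer's underlying rd_kafka_t*). rdkafka documents a NULL offsets list as "commit the current assignment":

#include <librdkafka/rdkafka.h>

bool commit_current_assignment(rd_kafka_t* handle) {
    // NULL offsets => commit offsets of the current assignment; 0 => synchronous
    rd_kafka_resp_err_t error = rd_kafka_commit(handle, nullptr, 0);
    return error == RD_KAFKA_RESP_ERR_NO_ERROR;
}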

@mfontanini (Owner):

There should have been a test for commit with no topic/partitions as the parameter (my bad, should have asked for it). What happens when you call commit with null?

Also, there should be some tests for this so we can guarantee it works. You can have a look at how the travis build sets up kafka to make tests work. There basically need to be 2 topics (cppkafka_test1 and cppkafka_test2) with 3 partitions each, and when you invoke cmake you should pass -DKAFKA_TEST_INSTANCE=<endpoint where kafka is>.

src/consumer.cpp Outdated
output.emplace_back(ptr);
}
return output;
return MessageList(raw_messages.begin(), raw_messages.end());
@mfontanini (Owner):

Nice, looks much better.

@accelerated (Contributor Author) commented May 1, 2018:

Will add the travis tests... stay tuned. Re: commit(), it returns an error saying nothing was committed. The code itself is a very thin wrapper over the rdkafka library; there's not much to test in your library.

@mfontanini (Owner):

Keep in mind you can run this locally by running ./tests/cppkafka_tests.

Regarding the commit part, it could at least check if something is actually committed (see the consumer offset commit test in consumer_test.cpp).

@accelerated (Contributor Author):

My suspicions about destroying the queues after use are confirmed. librdkafka will flush and destroy all messages and the queue instance; however, if you query a new reference to this queue at a later point in time, a new queue is created yet it's not functional - i.e. no messages are enqueued and no messages are fetched from the broker. The same happens with the global consumer queue, which stops working completely.
I believe one must call rd_kafka_consume_start_queue() with a specific offset, but even that fails for me, and if I then poll the main event queue I get a crash. I've tried a bunch of different combinations of rd_kafka_consume_stop_queue, then destroy and start... and nothing works.
The current implementation with non-owning queues works very well: I even tested polling via one adapter, deleting it, polling directly via the consumer, and then polling again via a new adapter; everything works and the queues transition well even during a rebalance. I haven't seen duplicate messages being consumed either, which means offsets are properly managed. I will add Travis test cases asap.

@mfontanini (Owner):

Could you create a ticket on rdkafka regarding this with your findings? I'm not doubting what you did but this looks very broken on the rdkafka side and it shouldn't be happening. At some point the queues should be destroyed and I don't know what happens if you don't do so.

@accelerated (Contributor Author):

See opened ticket with librdkafka.

@accelerated force-pushed the partition_poll branch 4 times, most recently from e034ded to deee472 on May 11, 2018 17:02
@accelerated (Contributor Author):

Tests passed. This looks like the final version unless you have more comments.

@mfontanini (Owner):

I've been quite busy recently and haven't had a chance to look at this. I'll review it in the next couple of days.

@accelerated force-pushed the partition_poll branch 4 times, most recently from 48d93af to 0788395 on May 15, 2018 15:07
if (partition_messages.empty()) {
return messages;
return;
}
// concatenate both lists
messages.reserve(messages.size() + partition_messages.size());
@mfontanini (Owner):

This one's still not fixed.

MessageList& messages,
ssize_t& count,
milliseconds timeout)
{
@mfontanini (Owner):

My personal preference, but to be consistent with the rest of the code, this brace should be on the line above, after the ).

*
* Each call to poll() will first consume from the global event queue and if there are
* no pending events, will attempt to consume from all partitions until a valid message is found.
* The timeout used on this call will be the one configured via RoundRobinPollStrategy::set_timeout.
@mfontanini (Owner):

This should say PollInterface

/**
* \brief Polls for new messages
*
* Same as the other overload of RoundRobinPollStrategy::poll but the provided
@mfontanini (Owner):

Same comment about PollInterface

/**
* \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
*
* Same as the other overload of RoundRobinPollStrategy::poll_batch but the provided
@mfontanini (Owner):

Same, re: PollInterface

*/

class RoundRobinPollStrategy : public PollStrategyBase
{
@mfontanini (Owner):

Same comment regarding brace

* util classes such as BasicConsumerDispatcher.
*/
class PollStrategyAdapter : public Consumer
{
@mfontanini (Owner):

Same comment regarding the brace

buffer_test.cpp
compacted_topic_processor_test.cpp
configuration_test.cpp
topic_partition_list_test.cpp
kafka_handle_base_test.cpp
producer_test.cpp
consumer_test.cpp
roundrobin_poll_test.cpp
@mfontanini (Owner):

Extra spaces here as well

vector<int> partition_order = make_roundrobin_partition_vector(total_messages);

for (int i = 0; i < total_messages; ++i) {
REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+1]);
@mfontanini (Owner):

I'm probably missing something, but how does this guarantee that the starting topic/partition will be what you expect? If, say, you somehow took slightly longer to produce messages than you expect, you could end up having the consumer poll through the first partition, jump to the second, and then get a message on that one. The tests seem to have succeeded but I'm curious about that potential race condition.

@accelerated (Contributor Author):

Well, most likely the partitions are empty, which means all the EOFs arrive in sequence, and the first poll timeout is 1s, which is more than enough to publish 3 messages, especially since the kafka broker is on localhost. In my testing the broker is in a remote datacenter cluster and it still works. I have slightly changed the test now, allowing for a different start partition in case the first poll (or other ones) times out.

PollStrategyAdapter consumer(make_consumer_config());
TopicPartitionList partitions;
for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace_back(KAFKA_TOPICS[0], i++));
consumer.assign(partitions);
@mfontanini (Owner):

Any reason why you're not using subscribe here? That involves a slightly trickier code flow so it may be worth using it instead.

@accelerated (Contributor Author):

Ok will add subscribe.

@accelerated (Contributor Author):

@mfontanini any chance you can merge this?

@mfontanini (Owner):

Seems like this needs a rebase.

@accelerated (Contributor Author):

Yeah, because of other PRs that came after this one but were accepted before it :). I'll fix it.

@mfontanini merged commit 15fdab6 into mfontanini:master on May 30, 2018
@mfontanini (Owner):

Thanks!

@accelerated deleted the partition_poll branch on May 30, 2018 19:54