Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause/resume a consumer by topic #67

Merged
merged 7 commits into from
May 23, 2018

Conversation

accelerated
Copy link
Contributor

The base class KafkaHandleBase currently allows pausing/resuming by partitions which has fine granularity. In some use cases, the partitions are not really known or exposed to the application and furthermore, pausing/resuming action is mostly thought conceptually of at the topic level rather than at the partition level. These methods facilitate this.

Note: I debated whether or not these should go into the base class as well, but there are a few caveats with the producer.

  1. Pausing a producer can easily be done at the application level.
  2. Producers can always produce to new topics (as long as they are on the registered broker list) ad-hoc and the partitioning is not available unless one queries the metadata from the brokers or uses a partitioning callback. Making this metadata call is fairly expensive and I'm not sure it's worth doing with each pause/resume. Metadata could be cached locally but this makes things more complicated.
  3. In the case where this is really needed, the base class method can be used.

Copy link
Owner

@mfontanini mfontanini left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, just a couple of comments.

src/consumer.cpp Outdated
TopicPartitionList matches;
for (const auto& topic : topics) {
for (auto& partition : partitions) {
bool match = equal(topic.begin(), topic.end(), partition.get_topic().begin(),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this blow up if topic.size() > partition.get_topic().size()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah looks like the signature was improved in C++17 with a fourth end iterator. I'll put a boundary check.

src/consumer.cpp Outdated
@@ -48,6 +52,23 @@ void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
}

TopicPartitionList Consumer::get_matching_partitions(TopicPartitionList&& partitions,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this an rvalue reference and not a const ref?

Copy link
Contributor Author

@accelerated accelerated May 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I move it internally. I don't need to make a copy. The return of get_assignement is passed-in directly as an rvalue. Unless you think you may need to use this function elsewhere in the class...?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see the move. But anyway, in that case it should be taken by value, not by rvalue ref. The value will be moved into this function so won't be making any copies.

src/consumer.cpp Outdated
pause_partitions(get_assignment());
}

void Consumer::pause_topics(const std::vector<string>& topics) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No std here please.

@accelerated
Copy link
Contributor Author

I made the subset selector function global since lots of methods take a topic partition list which could be used by topic instead. Therefore instead of providing overloads for all of them (very tedious) the selector by topic is now a util.

@@ -54,6 +54,16 @@ CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_part
CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions);
CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);

// Extracts a partition list subset belonging to the provided topics (case-insensitive)
CPPKAFKA_API TopicPartitionList make_subset(const TopicPartitionList& partitions,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is okay here but I'm not sure about the name, "make_subset" doesn't really explain what the function does. Maybe find_matches or something like that? As much as this does return a subset of the input, the important thing is how the subset is created and not the fact that it will return a subset.

const vector<string>& topics) {
vector<bool> skip(partitions.size(), false);
TopicPartitionList subset;
for (const auto& topic : topics) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you flip this loop (first over partitions, then over topics), won't you get rid of the skip vector? Is that there in case there's duplicates in topics or am I missing something? Normally I'd use a set/unordered_set for topics as you want to enforce that there's actually no dupes (plus it makes the lookup much simpler once the loop is flipped). Also, even if this is a simple function, a simple test making sure it does what you want would be nice.

@accelerated
Copy link
Contributor Author

Agreed, the loop flip did make things simpler. I also added the set instead of the vector and two test cases.

const set<int>& ids) {
TopicPartitionList subset;
for (const auto& partition : partitions) {
for (const auto& id : ids) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole loop can be replaced by

if (ids.count(partition.get_partition()) > 0) {
    subset.emplace_back(partition);
}

I forgot you were doing the case insensitive comparisons for the topic case so you can't do this there, but this is mostly what I was going for with the set suggestion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can put this code for the partition case. Not a bad idea. I think I've seen it elsewhere in your code

@mfontanini
Copy link
Owner

Pushed some simplification, hopefully I didn't mess anything up as I edited it through the gh editor. I'll merge once build succeeds.

@mfontanini mfontanini merged commit 46c396f into mfontanini:master May 23, 2018
@accelerated accelerated deleted the pause_resume branch May 23, 2018 20:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants