Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added pause/resume for producers #87

Merged
merged 2 commits into from
Jun 25, 2018

Conversation

accelerated
Copy link
Contributor

Added similar helper functions to consumer::pause and consumer::resume.

@accelerated
Copy link
Contributor Author

Not sure if it's worth caching the Topic object/partition list inside the producer so it can be reused when resume is called?

@@ -37,6 +37,7 @@
#include <set>
#include <librdkafka/rdkafka.h>
#include "macros.h"
#include "metadata.h"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just use a forward declaration here, this pulls in a bunch of crap that I tried avoided pulling in other headers (e.g. kafka_handle_base.h)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok will do

@mfontanini
Copy link
Owner

Woah I didn't know pausing works for producers... Do you have a use case and do you know what happens when you produce into a paused topic/partition? Does it just get buffered for a while?

I feel like this should have an overload that takes TopicPartitions. I'm not a super fan of get_metadata but maybe I'm biased because I've faced at least 2 bugs in rdkafka related to this. Anyway, I imagine someone may want to not produce into some particular topic/partition although I'm still not sure of what the use case would be for it.

@accelerated
Copy link
Contributor Author

Well apparently the rdkafka documentation for pause/resume does not involve a handle at all and it specifically refers to both production and consumption. For producers you pause when you get throttled. I'm currently implementing some throttling logic on top of cppkafka so i want to be able to pause on both ends. For the producer side, i assume that if you pause, the internal rdkafka queue won't get flushed at all, thus leaving the broker some time to breathe. During this time, all sync/async_produce() get routed to add_message instead which buffers locally in the BufferedProducer. When the throttle comes off, I resume and flush the buffer.
No need to add an overload for TopicPartitions, you already have that function in KafkaHandleBase which takes a list. However I wanted to add an overload in the consumer to take a topic name. One use case is that i want to start some consumers in "paused" state and only start consuming when i'm ready. This is even before partitions get assigned so I would need a way to pause a topic rather than an "assigned partition list".

@accelerated
Copy link
Contributor Author

I also assume that when pausing the consumer while continuing to poll it, only heartbeats go out and no more batch message requests.

@accelerated
Copy link
Contributor Author

Actually the pause(topic), resume(topic) should go in the KafkaHandleBase as the implementation is generic to both. I'll push a change asap.

@accelerated accelerated force-pushed the producer_pause_resume branch from 471be62 to 1e677f5 Compare June 14, 2018 21:30
@accelerated accelerated force-pushed the producer_pause_resume branch from 1e677f5 to 122b9af Compare June 15, 2018 12:41
@accelerated
Copy link
Contributor Author

BTW posted a question for clarification in librdkafka

@mfontanini
Copy link
Owner

Interesting. The author's answer regarding pausing unassigned partitions makes me wonder if this is a valid change. Like he says it may work but they don't guarantee it does. Let's wait for him to answer your last question.

@accelerated
Copy link
Contributor Author

Tested with the PR pause/resume code as indicated by librdkafka owner and it works fine. Essentially getting the metadata of the partitions makes it known locally and then pausing it works as indicated. I could not use it inside the assignment callback because in cppkafka you call assign(handle, partitions) after the user registered assignment callback is raised. Therefore, unless i call assign twice it cannot be done.

@mfontanini mfontanini merged commit 5c72f3f into mfontanini:master Jun 25, 2018
@accelerated
Copy link
Contributor Author

Cheers!

@accelerated accelerated deleted the producer_pause_resume branch June 25, 2018 16:18
@mfontanini
Copy link
Owner

Thanks for this one!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants