-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added queue full notification #149
Conversation
4fd57e2
to
631bb4b
Compare
@@ -772,6 +819,13 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil | |||
if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { | |||
// If the output queue is full, then just poll | |||
producer_.poll(); | |||
// Notify application so it can slow-down production | |||
if (queue_full_notify) { | |||
queue_full_notify = queue_full_trigger_ = false; //clear trigger and local state |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is always resetting queue_full_notify
, so if you actually use QueueFullNotification::EachOccurence
you'll still get only once per produce_message
call, right? Not sure if the issue is here or if you wanted "each occurrence" to still mean "every time but only once per message"/
Yes that's the idea... |
Right but what if you keep hitting the limit forever for a single message? There's currently no way of actually knowing you get stuck. "Each occurrence" makes me think it's actually every occurrence of a full queue rather than "only one occurrence per message". |
Yes, there's no way of knowing if you're stuck or even which message got stuck. It's better than being silently loopinh though. Another alternative would be to also add one enum for "every single time" which would essentially be for each error multiple times per message, if it so happens. And instead of using the error callback we can have a separate callback just for this, with an argument taking a MessageBuilder reference, so the application can also see what got stuck (topic,partition). This would be better than the generic error callback. Otherwise, if you have other suggestions i'd be glad to implement them. So then: |
For the importance of having this notification, you can read the response in the rdkafka issue |
Hi @mfontanini, i am ready to resume this. Can you please lmk what changes you want? Thanks! |
Hi! So, looking at these changes and your comments, I think:
Cheers! |
Ok that makes sense. Will make changes as you requested. Stay tuned. |
631bb4b
to
97d1bb9
Compare
* | ||
* This will call the error callback for this producer. By default this is set to QueueFullNotification::None. | ||
*/ | ||
void set_queue_full_notification(QueueFullNotification notification); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe the setter for the notifiaction enum and the callback should be done in just one? You'll always call either both or none of them so I think that would make sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean the setter of the notification type? I think there might be cases when the app wants to set the notifocation to some enum and then it might want to turn it off (None) for a period and re-enable it later. In that case it would have to set the callback each time...prob not desirable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, fair enough. Merging this one, thanks!
Added queue full notification with various triggering options. This is needed so that the application can be notified when the rdkafka queue is backed up and can be allowed to reduce upstream pressure on the producers.