Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer retry #78

Merged
merged 6 commits into from
Jun 12, 2018
Merged

Producer retry #78

merged 6 commits into from
Jun 12, 2018

Conversation

accelerated
Copy link
Contributor

@accelerated accelerated commented Jun 2, 2018

Description

This PR can be decomposed logically into two segments:

  • Retry logic in the BufferedProducer
  • A generic mechanism to pass internal/private/hidden cppkafka data inside the message user_data.

Scope of changes

  • Allow the tracking of produced messages so that retry logic can be enforced inside the BufferedProducer class. Since the mechanism is generic, other classes may provide their own private data and use it internally.
  • Locality of changes is limited to BufferedProducer, Producer, MessageBuilder and Message class. Most changes are the actual retry logic inside BufferedProducer. The other classes have minimal changes.
  • Dependency injection of test parameters was added to the BufferedProducer class to allow testing of various scenarios. Currently this was not possible.
  • Added BufferedProducer::sync_produce and provided the user with a way to set the max number of message retries. In any application, retrying is an important feature, therefore this PR addresses that at a generic level, not having to be implemented by users, every time.
  • Added retry logic for BufferedProducer::produce which is asynchronous, by refactoring common logic into async_produce function. The original Buffered::produce functions still throw on error after all the retries have been exhausted, as such the behavior is unchanged.
  • Added counter for number of messages dropped in BufferedProducer class.

Private data ownership and tracking

  • The code interposes a small structure named MessageInternal between the user_data pointer and the opaque pointer provided by rdkafka when producing messages. This structure has some additional Internal metadata (different ones can be provided for different purposes as stated above -- in this case we use a Tracker object) and is entirely managed by unique_ptr ownership.
  • On the produce side, the only two overloaded entry points [produce(MessageBuilder) and produce(Message)] create each time a new unique_ptr<MessageInternal> structure before passing it to rdkafka. Should the produce method throw, the objects are deleted, otherwise the pointers are released.
  • On the dr_callback side, this data is loaded inside a unique_ptr<MessageInternal> at the beginning of the dr_callback_proxy and is deleted on exit.
  • Based on the above design, there is no point where the memory leaks.
  • The Tracker objects are managed via shared_ptr and their lifetime depends on all owners (i.e. MessageBuilder and Message instances).
  • Care has been taken to hide all manipulation of the MessageInternal object. As such, it is impossible for any user to cause an error or to misuse the API.

User data

  • All the original methods of setting/getting user data which are part of the MessageBuilder, as well as the user data getter in Message class behave exactly as before. Users can pass and retrieve their own pointers transparently.

Consumer

  • On the consumer side, the Message object remains unchanged.

Testing

  • Test cases have been added to test the sync and async producer as well as the retry mechanism in case of failure. Also various produce tests including re-producing a MessageBuilder and a Message object multiple times.
  • Test cases have been added to pass and retrieve application specific opaque user data.

@accelerated accelerated force-pushed the producer_retry branch 6 times, most recently from 48b3a8e to 9b88b4e Compare June 4, 2018 11:57
@mfontanini
Copy link
Owner

Just a couple of comments, I'll look at this better later.

Based on the above design, there is no point where the memory leaks.

This is not correct as I pointed out on the previous thread. If there's no delivery report callback set then there's going to be a memory leak.

Moreover, you seemed concerned about performance on other PRs you made but now you're forcing at least one memory allocation for the MessageInternal on each Producer::produce call regardless of whether the user is using BufferedProducer, meaning they'll be paying for this but not necessarily (and likely not) using it. I don't think this whole mechanism should be part of Message itself given it's not really a part of a Message at all. Message already has a way of introducing "additional" information via the rdkafka user data pointer. What prevents you to have this entire logic inside BufferedProducer instead of polluting Message?

@accelerated
Copy link
Contributor Author

accelerated commented Jun 4, 2018

Hi @mfontanini, yes you're right about the dr_callback, my bad. I was too focused on the BufferedProducer which always registers a dr_callback. I just pushed a fix. Ran valgrind also and it reports no leaks on all tests.

What prevents you to have this entire logic inside BufferedProducer instead of polluting Message

The idea here is that if the BufferedProducer makes use of the native opaque rdkafka pointer, the applications using BufferedProducer can no longer pass anything. That's very restrictive (in my own applicatoin for instance I pass a whole context object) and to implement some correlator for retry logic inside that would be very ugly. The point of introducing a small wrapper of opaque data is to still leave 100% user transparency at the application level. Also if I did as you suggest, won't that break existing applications using the BufferedProducer and passing-in their own data pointers? You have to have some data inside Message, in this case it's the Tracker object, because otherwise there is no way of communicating from the main thread with the polling thread and pass/receive message id information. Right now it's a simple base class Internal which can be derived from by many producers and it can also be enhanced to contain more data, the mechanism is there so it opens up possibilities/flexibility for the future.

Moreover, you seemed concerned about performance on other PRs you made but now you're forcing at least one memory allocation for the MessageInternal on each Producer::produce call regardless of whether the user is using BufferedProducer, meaning they'll be paying for this but not necessarily (and likely not) using it

Good point and this is also a very easy fix. The way I think of fixing this is to have a setter in the Producer class which enables/disables MessageInternal insertion maybe called enable_internal_data(bool). By default this will be turned off so no allocations. Now inside BufferedProducer, when someone sets the set_max_number_retries() > 0, then I can turn that feature on. In this case, default BufferedProducers will not have any allocations. I can also set the default max number of retries to 0, right now it's set at 5. If you have other suggestions here i'd be happy to implement them. But I think overall the retry functionality is a much needed feature. LMK.

@mfontanini
Copy link
Owner

The idea here is that if the BufferedProducer makes use of the native opaque rdkafka pointer, the applications using BufferedProducer can no longer pass anything

Aren't you doing that already? You're overwriting the pointer that gets passed to rd_kafka_producev on with the RD_KAFKA_V_OPAQUE macro so you're basically already removing whatever pointer the user tried setting.

Also, there's something I'm probably missing here. So as I see it the flow goes like:

  • The buffered producer sets a Tracker pointer inside the message when you call any of the produce or add_message overloads.
  • Whenever the message is actually produced, the Producer will use the user_data pointer as the rdkafka opaque pointer.
  • On the delivery report proxy, you call MessageInternal::load which reads the user_data pointer in Message and then re-sets user_data_ and internal_ on the message. Here's where I'm confused: this Message object was constructed in the proxy by calling Message::make_non_owning which simply sets the handle_ member inside Message. The user_data_ and internal_ members are not set at all in that code flow. So after that, when you call MessageInternal::load which reads from the Message's user_data_ pointer... What is it reading? Isn't that a null pointer at that point? MessageInternal::load basically tries to read from a null pointer, but since it's null, it does nothing so Message::get_user_data will return null after that. I have to be missing something here or otherwise this wouldn't work at all and I imagine you ran it and it did work for you so do you mind explaining what's going on?

@accelerated
Copy link
Contributor Author

accelerated commented Jun 4, 2018

I'd be happy to explain:

Aren't you doing that already? You're overwriting the pointer that gets passed to rd_kafka_producev on with the RD_KAFKA_V_OPAQUE macro so you're basically already removing whatever pointer the user tried setting.

No. The original user data is placed inside the MessageInternal::user_data_ member on the produce side and is swapped out again inside the dr_callback, assuming dr_callback is registered.

Inside the produce functions, the original user data is just placed inside the MessageInternal structure.

void* opaque = message.get_user_data();
    unique_ptr<MessageInternal> internal_data;
    if (get_configuration().get_delivery_report_callback()) {
// <<<< original user data transfered here
        internal_data.reset(new MessageInternal(message.get_user_data(), message.internal()));
//<<<< 
        opaque = internal_data.get();
    }

On the dr_callback side, inside Message::Message the rdkafka::_private is loaded inside Message::user_data_ (may or may not contain MessageInternal struct).

Message::Message(HandlePtr handle)
: handle_(move(handle)),
  payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()),
  key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()),
  user_data_(handle_ ? handle_->_private : nullptr) { //<<<<< loaded here
}

Then in the dr_callback_proxy, we simply unpack the MessageInternal from Message::user_data_ to
Message::user_data_ and Message::internal_

void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
    Producer* handle = static_cast<Producer*>(opaque);
    Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
//<<< we unpack here 
    unique_ptr<MessageInternal> internal_data(MessageInternal::load(message));
//<<<
    CallbackInvoker<Configuration::DeliveryReportCallback>
        ("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
        (*handle, message);
}

and all the MessageInternal::load does is unpack the Message::user_data_ (currently containing a MessageInernal structure) back to it's individual parts.

static std::unique_ptr<MessageInternal> load(Message& message) {
        if (message.get_user_data()) {
            //<<<  Unpack internal data
            std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
            message.load_internal(internal_data->user_data_, internal_data->internal_);
            return internal_data;
        }
        return nullptr;
    }

After MessageInternal::load() the Message::user_data_ contains the original user data pointer.

So the whole logic is simply swap in and out of the original user_data pointer. In the case where dr_callback is not registered OR it's a Message instance on the Consumer side, then Message::user_data_ contains the original rdkafka::_private (because MessageInternal::load is never called) which is what the original code has. That's all there is to it. Also the MessageInternal::load and the Message::load are protected and can't be tampered by the user accidentally.

PS: It's nice to have test cases :)

@mfontanini
Copy link
Owner

I see, I missed where user_data_ was being set from the message. Still why can't you implement this same behavior on BufferedProducer instead? You can still wrap the original user data in your own thing and replace the pointer before writing (this is currently being done in Producer) and unwrap it on the delivery report callback in BufferedProducer? As I see it, you're polluting Message with something completely unrelated to that class: this entire thing just wraps the message's user data on another pointer.

PS: It's nice to have test cases :)
Yep, hence why they exist in the project and I asked you to implement them on other PRs, even though they sporadically fail but hopefully I'll fix that when I have some time to do so.

@accelerated
Copy link
Contributor Author

You can still wrap the original user data in your own thing and replace the pointer before writing (this is currently being done in Producer) and unwrap it on the delivery report callback in BufferedProducer?

I thought about this but unfortunately it's not possible. The Message class does not have a setter for user_data like MessageBuilder and I need access to the lower level produce function for that. Imagine the BufferedProducer::produce(Message) function...how can I inject in this function the MessageInternal object? In the MessageBuilder class I can get user data and set user data at will, but not here.

Also, say I inject it in BufferedProducer::produce(MessageBuilder), then in BufferedProducer::on_delivery_callback I receive the Message with my hidden MessageInternal object. Ok so far so good. I now must call the user supplied failure or success callbacks with the same Message object but now it must contain their own user_data. So how do I swap out the MessageInternal data? Impossible. The only way is to swap it inside the free function dr_callback_proxy like before, but then how do I now determine if the message was produced by a BufferedProducer or a reguler Producer. Impossible. It's a Catch22 situation. Hence the only way is to pack/unpack the data as close as possible to rdkafka i.e. the rd_kafka_producev and dr_callback_proxy methods.

I don't see it as polluting the Message class, it's delivering additional out of bound data to any end application. The Message class is the endpoint of any produced message so I think it's fine if it contains an additional shared_ptr. In the future this may be helpful for all sort of other stuff. It will give more flexibility to the class.

@accelerated
Copy link
Contributor Author

Ok, I just remembered the other problem. Say for instance that we somehow add a setter in the Message object for user data. Perhaps I modify the private Message::load function, or add a new setter, whatever. Now inside BufferedProducer::on_delivery_callback I receive the Message, I swap out the MessageInternal object and I call the failure delivery function provided by the user. The user has now access to his original user data inside Message so all is good.
But what happens now if he decides to call BufferedProducer::produce(Message) again with this same message I just passed to him? My MessageInternal data is lost! I have no more context. I already swapped it out and there's no way to correlate back. So the retry functionality would be broken. This is why the Message object must contain one ownership of the shared_ptr, otherwise each time you produce you lose the data. Makes sense?

@accelerated
Copy link
Contributor Author

Ok, you can check my last checkin. I added logic to only conditionally create the MessageInternal data structure and I also cleaned-up the producer code a bit. The way it works is that if the producer "sees" that there is an internal data set on a Message or MessageBuilder object, it enables itself for internal data. This is done only once per producer instance.

@mfontanini
Copy link
Owner

Alright, your first comment actually doesn't provide a good reason as you can always allow the buffered producer to overwrite this stuff. However, your last comment raises a good point and I see the issue now.

@accelerated
Copy link
Contributor Author

Ok i think i found a way to keep changes local to BufferedProducer, MessageBuilder and Message class only IF you're ok with keeping the shared ptr inside the Message class. Without that it's impossible. That means that the Producer and free function dr_callback_proxy will remain unchanged. Lmk pls. Message and MessageBuilder changes will be almost identical to now and just a few lines of code.

@mfontanini
Copy link
Owner

That sounds better, I'd like to have a look :)

@accelerated
Copy link
Contributor Author

Ok done! Lmk what you think. This was a bit trickier to implement...

@mfontanini
Copy link
Owner

I'll check this one tomorrow, sorry for the delay!

public:
static std::unique_ptr<MessageInternal> load(const Producer& producer, Message& message);
private:
struct MessageInternal {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this a class with just getters for user_data_ and internal_? I know this is only used internally but anyway, it's good to keep this encapsulated, especially given these members aren't modified after construction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok makes sense

try {
TestParameters* test_params = get_test_parameters();
if (test_params && test_params->force_produce_error_) {
throw HandleException(Error(RD_KAFKA_RESP_ERR_UNKNOWN));
}
produce_message(std::forward<MessageType>(message));
produce_message(std::forward<BuilderType>(builder));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't use forward here. Otherwise unless I go into produce_message to check what the signature is, this makes me think you may be move constructing the parameter (e.g. produce_message taking something by value), which would make the call to callback below bogus. I don't think you're really gaining anything by using forward here besides maybe confuse who's reading the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, good point

}
catch (const HandleException& ex) {
// If we have a flush failure callback and it returns true, we retry producing this message later
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (!callback || callback(std::forward<MessageType>(message), ex.get_error())) {
TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal());
if (!callback || callback(std::forward<BuilderType>(builder), ex.get_error())) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, the callback takes a const MessageBuilder& so this forward doesn't do anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


class Message;

struct Internal {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use class here? Even if they're equivalent, this is more of a class than a struct.


template <typename BufferType>
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
TrackerPtr tracker = add_tracker(const_cast<MessageBuilder&>(builder));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a general issue now with these member functions taking const refs and using const_cast here. What happens if you want to produce the same MessageBuilder twice? The second time, add_tracker will return nullptr so you won't be using it. I'm not sure if this would break something but I think all functions that call add_tracker should probably remove the tracker before returning (using some RAII wrapper probably). That way the MessageBuilders will be clean afterwards.

The way this works, you can't really call any produce/add_message overloads using the same builder in 2 threads at the same time because they'll be fighting to set the tracker, which goes against what I'd think given the functions take const refs.

As I see it, MessageBuilder::internal is only used in BufferedProducer (also via MessageInternalGuard but that one's only used in that class as well). Can't you just wrap these builders in a thin template struct that contains the original builder (probably allowing to pass along reference types so you can avoid copies) along with the tracker which will be used internally? That way you won't modify the builders at all and you'll keep the same behavior.

Copy link
Contributor Author

@accelerated accelerated Jun 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a tricky scenario with a single MessageBuilder produced multiple times. You're absolutely right that right now two threads will compete for the internal structure, but consider this: imagine you build a MessageBuilder from a Message object. The Message::internal gets transposed inside the MessageBuilder::internal, so now if you want to produce the same MessageBuilder instance multiple times you will be using the same Internal data whatever that is (Tracker in this case or other stuff). Then this is no different from producing multiple MessageBuilders (with initial empty Internal data) and eventually the first thread gets to set the Internal data which will be used again and again. So what should be the real behavior here? Also if you produce the same Message multiple times, the behavior is identical, you only use the same Internal data instance each time. Maybe the behavior should be:

  • If I have a MessgeBuilder with empty internal data, then add_tracker should behave like RAII.
  • If MessageBuilder already has some internal data then add_tracker should be a no-op.
  • Also the MessageBuilder should be synchronized via a mutex (inside a shared_ptr so the class stays copyable) so that when two threads produce the same MessageBuilder, one thread will block until the other one finishes. But this will introduce a mutex inside the MessageBuilder....And then the shared_ptr has to be atomic and atomics are not copyable...argh...

Or

  • Wrap the MessageBuilder inside a std::pair<BuilderType, Tracker> and convert the internal queue into a queue of these pairs? So that when I actually call produce, if BuilderType::internal() is set, I use that one, otherwise I use pair::second? This way MessageBuilder stays completly untouched until produce_message is called but that means changing some of the internal APIs to accept this pair structure instead of the current BuilderType.

@accelerated
Copy link
Contributor Author

accelerated commented Jun 10, 2018

Actually i think there's even a simpler solution...all you need to do is work with a copy of MessageBuilder when you call produce () and sync_produce(). The copies are very cheap as this is a viewer class. And then you can easily modify the internal ptr at will. Method add_message (MessageBuilder) converts to a Builder anyway so you're safe there.

template <typename BufferType>
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
    add_message(Builder(builder)); //<--- HERE 
}

template <typename BufferType>
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
    if (has_internal_data_) {
        add_tracker(MessageBuilder(builder)); //<--- HERE
        async_produce(builder, true);
    }
    else {
        async_produce(builder, true);
    }
    
}

template <typename BufferType>
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
    if (has_internal_data_) {
        TrackerPtr tracker = add_tracker(MessageBuilder(builder)); //<--- HERE
        // produce until we succeed or we reach max retry limit
        std::future<bool> should_retry;
        do {
            should_retry = tracker->get_new_future();
            produce_message(builder);
            wait_for_acks();
        }
        while (should_retry.get());
    }
    else {
        // produce once
        produce_message(builder);
        wait_for_acks();
    }
}

But now I realize the MessageBuilder is non-copyable... :( because of the Buffer class. So unless you allow copies of MessageBuilder, everything else (even the RAII wrapper you suggested) will require some sort of locking of internal or user_data access at the MessageBuilder level since that object requires access by multiple threads.

@accelerated
Copy link
Contributor Author

Ok I made a change...pls take a look.

@mfontanini
Copy link
Owner

I think this looks good! I'll have a better look at it tonight but I think it's ready to be merged.

@accelerated
Copy link
Contributor Author

I modified a test case to produce twice the same MessageBuilder intance. Btw, is there a way to PM you separately?

@mfontanini mfontanini merged commit 0c7a3b0 into mfontanini:master Jun 12, 2018
@mfontanini
Copy link
Owner

Thanks for this one, it's a very useful addition!

@accelerated
Copy link
Contributor Author

Thanks for approving :), it was an interesting PR, same as the round robin one! Quite challenging in some ways.

@accelerated accelerated deleted the producer_retry branch June 12, 2018 13:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants