@@ -426,8 +426,7 @@ BufferedProducer<BufferType>::BufferedProducer(Configuration config)
426
426
427
427
template <typename BufferType>
428
428
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
429
- add_tracker (const_cast <MessageBuilder&>(builder));
430
- do_add_message (builder, MessagePriority::Low, true );
429
+ add_message (Builder (builder)); // make ConcreteBuilder
431
430
}
432
431
433
432
template <typename BufferType>
@@ -438,19 +437,26 @@ void BufferedProducer<BufferType>::add_message(Builder builder) {
438
437
439
438
template <typename BufferType>
440
439
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
441
- add_tracker (const_cast <MessageBuilder&>(builder));
442
- async_produce (builder, true );
440
+ if (has_internal_data_) {
441
+ MessageBuilder builder_copy (builder.clone ());
442
+ add_tracker (builder_copy);
443
+ async_produce (builder_copy, true );
444
+ }
445
+ else {
446
+ async_produce (builder, true );
447
+ }
443
448
}
444
449
445
450
template <typename BufferType>
446
451
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
447
- TrackerPtr tracker = add_tracker (const_cast <MessageBuilder&>(builder));
448
- if (tracker) {
452
+ if (has_internal_data_) {
453
+ MessageBuilder builder_copy (builder.clone ());
454
+ TrackerPtr tracker = add_tracker (builder_copy);
449
455
// produce until we succeed or we reach max retry limit
450
456
std::future<bool > should_retry;
451
457
do {
452
458
should_retry = tracker->get_new_future ();
453
- produce_message (builder );
459
+ produce_message (builder_copy );
454
460
wait_for_acks ();
455
461
}
456
462
while (should_retry.get ());
@@ -576,6 +582,9 @@ size_t BufferedProducer<BufferType>::get_flushes_in_progress() const {
576
582
577
583
template <typename BufferType>
578
584
void BufferedProducer<BufferType>::set_max_number_retries(size_t max_number_retries) {
585
+ if (!has_internal_data_ && (max_number_retries > 0 )) {
586
+ has_internal_data_ = true ; // enable once
587
+ }
579
588
max_number_retries_ = max_number_retries;
580
589
}
581
590
@@ -638,12 +647,12 @@ void BufferedProducer<BufferType>::async_produce(BuilderType&& builder, bool thr
638
647
if (test_params && test_params->force_produce_error_ ) {
639
648
throw HandleException (Error (RD_KAFKA_RESP_ERR_UNKNOWN));
640
649
}
641
- produce_message (std::forward<BuilderType>( builder) );
650
+ produce_message (builder);
642
651
}
643
652
catch (const HandleException& ex) {
644
653
// If we have a flush failure callback and it returns true, we retry producing this message later
645
654
CallbackInvoker<FlushFailureCallback> callback (" flush failure" , flush_failure_callback_, &producer_);
646
- if (!callback || callback (std::forward<BuilderType>( builder) , ex.get_error ())) {
655
+ if (!callback || callback (builder, ex.get_error ())) {
647
656
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal ());
648
657
if (tracker && tracker->num_retries_ > 0 ) {
649
658
--tracker->num_retries_ ;
@@ -671,7 +680,7 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
671
680
// Get tracker data
672
681
TestParameters* test_params = get_test_parameters ();
673
682
TrackerPtr tracker = has_internal_data_ ?
674
- std::static_pointer_cast<Tracker>(MessageInternal::load (const_cast <Message&>(message))->internal_ ) : nullptr ;
683
+ std::static_pointer_cast<Tracker>(MessageInternal::load (const_cast <Message&>(message))->get_internal () ) : nullptr ;
675
684
bool should_retry = false ;
676
685
if (message.get_error () || (test_params && test_params->force_delivery_error_ )) {
677
686
// We should produce this message again if we don't have a produce failure callback
0 commit comments