@@ -425,8 +425,7 @@ BufferedProducer<BufferType>::BufferedProducer(Configuration config)
425
425
426
426
template <typename BufferType>
427
427
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
428
- add_tracker (const_cast <MessageBuilder&>(builder));
429
- do_add_message (builder, MessagePriority::Low, true );
428
+ add_message (Builder (builder)); // make ConcreteBuilder
430
429
}
431
430
432
431
template <typename BufferType>
@@ -437,19 +436,26 @@ void BufferedProducer<BufferType>::add_message(Builder builder) {
437
436
438
437
template <typename BufferType>
439
438
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
440
- add_tracker (const_cast <MessageBuilder&>(builder));
441
- async_produce (builder, true );
439
+ if (has_internal_data_) {
440
+ MessageBuilder builder_copy (builder.piecewise_construct ());
441
+ add_tracker (builder_copy);
442
+ async_produce (builder_copy, true );
443
+ }
444
+ else {
445
+ async_produce (builder, true );
446
+ }
442
447
}
443
448
444
449
template <typename BufferType>
445
450
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
446
- TrackerPtr tracker = add_tracker (const_cast <MessageBuilder&>(builder));
447
- if (tracker) {
451
+ if (has_internal_data_) {
452
+ MessageBuilder builder_copy (builder.piecewise_construct ());
453
+ TrackerPtr tracker = add_tracker (builder_copy);
448
454
// produce until we succeed or we reach max retry limit
449
455
std::future<bool > should_retry;
450
456
do {
451
457
should_retry = tracker->get_new_future ();
452
- produce_message (builder );
458
+ produce_message (builder_copy );
453
459
wait_for_acks ();
454
460
}
455
461
while (should_retry.get ());
@@ -575,6 +581,9 @@ size_t BufferedProducer<BufferType>::get_flushes_in_progress() const {
575
581
576
582
template <typename BufferType>
577
583
void BufferedProducer<BufferType>::set_max_number_retries(size_t max_number_retries) {
584
+ if (!has_internal_data_ && (max_number_retries > 0 )) {
585
+ has_internal_data_ = true ; // enable once
586
+ }
578
587
max_number_retries_ = max_number_retries;
579
588
}
580
589
@@ -637,12 +646,12 @@ void BufferedProducer<BufferType>::async_produce(BuilderType&& builder, bool thr
637
646
if (test_params && test_params->force_produce_error_ ) {
638
647
throw HandleException (Error (RD_KAFKA_RESP_ERR_UNKNOWN));
639
648
}
640
- produce_message (std::forward<BuilderType>( builder) );
649
+ produce_message (builder);
641
650
}
642
651
catch (const HandleException& ex) {
643
652
// If we have a flush failure callback and it returns true, we retry producing this message later
644
653
CallbackInvoker<FlushFailureCallback> callback (" flush failure" , flush_failure_callback_, &producer_);
645
- if (!callback || callback (std::forward<BuilderType>( builder) , ex.get_error ())) {
654
+ if (!callback || callback (builder, ex.get_error ())) {
646
655
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal ());
647
656
if (tracker && tracker->num_retries_ > 0 ) {
648
657
--tracker->num_retries_ ;
@@ -670,7 +679,7 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
670
679
// Get tracker data
671
680
TestParameters* test_params = get_test_parameters ();
672
681
TrackerPtr tracker = has_internal_data_ ?
673
- std::static_pointer_cast<Tracker>(MessageInternal::load (const_cast <Message&>(message))->internal_ ) : nullptr ;
682
+ std::static_pointer_cast<Tracker>(MessageInternal::load (const_cast <Message&>(message))->get_internal () ) : nullptr ;
674
683
bool should_retry = false ;
675
684
if (message.get_error () || (test_params && test_params->force_delivery_error_ )) {
676
685
// We should produce this message again if we don't have a produce failure callback
0 commit comments