Skip to content

Commit 2381065

Browse files
author
accelerated
committed
Removed dependency on Producer and dr_callback_proxy
1 parent f746653 commit 2381065

File tree

8 files changed

+72
-80
lines changed

8 files changed

+72
-80
lines changed

Diff for: include/cppkafka/message.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ class CPPKAFKA_API Message {
180180

181181
Message(rd_kafka_message_t* handle, NonOwningTag);
182182
Message(HandlePtr handle);
183-
void load_internal(void* user_data, InternalPtr internal);
183+
Message& load_internal();
184184

185185
HandlePtr handle_;
186186
Buffer payload_;

Diff for: include/cppkafka/message_internal.h

+27-7
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,10 @@
3131
#define CPPKAFKA_MESSAGE_INTERNAL_H
3232

3333
#include <memory>
34-
#include "message.h"
3534

3635
namespace cppkafka {
3736

38-
class Producer;
37+
class Message;
3938

4039
struct Internal {
4140
virtual ~Internal() = default;
@@ -45,16 +44,37 @@ using InternalPtr = std::shared_ptr<Internal>;
4544
/**
4645
* \brief Private message data structure
4746
*/
48-
class MessageInternal {
49-
friend Producer;
50-
public:
51-
static std::unique_ptr<MessageInternal> load(const Producer& producer, Message& message);
52-
private:
47+
struct MessageInternal {
5348
MessageInternal(void* user_data, std::shared_ptr<Internal> internal);
49+
static std::unique_ptr<MessageInternal> load(Message& message);
5450
void* user_data_;
5551
InternalPtr internal_;
5652
};
5753

54+
template <typename BuilderType>
55+
struct MessageInternalGuard {
56+
MessageInternalGuard(BuilderType& builder)
57+
: builder_(builder),
58+
user_data_(builder.user_data()) {
59+
if (builder_.internal()) {
60+
// Swap contents with user_data
61+
ptr_.reset(new MessageInternal(user_data_, builder_.internal()));
62+
builder_.user_data(ptr_.get()); //overwrite user data
63+
}
64+
}
65+
~MessageInternalGuard() {
66+
//Restore user data
67+
builder_.user_data(user_data_);
68+
}
69+
void release() {
70+
ptr_.release();
71+
}
72+
private:
73+
BuilderType& builder_;
74+
std::unique_ptr<MessageInternal> ptr_;
75+
void* user_data_;
76+
};
77+
5878
}
5979

6080
#endif //CPPKAFKA_MESSAGE_INTERNAL_H

Diff for: include/cppkafka/producer.h

-7
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,12 @@
3131
#define CPPKAFKA_PRODUCER_H
3232

3333
#include <memory>
34-
#include <tuple>
3534
#include "kafka_handle_base.h"
3635
#include "configuration.h"
3736
#include "buffer.h"
3837
#include "topic.h"
3938
#include "macros.h"
4039
#include "message_builder.h"
41-
#include "message_internal.h"
4240

4341
namespace cppkafka {
4442

@@ -80,7 +78,6 @@ class Message;
8078
*/
8179
class CPPKAFKA_API Producer : public KafkaHandleBase {
8280
public:
83-
friend MessageInternal;
8481
/**
8582
* The policy to use for the payload. The default policy is COPY_PAYLOAD
8683
*/
@@ -159,11 +156,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase {
159156
*/
160157
void flush(std::chrono::milliseconds timeout);
161158
private:
162-
using LoadResult = std::tuple<void*, std::unique_ptr<MessageInternal>>;
163-
LoadResult load_internal(void* user_data, InternalPtr internal);
164-
165159
PayloadPolicy message_payload_policy_;
166-
bool has_internal_data_;
167160
};
168161

169162
} // cppkafka

Diff for: include/cppkafka/utils/buffered_producer.h

+22-27
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class CPPKAFKA_API BufferedProducer {
104104
/**
105105
* Callback to indicate a message failed to be flushed
106106
*/
107-
using FlushFailureCallback = std::function<bool(const Builder&, Error error)>;
107+
using FlushFailureCallback = std::function<bool(const MessageBuilder&, Error error)>;
108108

109109
/**
110110
* \brief Constructs a buffered producer using the provided configuration
@@ -369,24 +369,22 @@ class CPPKAFKA_API BufferedProducer {
369369
if (!has_internal_data_ && (max_number_retries_ > 0)) {
370370
has_internal_data_ = true; //enable once
371371
}
372-
if (has_internal_data_) {
373-
// Add message tracker
372+
if (has_internal_data_ && !builder.internal()) {
373+
// Add message tracker only if it hasn't been added before
374374
TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
375375
builder.internal(tracker);
376376
return tracker;
377377
}
378378
return nullptr;
379379
}
380-
381380
template <typename BuilderType>
382381
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
383-
void do_add_message(const Message& message, MessagePriority priority, bool do_flush);
384-
template <typename MessageType>
385-
void produce_message(MessageType&& message);
382+
template <typename BuilderType>
383+
void produce_message(BuilderType&& builder);
386384
Configuration prepare_configuration(Configuration config);
387385
void on_delivery_report(const Message& message);
388-
template <typename MessageType>
389-
void async_produce(MessageType&& message, bool throw_on_error);
386+
template <typename BuilderType>
387+
void async_produce(BuilderType&& message, bool throw_on_error);
390388

391389
// Members
392390
Producer producer_;
@@ -466,7 +464,7 @@ void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
466464

467465
template <typename BufferType>
468466
void BufferedProducer<BufferType>::produce(const Message& message) {
469-
async_produce(message, true);
467+
async_produce(MessageBuilder(message), true);
470468
}
471469

472470
template <typename BufferType>
@@ -546,13 +544,6 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
546544
}
547545
}
548546

549-
template <typename BufferType>
550-
void BufferedProducer<BufferType>::do_add_message(const Message& message,
551-
MessagePriority priority,
552-
bool do_flush) {
553-
do_add_messsage(MessageBuilder(message), priority, do_flush);
554-
}
555-
556547
template <typename BufferType>
557548
Producer& BufferedProducer<BufferType>::get_producer() {
558549
return producer_;
@@ -615,11 +606,14 @@ void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallba
615606
}
616607

617608
template <typename BufferType>
618-
template <typename MessageType>
619-
void BufferedProducer<BufferType>::produce_message(MessageType&& message) {
609+
template <typename BuilderType>
610+
void BufferedProducer<BufferType>::produce_message(BuilderType&& builder) {
611+
using builder_type = typename std::decay<BuilderType>::type;
620612
while (true) {
621613
try {
622-
producer_.produce(std::forward<MessageType>(message));
614+
MessageInternalGuard<builder_type> internal_guard(const_cast<builder_type&>(builder));
615+
producer_.produce(builder);
616+
internal_guard.release();
623617
// Sent successfully
624618
++pending_acks_;
625619
break;
@@ -637,23 +631,23 @@ void BufferedProducer<BufferType>::produce_message(MessageType&& message) {
637631
}
638632

639633
template <typename BufferType>
640-
template <typename MessageType>
641-
void BufferedProducer<BufferType>::async_produce(MessageType&& message, bool throw_on_error) {
634+
template <typename BuilderType>
635+
void BufferedProducer<BufferType>::async_produce(BuilderType&& builder, bool throw_on_error) {
642636
try {
643637
TestParameters* test_params = get_test_parameters();
644638
if (test_params && test_params->force_produce_error_) {
645639
throw HandleException(Error(RD_KAFKA_RESP_ERR_UNKNOWN));
646640
}
647-
produce_message(std::forward<MessageType>(message));
641+
produce_message(std::forward<BuilderType>(builder));
648642
}
649643
catch (const HandleException& ex) {
650644
// If we have a flush failure callback and it returns true, we retry producing this message later
651645
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
652-
if (!callback || callback(std::forward<MessageType>(message), ex.get_error())) {
653-
TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal());
646+
if (!callback || callback(std::forward<BuilderType>(builder), ex.get_error())) {
647+
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
654648
if (tracker && tracker->num_retries_ > 0) {
655649
--tracker->num_retries_;
656-
do_add_message(std::forward<MessageType>(message), MessagePriority::High, false);
650+
do_add_message(std::forward<BuilderType>(builder), MessagePriority::High, false);
657651
return;
658652
}
659653
}
@@ -676,7 +670,8 @@ template <typename BufferType>
676670
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
677671
//Get tracker data
678672
TestParameters* test_params = get_test_parameters();
679-
TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal());
673+
TrackerPtr tracker = has_internal_data_ ?
674+
std::static_pointer_cast<Tracker>(MessageInternal::load(const_cast<Message&>(message))->internal_) : nullptr;
680675
bool should_retry = false;
681676
if (message.get_error() || (test_params && test_params->force_delivery_error_)) {
682677
// We should produce this message again if we don't have a produce failure callback

Diff for: src/configuration.cpp

+2-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
#include <vector>
3232
#include <librdkafka/rdkafka.h>
3333
#include "exceptions.h"
34-
#include "message_internal.h"
34+
#include "message.h"
3535
#include "producer.h"
3636
#include "consumer.h"
3737

@@ -40,10 +40,8 @@ using std::map;
4040
using std::move;
4141
using std::vector;
4242
using std::initializer_list;
43-
using std::unique_ptr;
44-
using boost::optional;
45-
4643
using std::chrono::milliseconds;
44+
using boost::optional;
4745

4846
namespace cppkafka {
4947

@@ -52,7 +50,6 @@ namespace cppkafka {
5250
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
5351
Producer* handle = static_cast<Producer*>(opaque);
5452
Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
55-
unique_ptr<MessageInternal> internal_data(MessageInternal::load(*handle, message));
5653
CallbackInvoker<Configuration::DeliveryReportCallback>
5754
("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
5855
(*handle, message);

Diff for: src/message.cpp

+8-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
*/
2929

3030
#include "message.h"
31+
#include "message_internal.h"
3132

3233
using std::chrono::milliseconds;
3334

@@ -64,9 +65,13 @@ Message::Message(HandlePtr handle)
6465
user_data_(handle_ ? handle_->_private : nullptr) {
6566
}
6667

67-
void Message::load_internal(void* user_data, InternalPtr internal) {
68-
user_data_ = user_data;
69-
internal_ = internal;
68+
Message& Message::load_internal() {
69+
if (user_data_) {
70+
MessageInternal* mi = static_cast<MessageInternal*>(user_data_);
71+
user_data_ = mi->user_data_;
72+
internal_ = mi->internal_;
73+
}
74+
return *this;
7075
}
7176

7277
// MessageTimestamp

Diff for: src/message_internal.cpp

+9-10
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,22 @@
2727
*
2828
*/
2929
#include "message_internal.h"
30-
#include "producer.h"
30+
#include "message.h"
31+
#include "message_builder.h"
3132

3233
namespace cppkafka {
3334

34-
MessageInternal::MessageInternal(void* user_data, std::shared_ptr<Internal> internal)
35+
// MessageInternal
36+
37+
MessageInternal::MessageInternal(void* user_data,
38+
std::shared_ptr<Internal> internal)
3539
: user_data_(user_data),
3640
internal_(internal) {
3741
}
3842

39-
std::unique_ptr<MessageInternal> MessageInternal::load(const Producer& producer, Message& message) {
40-
if (producer.has_internal_data_ && message.get_user_data()) {
41-
// Unpack internal data
42-
std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
43-
message.load_internal(internal_data->user_data_, internal_data->internal_);
44-
return internal_data;
45-
}
46-
return nullptr;
43+
std::unique_ptr<MessageInternal> MessageInternal::load(Message& message) {
44+
return std::unique_ptr<MessageInternal>(message.load_internal().get_handle() ?
45+
static_cast<MessageInternal*>(message.get_handle()->_private) : nullptr);
4746
}
4847

4948
}

Diff for: src/producer.cpp

+3-20
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ using std::get;
4242
namespace cppkafka {
4343

4444
Producer::Producer(Configuration config)
45-
: KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD),
46-
has_internal_data_(false) {
45+
: KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD) {
4746
char error_buffer[512];
4847
auto config_handle = get_configuration().get_handle();
4948
rd_kafka_conf_set_opaque(config_handle, this);
@@ -69,37 +68,33 @@ void Producer::produce(const MessageBuilder& builder) {
6968
const Buffer& payload = builder.payload();
7069
const Buffer& key = builder.key();
7170
const int policy = static_cast<int>(message_payload_policy_);
72-
LoadResult load_result = load_internal(builder.user_data(), builder.internal());
7371
auto result = rd_kafka_producev(get_handle(),
7472
RD_KAFKA_V_TOPIC(builder.topic().data()),
7573
RD_KAFKA_V_PARTITION(builder.partition()),
7674
RD_KAFKA_V_MSGFLAGS(policy),
7775
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
7876
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
7977
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
80-
RD_KAFKA_V_OPAQUE(get<0>(load_result)),
78+
RD_KAFKA_V_OPAQUE(builder.user_data()),
8179
RD_KAFKA_V_END);
8280
check_error(result);
83-
get<1>(load_result).release(); //data has been passed-on to rdkafka so we release ownership
8481
}
8582

8683
void Producer::produce(const Message& message) {
8784
const Buffer& payload = message.get_payload();
8885
const Buffer& key = message.get_key();
8986
const int policy = static_cast<int>(message_payload_policy_);
9087
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
91-
LoadResult load_result = load_internal(message.get_user_data(), message.internal());
9288
auto result = rd_kafka_producev(get_handle(),
9389
RD_KAFKA_V_TOPIC(message.get_topic().data()),
9490
RD_KAFKA_V_PARTITION(message.get_partition()),
9591
RD_KAFKA_V_MSGFLAGS(policy),
9692
RD_KAFKA_V_TIMESTAMP(duration),
9793
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
9894
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
99-
RD_KAFKA_V_OPAQUE(get<0>(load_result)),
95+
RD_KAFKA_V_OPAQUE(message.get_user_data()),
10096
RD_KAFKA_V_END);
10197
check_error(result);
102-
get<1>(load_result).release(); //data has been passed-on to rdkafka so we release ownership
10398
}
10499

105100
int Producer::poll() {
@@ -119,16 +114,4 @@ void Producer::flush(milliseconds timeout) {
119114
check_error(result);
120115
}
121116

122-
Producer::LoadResult Producer::load_internal(void* user_data, InternalPtr internal) {
123-
unique_ptr<MessageInternal> internal_data;
124-
if (!has_internal_data_ && internal) {
125-
has_internal_data_ = true; //enable once for this producer
126-
}
127-
if (has_internal_data_ && get_configuration().get_delivery_report_callback()) {
128-
internal_data.reset(new MessageInternal(user_data, internal));
129-
user_data = internal_data.get(); //point to the internal data
130-
}
131-
return LoadResult(user_data, move(internal_data));
132-
}
133-
134117
} // cppkafka

0 commit comments

Comments
 (0)