Skip to content

Commit 1fae0ae

Browse files
author
accelerated
committed
Added logic to conditionally enable internal data
1 parent 4d43d96 commit 1fae0ae

File tree

7 files changed

+85
-35
lines changed

7 files changed

+85
-35
lines changed

Diff for: include/cppkafka/cppkafka.h

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include <cppkafka/macros.h>
4545
#include <cppkafka/message.h>
4646
#include <cppkafka/message_builder.h>
47+
#include <cppkafka/message_internal.h>
4748
#include <cppkafka/metadata.h>
4849
#include <cppkafka/producer.h>
4950
#include <cppkafka/queue.h>

Diff for: include/cppkafka/message_internal.h

+5-17
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535

3636
namespace cppkafka {
3737

38+
class Producer;
39+
3840
struct Internal {
3941
virtual ~Internal() = default;
4042
};
@@ -44,25 +46,11 @@ using InternalPtr = std::shared_ptr<Internal>;
4446
* \brief Private message data structure
4547
*/
4648
class MessageInternal {
47-
friend class Producer;
48-
49+
friend Producer;
4950
public:
50-
static std::unique_ptr<MessageInternal> load(Message& message) {
51-
if (message.get_user_data()) {
52-
// Unpack internal data
53-
std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
54-
message.load_internal(internal_data->user_data_, internal_data->internal_);
55-
return internal_data;
56-
}
57-
return nullptr;
58-
}
59-
51+
static std::unique_ptr<MessageInternal> load(const Producer& producer, Message& message);
6052
private:
61-
MessageInternal(void* user_data, std::shared_ptr<Internal> internal)
62-
: user_data_(user_data),
63-
internal_(internal) {
64-
}
65-
53+
MessageInternal(void* user_data, std::shared_ptr<Internal> internal);
6654
void* user_data_;
6755
InternalPtr internal_;
6856
};

Diff for: include/cppkafka/producer.h

+7
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@
3131
#define CPPKAFKA_PRODUCER_H
3232

3333
#include <memory>
34+
#include <tuple>
3435
#include "kafka_handle_base.h"
3536
#include "configuration.h"
3637
#include "buffer.h"
3738
#include "topic.h"
3839
#include "macros.h"
3940
#include "message_builder.h"
41+
#include "message_internal.h"
4042

4143
namespace cppkafka {
4244

@@ -78,6 +80,7 @@ class Message;
7880
*/
7981
class CPPKAFKA_API Producer : public KafkaHandleBase {
8082
public:
83+
friend MessageInternal;
8184
/**
8285
* The policy to use for the payload. The default policy is COPY_PAYLOAD
8386
*/
@@ -156,7 +159,11 @@ class CPPKAFKA_API Producer : public KafkaHandleBase {
156159
*/
157160
void flush(std::chrono::milliseconds timeout);
158161
private:
162+
using LoadResult = std::tuple<void*, std::unique_ptr<MessageInternal>>;
163+
LoadResult load_internal(void* user_data, InternalPtr internal);
164+
159165
PayloadPolicy message_payload_policy_;
166+
bool has_internal_data_;
160167
};
161168

162169
} // cppkafka

Diff for: src/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ set(SOURCES
77
buffer.cpp
88
queue.cpp
99
message.cpp
10+
message_internal.cpp
1011
topic_partition.cpp
1112
topic_partition_list.cpp
1213
metadata.cpp

Diff for: src/configuration.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ namespace cppkafka {
5252
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
5353
Producer* handle = static_cast<Producer*>(opaque);
5454
Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
55-
unique_ptr<MessageInternal> internal_data(MessageInternal::load(message));
55+
unique_ptr<MessageInternal> internal_data(MessageInternal::load(*handle, message));
5656
CallbackInvoker<Configuration::DeliveryReportCallback>
5757
("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
5858
(*handle, message);

Diff for: src/message_internal.cpp

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
#include "message_internal.h"
30+
#include "producer.h"
31+
32+
namespace cppkafka {
33+
34+
MessageInternal::MessageInternal(void* user_data, std::shared_ptr<Internal> internal)
35+
: user_data_(user_data),
36+
internal_(internal) {
37+
}
38+
39+
std::unique_ptr<MessageInternal> MessageInternal::load(const Producer& producer, Message& message) {
40+
if (producer.has_internal_data_ && message.get_user_data()) {
41+
// Unpack internal data
42+
std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
43+
message.load_internal(internal_data->user_data_, internal_data->internal_);
44+
return internal_data;
45+
}
46+
return nullptr;
47+
}
48+
49+
}

Diff for: src/producer.cpp

+21-17
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@ using std::move;
3737
using std::string;
3838
using std::chrono::milliseconds;
3939
using std::unique_ptr;
40+
using std::get;
4041

4142
namespace cppkafka {
4243

4344
Producer::Producer(Configuration config)
44-
: KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD) {
45+
: KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD),
46+
has_internal_data_(false) {
4547
char error_buffer[512];
4648
auto config_handle = get_configuration().get_handle();
4749
rd_kafka_conf_set_opaque(config_handle, this);
@@ -67,47 +69,37 @@ void Producer::produce(const MessageBuilder& builder) {
6769
const Buffer& payload = builder.payload();
6870
const Buffer& key = builder.key();
6971
const int policy = static_cast<int>(message_payload_policy_);
70-
void* opaque = builder.user_data();
71-
unique_ptr<MessageInternal> internal_data;
72-
if (get_configuration().get_delivery_report_callback()) {
73-
internal_data.reset(new MessageInternal(builder.user_data(), builder.internal()));
74-
opaque = internal_data.get();
75-
}
72+
LoadResult load_result = load_internal(builder.user_data(), builder.internal());
7673
auto result = rd_kafka_producev(get_handle(),
7774
RD_KAFKA_V_TOPIC(builder.topic().data()),
7875
RD_KAFKA_V_PARTITION(builder.partition()),
7976
RD_KAFKA_V_MSGFLAGS(policy),
8077
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
8178
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
8279
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
83-
RD_KAFKA_V_OPAQUE(opaque),
80+
RD_KAFKA_V_OPAQUE(get<0>(load_result)),
8481
RD_KAFKA_V_END);
8582
check_error(result);
86-
internal_data.release(); //data has been passed-on to rdkafka so we release ownership
83+
get<1>(load_result).release(); //data has been passed-on to rdkafka so we release ownership
8784
}
8885

8986
void Producer::produce(const Message& message) {
9087
const Buffer& payload = message.get_payload();
9188
const Buffer& key = message.get_key();
9289
const int policy = static_cast<int>(message_payload_policy_);
9390
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
94-
void* opaque = message.get_user_data();
95-
unique_ptr<MessageInternal> internal_data;
96-
if (get_configuration().get_delivery_report_callback()) {
97-
internal_data.reset(new MessageInternal(message.get_user_data(), message.internal()));
98-
opaque = internal_data.get();
99-
}
91+
LoadResult load_result = load_internal(message.get_user_data(), message.internal());
10092
auto result = rd_kafka_producev(get_handle(),
10193
RD_KAFKA_V_TOPIC(message.get_topic().data()),
10294
RD_KAFKA_V_PARTITION(message.get_partition()),
10395
RD_KAFKA_V_MSGFLAGS(policy),
10496
RD_KAFKA_V_TIMESTAMP(duration),
10597
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
10698
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
107-
RD_KAFKA_V_OPAQUE(opaque),
99+
RD_KAFKA_V_OPAQUE(get<0>(load_result)),
108100
RD_KAFKA_V_END);
109101
check_error(result);
110-
internal_data.release(); //data has been passed-on to rdkafka so we release ownership
102+
get<1>(load_result).release(); //data has been passed-on to rdkafka so we release ownership
111103
}
112104

113105
int Producer::poll() {
@@ -127,4 +119,16 @@ void Producer::flush(milliseconds timeout) {
127119
check_error(result);
128120
}
129121

122+
Producer::LoadResult Producer::load_internal(void* user_data, InternalPtr internal) {
123+
unique_ptr<MessageInternal> internal_data;
124+
if (!has_internal_data_ && internal) {
125+
has_internal_data_ = true; //enable once for this producer
126+
}
127+
if (has_internal_data_ && get_configuration().get_delivery_report_callback()) {
128+
internal_data.reset(new MessageInternal(user_data, internal));
129+
user_data = internal_data.get(); //point to the internal data
130+
}
131+
return LoadResult(user_data, move(internal_data));
132+
}
133+
130134
} // cppkafka

0 commit comments

Comments
 (0)