Skip to content

Commit 841e632

Browse files
acceleratedmfontanini
authored andcommitted
Allow access to the user-supplied delivery callback. (#66)
* Allow access to the user-supplied delivery callback. * Remove valgrind warning * Added buffer size watermark * added ability to produce a message directly * Updated on_delivery_report function
1 parent 46c396f commit 841e632

File tree

4 files changed

+178
-28
lines changed

4 files changed

+178
-28
lines changed

Diff for: include/cppkafka/producer.h

+15-9
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ namespace cppkafka {
4343
class Topic;
4444
class Buffer;
4545
class TopicConfiguration;
46+
class Message;
4647

4748
/**
4849
* \brief Producer class
@@ -86,39 +87,44 @@ class CPPKAFKA_API Producer : public KafkaHandleBase {
8687
};
8788

8889
/**
89-
* Constructs a producer using the given configuration
90+
* \brief Constructs a producer using the given configuration
9091
*
9192
* \param config The configuration to use
9293
*/
9394
Producer(Configuration config);
9495

9596
/**
96-
* Sets the payload policy
97+
* \brief Sets the payload policy
9798
*
9899
* \param policy The payload policy to be used
99100
*/
100101
void set_payload_policy(PayloadPolicy policy);
101102

102103
/**
103-
* Returns the current payload policy
104+
* \brief Returns the current payload policy
104105
*/
105106
PayloadPolicy get_payload_policy() const;
106107

107108
/**
108-
* Produces a message
109+
* \brief Produces a message
109110
*
110-
* \param topic The topic to write the message to
111-
* \param partition The partition to write the message to
112-
* \param payload The message payload
111+
* \param builder The builder class used to compose a message
113112
*/
114113
void produce(const MessageBuilder& builder);
114+
115+
/**
116+
* \brief Produces a message
117+
*
118+
* \param message The message to be produced
119+
*/
120+
void produce(const Message& message);
115121

116122
/**
117123
* \brief Polls on this handle
118124
*
119125
* This translates into a call to rd_kafka_poll.
120126
*
121-
* The timeout used on this call is the one configured via Producer::set_timeout.
127+
* \remark The timeout used on this call is the one configured via Producer::set_timeout.
122128
*/
123129
int poll();
124130

@@ -136,7 +142,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase {
136142
*
137143
* This translates into a call to rd_kafka_flush.
138144
*
139-
* The timeout used on this call is the one configured via Producer::set_timeout.
145+
* \remark The timeout used on this call is the one configured via Producer::set_timeout.
140146
*/
141147
void flush();
142148

Diff for: include/cppkafka/utils/buffered_producer.h

+82-18
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,16 @@ class CPPKAFKA_API BufferedProducer {
108108
* \param builder The builder that contains the message to be produced
109109
*/
110110
void produce(const MessageBuilder& builder);
111+
112+
/**
113+
* \brief Produces a message without buffering it
114+
*
115+
* The message will still be tracked so that a call to flush or wait_for_acks will actually
116+
* wait for it to be acknowledged.
117+
*
118+
* \param message The message to be produced
119+
*/
120+
void produce(const Message& message);
111121

112122
/**
113123
* \brief Flushes the buffered messages.
@@ -126,6 +136,34 @@ class CPPKAFKA_API BufferedProducer {
126136
* Clears any buffered messages
127137
*/
128138
void clear();
139+
140+
/**
141+
* \brief Sets the maximum amount of messages to be enqueued in the buffer.
142+
*
143+
* After 'max_buffer_size' is reached, flush() will be called automatically.
144+
*
145+
* \param size The max size of the internal buffer. Allowed values are:
146+
* -1 : Unlimited buffer size. Must be flushed manually (default value)
147+
* 0 : Don't buffer anything. add_message() behaves like produce()
148+
* > 0 : Max number of messages before flush() is called.
149+
*
150+
* \remark add_message() will block when 'max_buffer_size' is reached due to flush()
151+
*/
152+
void set_max_buffer_size(ssize_t max_buffer_size);
153+
154+
/**
155+
* \brief Return the maximum allowed buffer size.
156+
*
157+
* \return The max buffer size. A value of -1 indicates an unbounded buffer.
158+
*/
159+
ssize_t get_max_buffer_size() const;
160+
161+
/**
162+
* \brief Get the number of messages in the buffer
163+
*
164+
* \return The number of messages
165+
*/
166+
size_t get_buffer_size() const;
129167

130168
/**
131169
* Gets the Producer object
@@ -157,20 +195,25 @@ class CPPKAFKA_API BufferedProducer {
157195

158196
template <typename BuilderType>
159197
void do_add_message(BuilderType&& builder);
160-
void produce_message(const MessageBuilder& message);
198+
template <typename MessageType>
199+
void produce_message(const MessageType& message);
161200
Configuration prepare_configuration(Configuration config);
162201
void on_delivery_report(const Message& message);
163202

203+
204+
Configuration::DeliveryReportCallback delivery_report_callback_;
164205
Producer producer_;
165206
QueueType messages_;
166207
ProduceFailureCallback produce_failure_callback_;
167208
size_t expected_acks_{0};
168209
size_t messages_acked_{0};
210+
ssize_t max_buffer_size_{-1};
169211
};
170212

171213
template <typename BufferType>
172214
BufferedProducer<BufferType>::BufferedProducer(Configuration config)
173-
: producer_(prepare_configuration(std::move(config))) {
215+
: delivery_report_callback_(config.get_delivery_report_callback()),
216+
producer_(prepare_configuration(std::move(config))) {
174217

175218
}
176219

@@ -190,13 +233,18 @@ void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
190233
expected_acks_++;
191234
}
192235

236+
template <typename BufferType>
237+
void BufferedProducer<BufferType>::produce(const Message& message) {
238+
produce_message(message);
239+
expected_acks_++;
240+
}
241+
193242
template <typename BufferType>
194243
void BufferedProducer<BufferType>::flush() {
195244
while (!messages_.empty()) {
196245
produce_message(messages_.front());
197246
messages_.pop();
198247
}
199-
200248
wait_for_acks();
201249
}
202250

@@ -228,11 +276,32 @@ void BufferedProducer<BufferType>::clear() {
228276
messages_acked_ = 0;
229277
}
230278

279+
template <typename BufferType>
280+
void BufferedProducer<BufferType>::set_max_buffer_size(ssize_t max_buffer_size) {
281+
if (max_buffer_size < -1) {
282+
throw Exception("Invalid buffer size.");
283+
}
284+
max_buffer_size_ = max_buffer_size;
285+
}
286+
287+
template <typename BufferType>
288+
ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
289+
return max_buffer_size_;
290+
}
291+
292+
template <typename BufferType>
293+
size_t BufferedProducer<BufferType>::get_buffer_size() const {
294+
return messages_.size();
295+
}
296+
231297
template <typename BufferType>
232298
template <typename BuilderType>
233299
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
234300
expected_acks_++;
235-
messages_.push(std::move(builder));
301+
messages_.push(std::forward<BuilderType>(builder));
302+
if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
303+
flush();
304+
}
236305
}
237306

238307
template <typename BufferType>
@@ -257,11 +326,12 @@ void BufferedProducer<BufferType>::set_produce_failure_callback(ProduceFailureCa
257326
}
258327

259328
template <typename BufferType>
260-
void BufferedProducer<BufferType>::produce_message(const MessageBuilder& builder) {
329+
template <typename MessageType>
330+
void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
261331
bool sent = false;
262332
while (!sent) {
263333
try {
264-
producer_.produce(builder);
334+
producer_.produce(message);
265335
sent = true;
266336
}
267337
catch (const HandleException& ex) {
@@ -287,22 +357,16 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
287357

288358
template <typename BufferType>
289359
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
290-
// We should produce this message again if it has an error and we either don't have a
360+
// Call the user-supplied delivery report callback if any
361+
if (delivery_report_callback_) {
362+
delivery_report_callback_(producer_, message);
363+
}
364+
// We should produce this message again if it has an error and we either don't have a
291365
// produce failure callback or we have one but it returns true
292366
bool should_produce = message.get_error() &&
293367
(!produce_failure_callback_ || produce_failure_callback_(message));
294368
if (should_produce) {
295-
MessageBuilder builder(message.get_topic());
296-
const auto& key = message.get_key();
297-
const auto& payload = message.get_payload();
298-
builder.partition(message.get_partition())
299-
.key(Buffer(key.get_data(), key.get_size()))
300-
.payload(Buffer(payload.get_data(), payload.get_size()))
301-
.user_data(message.get_user_data());
302-
if (message.get_timestamp()) {
303-
builder.timestamp(message.get_timestamp()->get_timestamp());
304-
}
305-
produce_message(builder);
369+
produce_message(message);
306370
return;
307371
}
308372
// If production was successful or the produce failure callback returned false, then

Diff for: src/producer.cpp

+18-1
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
#include <errno.h>
3131
#include "producer.h"
3232
#include "exceptions.h"
33+
#include "message.h"
3334

3435
using std::move;
3536
using std::string;
36-
3737
using std::chrono::milliseconds;
3838

3939
namespace cppkafka {
@@ -77,6 +77,23 @@ void Producer::produce(const MessageBuilder& builder) {
7777
check_error(result);
7878
}
7979

80+
void Producer::produce(const Message& message) {
81+
const Buffer& payload = message.get_payload();
82+
const Buffer& key = message.get_key();
83+
const int policy = static_cast<int>(message_payload_policy_);
84+
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
85+
auto result = rd_kafka_producev(get_handle(),
86+
RD_KAFKA_V_TOPIC(message.get_topic().data()),
87+
RD_KAFKA_V_PARTITION(message.get_partition()),
88+
RD_KAFKA_V_MSGFLAGS(policy),
89+
RD_KAFKA_V_TIMESTAMP(duration),
90+
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
91+
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
92+
RD_KAFKA_V_OPAQUE(message.get_user_data()),
93+
RD_KAFKA_V_END);
94+
check_error(result);
95+
}
96+
8097
int Producer::poll() {
8198
return poll(get_timeout());
8299
}

Diff for: tests/producer_test.cpp

+63
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,38 @@ TEST_CASE("simple production", "[producer]") {
101101
REQUIRE(!!message.get_timestamp() == true);
102102
CHECK(message.get_timestamp()->get_timestamp() == timestamp);
103103
}
104+
105+
SECTION("message without message builder") {
106+
const string payload = "Goodbye cruel world!";
107+
const string key = "replay key";
108+
const milliseconds timestamp{15};
109+
Producer producer(config);
110+
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition)
111+
.key(key)
112+
.payload(payload)
113+
.timestamp(timestamp));
114+
runner.try_join();
115+
ConsumerRunner runner2(consumer, 1, 1);
116+
117+
const auto& replay_messages = runner.get_messages();
118+
REQUIRE(replay_messages.size() == 1);
119+
const auto& replay_message = replay_messages[0];
120+
121+
//produce the same message again
122+
producer.produce(replay_message);
123+
runner2.try_join();
124+
125+
const auto& messages = runner2.get_messages();
126+
REQUIRE(messages.size() == 1);
127+
const auto& message = messages[0];
128+
CHECK(message.get_payload() == payload);
129+
CHECK(message.get_key() == key);
130+
CHECK(message.get_topic() == KAFKA_TOPIC);
131+
CHECK(message.get_partition() == partition);
132+
CHECK(!!message.get_error() == false);
133+
REQUIRE(!!message.get_timestamp() == true);
134+
CHECK(message.get_timestamp()->get_timestamp() == timestamp);
135+
}
104136

105137
SECTION("callbacks") {
106138
// Now create a producer and produce a message
@@ -240,3 +272,34 @@ TEST_CASE("buffered producer", "[producer]") {
240272
CHECK(message.get_payload() == payload);
241273
}
242274
}
275+
276+
TEST_CASE("buffered producer with limited buffer", "[producer]") {
277+
int partition = 0;
278+
int num_messages = 4;
279+
280+
// Create a consumer and assign this topic/partition
281+
Consumer consumer(make_consumer_config());
282+
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
283+
ConsumerRunner runner(consumer, 3, 1);
284+
285+
// Now create a buffered producer and produce two messages
286+
BufferedProducer<string> producer(make_producer_config());
287+
const string payload = "Hello world! 2";
288+
const string key = "such key";
289+
REQUIRE(producer.get_buffer_size() == 0);
290+
REQUIRE(producer.get_max_buffer_size() == -1);
291+
292+
// Limit the size of the internal buffer
293+
producer.set_max_buffer_size(num_messages-1);
294+
while (num_messages--) {
295+
producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload));
296+
}
297+
REQUIRE(producer.get_buffer_size() == 1);
298+
299+
// Finish the runner
300+
runner.try_join();
301+
302+
// Validate messages received
303+
const auto& messages = runner.get_messages();
304+
REQUIRE(messages.size() == producer.get_max_buffer_size());
305+
}

0 commit comments

Comments
 (0)