Skip to content

Commit ef5ed27

Browse files
author
accelerated
committed
Changed based on feedback
1 parent 4c1d107 commit ef5ed27

File tree

4 files changed

+105
-58
lines changed

4 files changed

+105
-58
lines changed

Diff for: include/cppkafka/message_builder.h

+19
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "buffer.h"
3535
#include "topic.h"
3636
#include "macros.h"
37+
#include "message.h"
3738

3839
namespace cppkafka {
3940

@@ -49,6 +50,14 @@ class BasicMessageBuilder {
4950
* \param topic The topic into which this message would be produced
5051
*/
5152
BasicMessageBuilder(std::string topic);
53+
54+
/**
55+
* Construct a BasicMessageBuilder from a Message object
56+
*
57+
* \remark The application must guarantee the lifetime of the Message exceeds that of this
58+
* BasicMessageBuilder as this class does not take ownership of any Message buffers
59+
*/
60+
BasicMessageBuilder(const Message& message);
5261

5362
/**
5463
* \brief Construct a message builder from another one that uses a different buffer type
@@ -177,6 +186,16 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
177186
: topic_(std::move(topic)) {
178187
}
179188

189+
template <typename T, typename C>
190+
BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
191+
: topic_(message.get_topic()),
192+
key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
193+
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
194+
user_data_(message.get_user_data())
195+
{
196+
197+
}
198+
180199
template <typename T, typename C>
181200
template <typename U, typename V>
182201
BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)

Diff for: include/cppkafka/producer.h

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase {
8282
* The policy to use for the payload. The default policy is COPY_PAYLOAD
8383
*/
8484
enum class PayloadPolicy {
85+
PASSTHROUGH_PAYLOAD = 0, ///< Rdkafka will not copy nor free the payload.
8586
COPY_PAYLOAD = RD_KAFKA_MSG_F_COPY, ///< Means RD_KAFKA_MSG_F_COPY
8687
FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE
8788
};

Diff for: include/cppkafka/utils/buffered_producer.h

+85-56
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
#define CPPKAFKA_BUFFERED_PRODUCER_H
3232

3333
#include <string>
34-
#include <queue>
34+
#include <list>
3535
#include <cstdint>
3636
#include <algorithm>
3737
#include <unordered_set>
@@ -55,10 +55,12 @@ namespace cppkafka {
5555
* produced messages (either in a buffer or non buffered way) are acknowledged by the kafka
5656
* brokers.
5757
*
58-
* When producing messages, this class will handle cases where the producer's queue is full so it\
58+
* When producing messages, this class will handle cases where the producer's queue is full so it
5959
* will poll until the production is successful.
6060
*
61-
* This class is not thread safe.
61+
* \remark This class is thread safe
62+
*
63+
* \warning The application *MUST NOT* change the payload policy on the underlying Producer object.
6264
*/
6365
template <typename BufferType>
6466
class CPPKAFKA_API BufferedProducer {
@@ -69,9 +71,14 @@ class CPPKAFKA_API BufferedProducer {
6971
using Builder = ConcreteMessageBuilder<BufferType>;
7072

7173
/**
72-
* Callback to indicate a message failed to be produced.
74+
* Callback to indicate a message failed to be produced by the broker
7375
*/
7476
using ProduceFailureCallback = std::function<bool(const Message&)>;
77+
78+
/**
79+
* Callback to indicate a message failed to be flushed
80+
*/
81+
using FlushFailureCallback = std::function<bool(const Builder&, Error error)>;
7582

7683
/**
7784
* \brief Constructs a buffered producer using the provided configuration
@@ -108,6 +115,8 @@ class CPPKAFKA_API BufferedProducer {
108115
* wait for it to be acknowledged.
109116
*
110117
* \param builder The builder that contains the message to be produced
118+
*
119+
* \remark This method throws cppkafka::HandleException on failure
111120
*/
112121
void produce(const MessageBuilder& builder);
113122

@@ -118,6 +127,8 @@ class CPPKAFKA_API BufferedProducer {
118127
* wait for it to be acknowledged.
119128
*
120129
* \param message The message to be produced
130+
*
131+
* \remark This method throws cppkafka::HandleException on failure
121132
*/
122133
void produce(const Message& message);
123134

@@ -168,20 +179,11 @@ class CPPKAFKA_API BufferedProducer {
168179
size_t get_buffer_size() const;
169180

170181
/**
171-
* \brief Returns the total number of messages ack-ed by the broker
182+
* \brief Returns the total number of messages ack-ed by the broker since the beginning
172183
*
173-
* \return The total number of messages since the beginning or since the last roll-over
174-
*
175-
* \remark Call get_rollover_count() to get the number of times the counter has rolled over
184+
* \return The number of messages acknowledged so far
176185
*/
177186
size_t get_total_messages_acked() const;
178-
179-
/**
180-
* \brief Roll-over counter for get_total_messages_acked
181-
*
182-
* \return The number of rolls
183-
*/
184-
uint16_t get_rollover_count() const;
185187

186188
/**
187189
* Gets the Producer object
@@ -206,46 +208,67 @@ class CPPKAFKA_API BufferedProducer {
206208
* false. Note that if the callback returns false, then the message will be discarded.
207209
*
208210
* \param callback The callback to be set
211+
*
212+
* \remark It is *highly* recommended to set this callback as your message may be produced
213+
* indefinitely if there's a remote error.
214+
*
215+
* \warning Do not call any method on the BufferedProducer while inside this callback.
209216
*/
210217
void set_produce_failure_callback(ProduceFailureCallback callback);
211218

219+
/**
220+
* \brief Sets the local message produce failure callback
221+
*
222+
* This callback will be called when local message production fails during a flush() operation.
223+
* Failure errors are typically payload too large, unknown topic or unknown partition.
224+
* Note that if the callback returns false, the message will be dropped from the buffer,
225+
* otherwise it will be re-enqueued for later retry.
226+
*
227+
* \param callback The callback to be set
228+
*
229+
* \warning Do not call any method on the BufferedProducer while inside this callback
230+
*/
231+
void set_flush_failure_callback(FlushFailureCallback callback);
232+
212233
private:
213-
using QueueType = std::queue<Builder>;
234+
using QueueType = std::list<Builder>;
235+
enum class MessagePriority { Low, High };
214236

215237
template <typename BuilderType>
216-
void do_add_message(BuilderType&& builder);
238+
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
217239
template <typename MessageType>
218240
void produce_message(const MessageType& message);
219241
Configuration prepare_configuration(Configuration config);
220242
void on_delivery_report(const Message& message);
221-
222243

244+
// Members
223245
Configuration::DeliveryReportCallback delivery_report_callback_;
224246
Producer producer_;
225247
QueueType messages_;
226248
mutable std::mutex mutex_;
227249
ProduceFailureCallback produce_failure_callback_;
250+
FlushFailureCallback flush_failure_callback_;
228251
ssize_t max_buffer_size_{-1};
229252
std::atomic_ulong expected_acks_{0};
230253
std::atomic_ullong total_messages_acked_{0};
231-
std::atomic_ushort rollover_counter_{0};
232254
};
233255

234256
template <typename BufferType>
235257
BufferedProducer<BufferType>::BufferedProducer(Configuration config)
236258
: delivery_report_callback_(config.get_delivery_report_callback()),
237259
producer_(prepare_configuration(std::move(config))) {
238-
260+
// Allow re-queuing failed messages
261+
producer_.set_payload_policy(Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD);
239262
}
240263

241264
template <typename BufferType>
242265
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
243-
do_add_message(builder);
266+
do_add_message(builder, MessagePriority::Low, true);
244267
}
245268

246269
template <typename BufferType>
247270
void BufferedProducer<BufferType>::add_message(Builder builder) {
248-
do_add_message(move(builder));
271+
do_add_message(move(builder), MessagePriority::Low, true);
249272
}
250273

251274
template <typename BufferType>
@@ -256,19 +279,27 @@ void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
256279
template <typename BufferType>
257280
void BufferedProducer<BufferType>::produce(const Message& message) {
258281
produce_message(message);
259-
expected_acks_++;
260282
}
261283

262284
template <typename BufferType>
263285
void BufferedProducer<BufferType>::flush() {
264-
size_t num_messages = messages_.size();
265-
while (num_messages--) {
286+
QueueType flush_queue; // flush from temporary queue
287+
{
266288
std::lock_guard<std::mutex> lock(mutex_);
267-
if (messages_.empty()) {
268-
break; //perhaps clear() was called
289+
std::swap(messages_, flush_queue);
290+
}
291+
while (!flush_queue.empty()) {
292+
try {
293+
produce_message(flush_queue.front());
269294
}
270-
produce_message(messages_.front());
271-
messages_.pop();
295+
catch (const HandleException& ex) {
296+
if (flush_failure_callback_ &&
297+
flush_failure_callback_(flush_queue.front(), ex.get_error())) {
298+
// retry again later
299+
do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false);
300+
}
301+
}
302+
flush_queue.pop_front();
272303
}
273304
wait_for_acks();
274305
}
@@ -318,12 +349,19 @@ size_t BufferedProducer<BufferType>::get_buffer_size() const {
318349

319350
template <typename BufferType>
320351
template <typename BuilderType>
321-
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
352+
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
353+
MessagePriority priority,
354+
bool do_flush) {
322355
{
323356
std::lock_guard<std::mutex> lock(mutex_);
324-
messages_.push(std::move(builder));
357+
if (priority == MessagePriority::High) {
358+
messages_.emplace_front(std::move(builder));
359+
}
360+
else {
361+
messages_.emplace_back(std::move(builder));
362+
}
325363
}
326-
if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
364+
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
327365
flush();
328366
}
329367
}
@@ -338,21 +376,11 @@ const Producer& BufferedProducer<BufferType>::get_producer() const {
338376
return producer_;
339377
}
340378

341-
template <typename BufferType>
342-
size_t BufferedProducer<BufferType>::get_buffer_size() const {
343-
return messages_.size();
344-
}
345-
346379
template <typename BufferType>
347380
size_t BufferedProducer<BufferType>::get_total_messages_acked() const {
348381
return total_messages_acked_;
349382
}
350383

351-
template <typename BufferType>
352-
uint16_t BufferedProducer<BufferType>::get_rollover_count() const {
353-
return rollover_counter_;
354-
}
355-
356384
template <typename BufferType>
357385
typename BufferedProducer<BufferType>::Builder
358386
BufferedProducer<BufferType>::make_builder(std::string topic) {
@@ -364,18 +392,23 @@ void BufferedProducer<BufferType>::set_produce_failure_callback(ProduceFailureCa
364392
produce_failure_callback_ = std::move(callback);
365393
}
366394

395+
template <typename BufferType>
396+
void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallback callback) {
397+
flush_failure_callback_ = std::move(callback);
398+
}
399+
367400
template <typename BufferType>
368401
template <typename MessageType>
369402
void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
370-
bool sent = false;
371-
while (!sent) {
403+
while (true) {
372404
try {
373405
producer_.produce(message);
374-
sent = true;
406+
// Sent successfully
407+
++expected_acks_;
408+
break;
375409
}
376410
catch (const HandleException& ex) {
377-
const Error error = ex.get_error();
378-
if (error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
411+
if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
379412
// If the output queue is full, then just poll
380413
producer_.poll();
381414
}
@@ -384,8 +417,6 @@ void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
384417
}
385418
}
386419
}
387-
// Sent successfully
388-
++expected_acks_;
389420
}
390421

391422
template <typename BufferType>
@@ -412,14 +443,12 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
412443
bool should_produce = message.get_error() &&
413444
(!produce_failure_callback_ || produce_failure_callback_(message));
414445
if (should_produce) {
415-
produce_message(message);
416-
return;
446+
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
447+
do_add_message(Builder(message), MessagePriority::High, false);
417448
}
418-
419-
// Increment the total successful transmissions
420-
++total_messages_acked_;
421-
if (total_messages_acked_ == 0) {
422-
++rollover_counter_;
449+
else {
450+
// Increment the total successful transmissions
451+
++total_messages_acked_;
423452
}
424453
}
425454

Diff for: tests/producer_test.cpp

-2
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,6 @@ TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") {
370370
const auto& messages = runner.get_messages();
371371
REQUIRE(messages.size() == num_messages);
372372
REQUIRE(producer.get_total_messages_acked() == num_messages);
373-
REQUIRE(producer.get_rollover_count() == 0);
374373
REQUIRE(producer.get_buffer_size() == 0);
375374
}
376375

@@ -392,6 +391,5 @@ TEST_CASE("clear multi-threaded buffered producer", "[producer][buffered_produce
392391
}
393392

394393
REQUIRE(producer.get_total_messages_acked() == 0);
395-
REQUIRE(producer.get_rollover_count() == 0);
396394
REQUIRE(producer.get_buffer_size() < num_messages);
397395
}

0 commit comments

Comments
 (0)