31
31
#define CPPKAFKA_BUFFERED_PRODUCER_H
32
32
33
33
#include < string>
34
- #include < queue >
34
+ #include < list >
35
35
#include < cstdint>
36
36
#include < algorithm>
37
37
#include < unordered_set>
@@ -55,10 +55,12 @@ namespace cppkafka {
55
55
* produced messages (either in a buffer or non buffered way) are acknowledged by the kafka
56
56
* brokers.
57
57
*
58
- * When producing messages, this class will handle cases where the producer's queue is full so it\
58
+ * When producing messages, this class will handle cases where the producer's queue is full so it
59
59
* will poll until the production is successful.
60
60
*
61
- * This class is not thread safe.
61
+ * \remark This class is thread safe
62
+ *
63
+ * \warning The application *MUST NOT* change the payload policy on the underlying Producer object.
62
64
*/
63
65
template <typename BufferType>
64
66
class CPPKAFKA_API BufferedProducer {
@@ -69,9 +71,14 @@ class CPPKAFKA_API BufferedProducer {
69
71
using Builder = ConcreteMessageBuilder<BufferType>;
70
72
71
73
/* *
72
- * Callback to indicate a message failed to be produced.
74
+ * Callback to indicate a message failed to be produced by the broker
73
75
*/
74
76
using ProduceFailureCallback = std::function<bool (const Message&)>;
77
+
78
+ /* *
79
+ * Callback to indicate a message failed to be flushed
80
+ */
81
+ using FlushFailureCallback = std::function<bool (const Builder&, Error error)>;
75
82
76
83
/* *
77
84
* \brief Constructs a buffered producer using the provided configuration
@@ -108,6 +115,8 @@ class CPPKAFKA_API BufferedProducer {
108
115
* wait for it to be acknowledged.
109
116
*
110
117
* \param builder The builder that contains the message to be produced
118
+ *
119
+ * \remark This method throws cppkafka::HandleException on failure
111
120
*/
112
121
void produce (const MessageBuilder& builder);
113
122
@@ -118,6 +127,8 @@ class CPPKAFKA_API BufferedProducer {
118
127
* wait for it to be acknowledged.
119
128
*
120
129
* \param message The message to be produced
130
+ *
131
+ * \remark This method throws cppkafka::HandleException on failure
121
132
*/
122
133
void produce (const Message& message);
123
134
@@ -168,20 +179,11 @@ class CPPKAFKA_API BufferedProducer {
168
179
size_t get_buffer_size () const ;
169
180
170
181
/* *
171
- * \brief Returns the total number of messages ack-ed by the broker
182
+ * \brief Returns the total number of messages ack-ed by the broker since the beginning
172
183
*
173
- * \return The total number of messages since the beginning or since the last roll-over
174
- *
175
- * \remark Call get_rollover_count() to get the number of times the counter has rolled over
184
+ * \return The number of messages
176
185
*/
177
186
size_t get_total_messages_acked () const ;
178
-
179
- /* *
180
- * \brief Roll-over counter for get_total_messages_acked
181
- *
182
- * \return The number of rolls
183
- */
184
- uint16_t get_rollover_count () const ;
185
187
186
188
/* *
187
189
* Gets the Producer object
@@ -206,46 +208,67 @@ class CPPKAFKA_API BufferedProducer {
206
208
* false. Note that if the callback return false, then the message will be discarded.
207
209
*
208
210
* \param callback The callback to be set
211
+ *
212
+ * \remark It is *highly* recommended to set this callback as your message may be produced
213
+ * indefinitely if there's a remote error.
214
+ *
215
+ * \warning Do not call any method on the BufferedProducer while inside this callback.
209
216
*/
210
217
void set_produce_failure_callback (ProduceFailureCallback callback);
211
218
219
+ /* *
220
+ * \brief Sets the local message produce failure callback
221
+ *
222
+ * This callback will be called when local message production fails during a flush() operation.
223
+ * Failure errors are typically payload too large, unknown topic or unknown partition.
224
+ * Note that if the callback returns false, the message will be dropped from the buffer,
225
+ * otherwise it will be re-enqueued for later retry.
226
+ *
227
+ * \param callback
228
+ *
229
+ * \warning Do not call any method on the BufferedProducer while inside this callback
230
+ */
231
+ void set_flush_failure_callback (FlushFailureCallback callback);
232
+
212
233
private:
213
- using QueueType = std::queue<Builder>;
234
+ using QueueType = std::list<Builder>;
235
+ enum class MessagePriority { Low, High };
214
236
215
237
template <typename BuilderType>
216
- void do_add_message (BuilderType&& builder);
238
+ void do_add_message (BuilderType&& builder, MessagePriority priority, bool do_flush );
217
239
template <typename MessageType>
218
240
void produce_message (const MessageType& message);
219
241
Configuration prepare_configuration (Configuration config);
220
242
void on_delivery_report (const Message& message);
221
-
222
243
244
+ // Members
223
245
Configuration::DeliveryReportCallback delivery_report_callback_;
224
246
Producer producer_;
225
247
QueueType messages_;
226
248
mutable std::mutex mutex_;
227
249
ProduceFailureCallback produce_failure_callback_;
250
+ FlushFailureCallback flush_failure_callback_;
228
251
ssize_t max_buffer_size_{-1 };
229
252
std::atomic_ulong expected_acks_{0 };
230
253
std::atomic_ullong total_messages_acked_{0 };
231
- std::atomic_ushort rollover_counter_{0 };
232
254
};
233
255
234
256
template <typename BufferType>
235
257
BufferedProducer<BufferType>::BufferedProducer(Configuration config)
236
258
: delivery_report_callback_(config.get_delivery_report_callback()),
237
259
producer_ (prepare_configuration(std::move(config))) {
238
-
260
+ // Allow re-queuing failed messages
261
+ producer_.set_payload_policy (Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD);
239
262
}
240
263
241
264
template <typename BufferType>
242
265
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
243
- do_add_message (builder);
266
+ do_add_message (builder, MessagePriority::Low, true );
244
267
}
245
268
246
269
template <typename BufferType>
247
270
void BufferedProducer<BufferType>::add_message(Builder builder) {
248
- do_add_message (move (builder));
271
+ do_add_message (move (builder), MessagePriority::Low, true );
249
272
}
250
273
251
274
template <typename BufferType>
@@ -256,19 +279,27 @@ void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
256
279
template <typename BufferType>
257
280
void BufferedProducer<BufferType>::produce(const Message& message) {
258
281
produce_message (message);
259
- expected_acks_++;
260
282
}
261
283
262
284
template <typename BufferType>
263
285
void BufferedProducer<BufferType>::flush() {
264
- size_t num_messages = messages_. size ();
265
- while (num_messages--) {
286
+ QueueType flush_queue; // flush from temporary queue
287
+ {
266
288
std::lock_guard<std::mutex> lock (mutex_);
267
- if (messages_.empty ()) {
268
- break ; // perhaps clear() was called
289
+ std::swap (messages_, flush_queue);
290
+ }
291
+ while (!flush_queue.empty ()) {
292
+ try {
293
+ produce_message (flush_queue.front ());
269
294
}
270
- produce_message (messages_.front ());
271
- messages_.pop ();
295
+ catch (const HandleException& ex) {
296
+ if (flush_failure_callback_ &&
297
+ flush_failure_callback_ (flush_queue.front (), ex.get_error ())) {
298
+ // retry again later
299
+ do_add_message (std::move (flush_queue.front ()), MessagePriority::Low, false );
300
+ }
301
+ }
302
+ flush_queue.pop_front ();
272
303
}
273
304
wait_for_acks ();
274
305
}
@@ -318,12 +349,19 @@ size_t BufferedProducer<BufferType>::get_buffer_size() const {
318
349
319
350
template <typename BufferType>
320
351
template <typename BuilderType>
321
- void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
352
+ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
353
+ MessagePriority priority,
354
+ bool do_flush) {
322
355
{
323
356
std::lock_guard<std::mutex> lock (mutex_);
324
- messages_.push (std::move (builder));
357
+ if (priority == MessagePriority::High) {
358
+ messages_.emplace_front (std::move (builder));
359
+ }
360
+ else {
361
+ messages_.emplace_back (std::move (builder));
362
+ }
325
363
}
326
- if ((max_buffer_size_ >= 0 ) && (max_buffer_size_ <= (ssize_t )messages_.size ())) {
364
+ if (do_flush && (max_buffer_size_ >= 0 ) && (max_buffer_size_ <= (ssize_t )messages_.size ())) {
327
365
flush ();
328
366
}
329
367
}
@@ -338,21 +376,11 @@ const Producer& BufferedProducer<BufferType>::get_producer() const {
338
376
return producer_;
339
377
}
340
378
341
- template <typename BufferType>
342
- size_t BufferedProducer<BufferType>::get_buffer_size() const {
343
- return messages_.size ();
344
- }
345
-
346
379
template <typename BufferType>
347
380
size_t BufferedProducer<BufferType>::get_total_messages_acked() const {
348
381
return total_messages_acked_;
349
382
}
350
383
351
- template <typename BufferType>
352
- uint16_t BufferedProducer<BufferType>::get_rollover_count() const {
353
- return rollover_counter_;
354
- }
355
-
356
384
template <typename BufferType>
357
385
typename BufferedProducer<BufferType>::Builder
358
386
BufferedProducer<BufferType>::make_builder(std::string topic) {
@@ -364,18 +392,23 @@ void BufferedProducer<BufferType>::set_produce_failure_callback(ProduceFailureCa
364
392
produce_failure_callback_ = std::move (callback);
365
393
}
366
394
395
+ template <typename BufferType>
396
+ void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallback callback) {
397
+ flush_failure_callback_ = std::move (callback);
398
+ }
399
+
367
400
template <typename BufferType>
368
401
template <typename MessageType>
369
402
void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
370
- bool sent = false ;
371
- while (!sent) {
403
+ while (true ) {
372
404
try {
373
405
producer_.produce (message);
374
- sent = true ;
406
+ // Sent successfully
407
+ ++expected_acks_;
408
+ break ;
375
409
}
376
410
catch (const HandleException& ex) {
377
- const Error error = ex.get_error ();
378
- if (error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
411
+ if (ex.get_error () == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
379
412
// If the output queue is full, then just poll
380
413
producer_.poll ();
381
414
}
@@ -384,8 +417,6 @@ void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
384
417
}
385
418
}
386
419
}
387
- // Sent successfully
388
- ++expected_acks_;
389
420
}
390
421
391
422
template <typename BufferType>
@@ -412,14 +443,12 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
412
443
bool should_produce = message.get_error () &&
413
444
(!produce_failure_callback_ || produce_failure_callback_ (message));
414
445
if (should_produce) {
415
- produce_message (message);
416
- return ;
446
+ // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
447
+ do_add_message ( Builder (message), MessagePriority::High, false ) ;
417
448
}
418
-
419
- // Increment the total successful transmissions
420
- ++total_messages_acked_;
421
- if (total_messages_acked_ == 0 ) {
422
- ++rollover_counter_;
449
+ else {
450
+ // Increment the total successful transmissions
451
+ ++total_messages_acked_;
423
452
}
424
453
}
425
454
0 commit comments