Skip to content

Commit 9881efb

Browse files
author
accelerated
committed
Changes based on latest review
1 parent ef5ed27 commit 9881efb

File tree

2 files changed

+55
-18
lines changed

2 files changed

+55
-18
lines changed

Diff for: include/cppkafka/message_builder.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@ class BasicMessageBuilder {
5353

5454
/**
5555
* Construct a BasicMessageBuilder from a Message object
56-
*
57-
* \remark The application must guarantee the lifetime of the Message exceeds that of this
58-
* BasicMessageBuilder as this class does not take ownership of any Message buffers
5956
*/
6057
BasicMessageBuilder(const Message& message);
6158

@@ -191,6 +188,8 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
191188
: topic_(message.get_topic()),
192189
key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
193190
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
191+
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
192+
std::chrono::milliseconds(0)),
194193
user_data_(message.get_user_data())
195194
{
196195

Diff for: include/cppkafka/utils/buffered_producer.h

+53-15
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
#define CPPKAFKA_BUFFERED_PRODUCER_H
3232

3333
#include <string>
34-
#include <list>
34+
#include <deque>
3535
#include <cstdint>
3636
#include <algorithm>
3737
#include <unordered_set>
@@ -52,15 +52,23 @@ namespace cppkafka {
5252
* to produce them just as you would using the Producer class.
5353
*
5454
* When calling either flush or wait_for_acks, the buffered producer will block until all
55-
* produced messages (either in a buffer or non buffered way) are acknowledged by the kafka
56-
* brokers.
55+
* produced messages (either buffered or sent directly) are acknowledged by the kafka brokers.
5756
*
5857
* When producing messages, this class will handle cases where the producer's queue is full so it
5958
* will poll until the production is successful.
6059
*
6160
* \remark This class is thread safe
6261
*
63-
* \warning The application *MUST NOT* change the payload policy on the underlying Producer object.
62+
* \warning
63+
* Delivery Report Callback: This class makes internal use of this function and will overwrite anything
64+
* the user has supplied as part of the configuration options. Instead user should call
65+
* set_produce_success_callback() and set_produce_failure_callback() respectively.
66+
*
67+
* Payload Policy: For payload-owning BufferTypes such as std::string or std::vector<char> the default
68+
* policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type
69+
* cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka
70+
* shall not make any internal copies of the message and it is the application's responsability to free
71+
* the messages *after* the delivery report callback has reported a successful delivery to avoid corruption.
6472
*/
6573
template <typename BufferType>
6674
class CPPKAFKA_API BufferedProducer {
@@ -69,6 +77,11 @@ class CPPKAFKA_API BufferedProducer {
6977
* Concrete builder
7078
*/
7179
using Builder = ConcreteMessageBuilder<BufferType>;
80+
81+
/**
82+
* Callback to indicate a message was delivered to the broker
83+
*/
84+
using ProduceSuccessCallback = std::function<void(const Message&)>;
7285

7386
/**
7487
* Callback to indicate a message failed to be produced by the broker
@@ -137,6 +150,10 @@ class CPPKAFKA_API BufferedProducer {
137150
*
138151
* This will send all messages and keep waiting until all of them are acknowledged (this is
139152
* done by calling wait_for_acks).
153+
*
154+
* \remark Although it is possible to call flush from multiple threads concurrently, better
155+
* performance is achieved when called from the same thread or when serialized wrt
156+
* to other threads.
140157
*/
141158
void flush();
142159

@@ -216,6 +233,15 @@ class CPPKAFKA_API BufferedProducer {
216233
*/
217234
void set_produce_failure_callback(ProduceFailureCallback callback);
218235

236+
/**
237+
* \brief Sets the successful delivery callback
238+
*
239+
* The user can use this function to cleanup any application-owned message buffers.
240+
*
241+
* \param callback The callback to be set
242+
*/
243+
void set_produce_success_callback(ProduceSuccessCallback callback);
244+
219245
/**
220246
* \brief Sets the local message produce failure callback
221247
*
@@ -231,7 +257,7 @@ class CPPKAFKA_API BufferedProducer {
231257
void set_flush_failure_callback(FlushFailureCallback callback);
232258

233259
private:
234-
using QueueType = std::list<Builder>;
260+
using QueueType = std::deque<Builder>;
235261
enum class MessagePriority { Low, High };
236262

237263
template <typename BuilderType>
@@ -242,23 +268,31 @@ class CPPKAFKA_API BufferedProducer {
242268
void on_delivery_report(const Message& message);
243269

244270
// Members
245-
Configuration::DeliveryReportCallback delivery_report_callback_;
246271
Producer producer_;
247272
QueueType messages_;
248273
mutable std::mutex mutex_;
274+
ProduceSuccessCallback produce_success_callback_;
249275
ProduceFailureCallback produce_failure_callback_;
250276
FlushFailureCallback flush_failure_callback_;
251277
ssize_t max_buffer_size_{-1};
252278
std::atomic_ulong expected_acks_{0};
253279
std::atomic_ullong total_messages_acked_{0};
254280
};
255281

282+
template <typename BufferType>
283+
Producer::PayloadPolicy get_default_payload_policy() {
284+
return Producer::PayloadPolicy::COPY_PAYLOAD;
285+
}
286+
287+
template <> inline
288+
Producer::PayloadPolicy get_default_payload_policy<Buffer>() {
289+
return Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD;
290+
}
291+
256292
template <typename BufferType>
257293
BufferedProducer<BufferType>::BufferedProducer(Configuration config)
258-
: delivery_report_callback_(config.get_delivery_report_callback()),
259-
producer_(prepare_configuration(std::move(config))) {
260-
// Allow re-queuing failed messages
261-
producer_.set_payload_policy(Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD);
294+
: producer_(prepare_configuration(std::move(config))) {
295+
producer_.set_payload_policy(get_default_payload_policy<BufferType>());
262296
}
263297

264298
template <typename BufferType>
@@ -392,6 +426,11 @@ void BufferedProducer<BufferType>::set_produce_failure_callback(ProduceFailureCa
392426
produce_failure_callback_ = std::move(callback);
393427
}
394428

429+
template <typename BufferType>
430+
void BufferedProducer<BufferType>::set_produce_success_callback(ProduceSuccessCallback callback) {
431+
produce_success_callback_ = std::move(callback);
432+
}
433+
395434
template <typename BufferType>
396435
void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallback callback) {
397436
flush_failure_callback_ = std::move(callback);
@@ -433,11 +472,6 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
433472
--expected_acks_;
434473
assert(expected_acks_ != (unsigned long)-1); // Prevent underflow
435474

436-
// Call the user-supplied delivery report callback if any
437-
if (delivery_report_callback_) {
438-
delivery_report_callback_(producer_, message);
439-
}
440-
441475
// We should produce this message again if it has an error and we either don't have a
442476
// produce failure callback or we have one but it returns true
443477
bool should_produce = message.get_error() &&
@@ -447,6 +481,10 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
447481
do_add_message(Builder(message), MessagePriority::High, false);
448482
}
449483
else {
484+
// Successful delivery
485+
if (produce_success_callback_) {
486+
produce_success_callback_(message);
487+
}
450488
// Increment the total successful transmissions
451489
++total_messages_acked_;
452490
}

0 commit comments

Comments
 (0)