Skip to content

Commit 332beb1

Browse files
author
accelerated
committed
Changes as per code review
1 parent 286bc53 commit 332beb1

File tree

8 files changed

+62
-80
lines changed

8 files changed

+62
-80
lines changed

examples/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread)
1+
link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread rt ssl crypto dl z)
22
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
33
include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR})
44

include/cppkafka/callback_invoker.h

+21-23
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,9 @@
3737

3838
namespace cppkafka {
3939

40-
// Success values
41-
template <typename T>
42-
T success_value() { return T(); }
43-
44-
template<> inline
45-
void success_value<void>() {};
46-
47-
template<> inline
48-
bool success_value<bool>() { return true; }
49-
5040
// Error values
5141
template <typename T>
52-
T error_value() { return T(); }
42+
T error_value() { return T{}; }
5343

5444
template<> inline
5545
void error_value<void>() {};
@@ -60,34 +50,40 @@ bool error_value<bool>() { return false; }
6050
template<> inline
6151
int error_value<int>() { return -1; }
6252

63-
// CallbackInvoker
64-
template <typename RetType, typename ...Args>
65-
class CallbackInvoker;
66-
67-
template <typename RetType, typename ...Args>
68-
class CallbackInvoker<RetType(Args...)>
53+
/**
54+
* \brief Wraps an std::function object and runs it while preventing all exceptions from escaping
55+
* \tparam Func An std::function object
56+
*/
57+
template <typename Func>
58+
class CallbackInvoker
6959
{
7060
public:
71-
using Func = std::function<RetType(Args...)>;
61+
using RetType = typename Func::result_type;
7262
using LogCallback = std::function<void(KafkaHandleBase& handle,
7363
int level,
7464
const std::string& facility,
7565
const std::string& message)>;
76-
CallbackInvoker(const std::string& callback_name,
66+
CallbackInvoker(const char* callback_name,
7767
const Func& callback,
7868
KafkaHandleBase* handle)
7969
: callback_name_(callback_name),
8070
callback_(callback),
8171
handle_(handle) {
8272
}
83-
RetType operator()(Args... args) const {
73+
74+
explicit operator bool() const {
75+
return (bool)callback_;
76+
}
77+
78+
template <typename ...Args>
79+
RetType operator()(Args&&... args) const {
8480
static const char* library_name = "cppkafka";
8581
std::ostringstream error_msg;
8682
try {
8783
if (callback_) {
8884
return callback_(std::forward<Args>(args)...);
8985
}
90-
return success_value<RetType>();
86+
return error_value<RetType>();
9187
}
9288
catch (const std::exception& ex) {
9389
if (handle_) {
@@ -112,8 +108,10 @@ class CallbackInvoker<RetType(Args...)>
112108
catch (...) {} // sink everything
113109
}
114110
else {
115-
rd_kafka_log_print(handle_->get_handle(), static_cast<int>(LogLevel::LOG_ERR),
116-
library_name, error_msg.str().c_str());
111+
rd_kafka_log_print(handle_->get_handle(),
112+
static_cast<int>(LogLevel::LOG_ERR),
113+
library_name,
114+
error_msg.str().c_str());
117115
}
118116
}
119117
return error_value<RetType>();

include/cppkafka/utils/backoff_committer.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ class BackoffCommitter : public BackoffPerformer {
128128
return true;
129129
}
130130
catch (const HandleException& ex) {
131-
static const std::string callback_name("backoff committer");
132131
// If there were actually no offsets to commit, return. Retrying won't solve
133132
// anything here
134133
if (ex.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET) {
135134
return true;
136135
}
137136
// If there's a callback and it returns false for this message, abort.
138137
// Otherwise keep committing.
139-
return !CallbackInvoker<bool(Error)>(callback_name, callback_, &consumer_)(ex.get_error());
138+
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
139+
return callback && !callback(ex.get_error());
140140
}
141141
}
142142

include/cppkafka/utils/buffered_producer.h

+13-14
Original file line numberDiff line numberDiff line change
@@ -364,9 +364,9 @@ void BufferedProducer<BufferType>::flush() {
364364
produce_message(flush_queue.front());
365365
}
366366
catch (const HandleException& ex) {
367-
if (flush_failure_callback_ &&
368-
flush_failure_callback_(flush_queue.front(), ex.get_error())) {
369-
// retry again later
367+
// If we have a flush failure callback and it returns true, we retry producing this message later
368+
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
369+
if (callback && callback(flush_queue.front(), ex.get_error())) {
370370
do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false);
371371
}
372372
}
@@ -515,23 +515,22 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
515515

516516
template <typename BufferType>
517517
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
518-
static const std::string ds_callback_name("delivery success");
519-
static const std::string pf_callback_name("produce failure");
520518
// Decrement the expected acks
521519
--pending_acks_;
522520
assert(pending_acks_ != (size_t)-1); // Prevent underflow
523-
// We should produce this message again if it has an error and we either don't have a
524-
// produce failure callback or we have one but it returns true
525-
bool should_produce = message.get_error() &&
526-
CallbackInvoker<bool(const Message&)>
527-
(pf_callback_name, produce_failure_callback_, &producer_)(message);
528-
if (should_produce) {
529-
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
530-
do_add_message(Builder(message), MessagePriority::High, false);
521+
522+
if (message.get_error()) {
523+
// We should produce this message again if we don't have a produce failure callback
524+
// or we have one but it returns true
525+
CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_);
526+
if (!callback || callback(message)) {
527+
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
528+
do_add_message(Builder(message), MessagePriority::High, false);
529+
}
531530
}
532531
else {
533532
// Successful delivery
534-
CallbackInvoker<void(const Message&)>(ds_callback_name, produce_success_callback_, &producer_)(message);
533+
CallbackInvoker<ProduceSuccessCallback>("delivery success", produce_success_callback_, &producer_)(message);
535534
// Increment the total successful transmissions
536535
++total_messages_produced_;
537536
}

src/configuration.cpp

+16-26
Original file line numberDiff line numberDiff line change
@@ -50,68 +50,58 @@ namespace cppkafka {
5050
// Callback proxies
5151

5252
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
53-
static const string callback_name("delivery report");
5453
Producer* handle = static_cast<Producer*>(opaque);
5554
Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
56-
CallbackInvoker<void(Producer&, const Message&)>
57-
(callback_name, handle->get_configuration().get_delivery_report_callback(), handle)
55+
CallbackInvoker<Configuration::DeliveryReportCallback>
56+
("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
5857
(*handle, message);
5958
}
6059

6160
void offset_commit_callback_proxy(rd_kafka_t*, rd_kafka_resp_err_t err,
6261
rd_kafka_topic_partition_list_t *offsets, void *opaque) {
63-
static const string callback_name("offset commit");
6462
Consumer* handle = static_cast<Consumer*>(opaque);
6563
TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{};
66-
CallbackInvoker<void(Consumer&, Error, const TopicPartitionList&)>
67-
(callback_name, handle->get_configuration().get_offset_commit_callback(), handle)
64+
CallbackInvoker<Configuration::OffsetCommitCallback>
65+
("offset commit", handle->get_configuration().get_offset_commit_callback(), handle)
6866
(*handle, err, list);
6967
}
7068

7169
void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque) {
72-
static const string callback_name("error");
7370
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
74-
CallbackInvoker<void(KafkaHandleBase&, int, const std::string&)>
75-
(callback_name, handle->get_configuration().get_error_callback(), handle)
71+
CallbackInvoker<Configuration::ErrorCallback>
72+
("error", handle->get_configuration().get_error_callback(), handle)
7673
(*handle, err, reason);
7774
}
7875

7976
void throttle_callback_proxy(rd_kafka_t*, const char* broker_name,
8077
int32_t broker_id, int throttle_time_ms, void *opaque) {
81-
static const string callback_name("throttle");
8278
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
83-
CallbackInvoker<void(KafkaHandleBase&, const std::string&, int32_t, std::chrono::milliseconds)>
84-
(callback_name, handle->get_configuration().get_throttle_callback(), handle)
79+
CallbackInvoker<Configuration::ThrottleCallback>
80+
("throttle", handle->get_configuration().get_throttle_callback(), handle)
8581
(*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
8682
}
8783

8884
void log_callback_proxy(const rd_kafka_t* h, int level,
8985
const char* facility, const char* message) {
9086
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(rd_kafka_opaque(h));
91-
const auto& callback = handle->get_configuration().get_log_callback();
92-
if (callback) {
93-
callback(*handle, level, facility, message);
94-
}
87+
CallbackInvoker<Configuration::LogCallback>
88+
("log", handle->get_configuration().get_log_callback(), nullptr)
89+
(*handle, level, facility, message);
9590
}
9691

9792
int stats_callback_proxy(rd_kafka_t*, char *json, size_t json_len, void *opaque) {
98-
static const string callback_name("statistics");
9993
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
100-
CallbackInvoker<void(KafkaHandleBase&, const std::string&)>
101-
(callback_name, handle->get_configuration().get_stats_callback(), handle)
94+
CallbackInvoker<Configuration::StatsCallback>
95+
("statistics", handle->get_configuration().get_stats_callback(), handle)
10296
(*handle, string(json, json + json_len));
10397
return 0;
10498
}
10599

106100
int socket_callback_proxy(int domain, int type, int protocol, void* opaque) {
107-
static const string callback_name("socket");
108101
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
109-
if (handle->get_configuration().get_socket_callback()) {
110-
return CallbackInvoker<int(int, int, int)>
111-
(callback_name, handle->get_configuration().get_socket_callback(), handle)
112-
(domain, type, protocol);
113-
}
114-
return -1;
102+
return CallbackInvoker<Configuration::SocketCallback>
103+
("socket", handle->get_configuration().get_socket_callback(), handle)
104+
(domain, type, protocol);
115105
}
116106

117107
// Configuration

src/consumer.cpp

+7-10
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,12 @@ Consumer::~Consumer() {
8080
close();
8181
}
8282
catch (const Exception& ex) {
83-
constexpr const char* library_name = "cppkafka";
83+
const char* library_name = "cppkafka";
8484
ostringstream error_msg;
8585
error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what();
86-
const auto& callback = get_configuration().get_log_callback();
87-
if (callback) {
88-
callback(*this, static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str());
86+
CallbackInvoker<Configuration::LogCallback> logger("log", get_configuration().get_log_callback(), nullptr);
87+
if (logger) {
88+
logger(*this, static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str());
8989
}
9090
else {
9191
rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str().c_str());
@@ -293,18 +293,15 @@ void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) {
293293
void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
294294
TopicPartitionList& topic_partitions) {
295295
if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
296-
static const string callback_name("assignment");
297-
CallbackInvoker<void(TopicPartitionList&)>(callback_name, assignment_callback_, this)(topic_partitions);
296+
CallbackInvoker<AssignmentCallback>("assignment", assignment_callback_, this)(topic_partitions);
298297
assign(topic_partitions);
299298
}
300299
else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
301-
static const string callback_name("revocation");
302-
CallbackInvoker<void(const TopicPartitionList&)>(callback_name, revocation_callback_, this)(topic_partitions);
300+
CallbackInvoker<RevocationCallback>("revocation", revocation_callback_, this)(topic_partitions);
303301
unassign();
304302
}
305303
else {
306-
static const string callback_name("rebalance error");
307-
CallbackInvoker<void(Error)>(callback_name, rebalance_error_callback_, this)(error);
304+
CallbackInvoker<RebalanceErrorCallback>("rebalance error", rebalance_error_callback_, this)(error);
308305
unassign();
309306
}
310307
}

src/topic_configuration.cpp

+1-3
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@ int32_t partitioner_callback_proxy(const rd_kafka_topic_t* handle, const void *k
4848
const TopicConfiguration* config = static_cast<TopicConfiguration*>(topic_opaque);
4949
const auto& callback = config->get_partitioner_callback();
5050
if (callback) {
51-
static const string callback_name("topic partitioner");
5251
Topic topic = Topic::make_non_owning(const_cast<rd_kafka_topic_t*>(handle));
5352
Buffer key(static_cast<const char*>(key_ptr), key_size);
54-
return CallbackInvoker<int32_t(const Topic&, const Buffer&, int32_t)>
55-
(callback_name, callback, nullptr)
53+
return CallbackInvoker<TopicConfiguration::PartitionerCallback>("topic partitioner", callback, nullptr)
5654
(topic, key, partition_count);
5755
}
5856
else {

tests/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ add_executable(
2323
# Main file
2424
test_main.cpp
2525
)
26-
target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread)
26+
target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread rt ssl crypto dl z)
2727
add_dependencies(tests cppkafka_tests)
2828
add_test(cppkafka cppkafka_tests)

0 commit comments

Comments
 (0)