Skip to content

Commit c5aca98

Browse files
acceleratedmfontanini
authored andcommitted
Invoke error callback if present instead of log callback (#93)
1 parent eb46b88 commit c5aca98

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

Diff for: include/cppkafka/configuration.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,22 @@ class KafkaHandleBase;
6262
class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
6363
public:
6464
using DeliveryReportCallback = std::function<void(Producer& producer, const Message&)>;
65-
using OffsetCommitCallback = std::function<void(Consumer& consumer, Error,
65+
using OffsetCommitCallback = std::function<void(Consumer& consumer,
66+
Error error,
6667
const TopicPartitionList& topic_partitions)>;
67-
using ErrorCallback = std::function<void(KafkaHandleBase& handle, int error,
68+
using ErrorCallback = std::function<void(KafkaHandleBase& handle,
69+
int error,
6870
const std::string& reason)>;
6971
using ThrottleCallback = std::function<void(KafkaHandleBase& handle,
7072
const std::string& broker_name,
7173
int32_t broker_id,
7274
std::chrono::milliseconds throttle_time)>;
73-
using LogCallback = std::function<void(KafkaHandleBase& handle, int level,
75+
using LogCallback = std::function<void(KafkaHandleBase& handle,
76+
int level,
7477
const std::string& facility,
7578
const std::string& message)>;
7679
using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>;
77-
using SocketCallback = std::function<int(int domain, int type, int protoco)>;
80+
using SocketCallback = std::function<int(int domain, int type, int protocol)>;
7881

7982
using ConfigurationBase<Configuration>::set;
8083
using ConfigurationBase<Configuration>::get;

Diff for: src/consumer.cpp

+9-6
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,19 @@ Consumer::~Consumer() {
7979
rebalance_error_callback_ = nullptr;
8080
close();
8181
}
82-
catch (const Exception& ex) {
83-
const char* library_name = "cppkafka";
82+
catch (const HandleException& ex) {
8483
ostringstream error_msg;
8584
error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what();
86-
CallbackInvoker<Configuration::LogCallback> logger("log", get_configuration().get_log_callback(), nullptr);
87-
if (logger) {
88-
logger(*this, static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str());
85+
CallbackInvoker<Configuration::ErrorCallback> error_cb("error", get_configuration().get_error_callback(), this);
86+
CallbackInvoker<Configuration::LogCallback> logger_cb("log", get_configuration().get_log_callback(), nullptr);
87+
if (error_cb) {
88+
error_cb(*this, static_cast<int>(ex.get_error().get_error()), error_msg.str());
89+
}
90+
else if (logger_cb) {
91+
logger_cb(*this, static_cast<int>(LogLevel::LOG_ERR), "cppkafka", error_msg.str());
8992
}
9093
else {
91-
rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str().c_str());
94+
rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LOG_ERR), "cppkafka", error_msg.str().c_str());
9295
}
9396
}
9497
}

0 commit comments

Comments
 (0)