Skip to content

Callback invoker to sink all thrown exceptions #74

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread)
link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread rt ssl crypto dl z)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR})

Expand Down
1 change: 1 addition & 0 deletions include/cppkafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "queue.h"
#include "macros.h"
#include "error.h"
#include "detail/callback_invoker.h"

namespace cppkafka {

Expand Down
127 changes: 127 additions & 0 deletions include/cppkafka/detail/callback_invoker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

#ifndef CPPKAFKA_CALLBACK_INVOKER_H
#define CPPKAFKA_CALLBACK_INVOKER_H

#include <sstream>
#include <assert.h>
#include "../logging.h"
#include "../kafka_handle_base.h"

namespace cppkafka {

// Error values
template <typename T>
T error_value() { return T{}; }

template<> inline
void error_value<void>() {};

template<> inline
bool error_value<bool>() { return false; }

template<> inline
int error_value<int>() { return -1; }

/**
* \brief Wraps an std::function object and runs it while preventing all exceptions from escaping
* \tparam Func An std::function object
*/
template <typename Func>
class CallbackInvoker
{
public:
using RetType = typename Func::result_type;
using LogCallback = std::function<void(KafkaHandleBase& handle,
int level,
const std::string& facility,
const std::string& message)>;
CallbackInvoker(const char* callback_name,
const Func& callback,
KafkaHandleBase* handle)
: callback_name_(callback_name),
callback_(callback),
handle_(handle) {
}

explicit operator bool() const {
return (bool)callback_;
}

template <typename ...Args>
RetType operator()(Args&&... args) const {
static const char* library_name = "cppkafka";
std::ostringstream error_msg;
try {
if (callback_) {
return callback_(std::forward<Args>(args)...);
}
return error_value<RetType>();
}
catch (const std::exception& ex) {
if (handle_) {
error_msg << "Caught exception in " << callback_name_ << " callback: " << ex.what();
}
}
catch (...) {
if (handle_) {
error_msg << "Caught unknown exception in " << callback_name_ << " callback";
}
}
// Log error
if (handle_) {
if (handle_->get_configuration().get_log_callback()) {
try {
// Log it
handle_->get_configuration().get_log_callback()(*handle_,
static_cast<int>(LogLevel::LOG_ERR),
library_name,
error_msg.str());
}
catch (...) {} // sink everything
}
else {
rd_kafka_log_print(handle_->get_handle(),
static_cast<int>(LogLevel::LOG_ERR),
library_name,
error_msg.str().c_str());
}
}
return error_value<RetType>();
}
private:
const char* callback_name_;
const Func& callback_;
KafkaHandleBase* handle_;
};

}

#endif
12 changes: 6 additions & 6 deletions include/cppkafka/utils/backoff_committer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <chrono>
#include <functional>
#include <thread>
#include <string>
#include "../consumer.h"
#include "backoff_performer.h"

Expand Down Expand Up @@ -118,6 +119,7 @@ class BackoffCommitter : public BackoffPerformer {
*/
void commit(const TopicPartitionList& topic_partitions);
private:
// Return true to abort and false to continue committing
template <typename T>
bool do_commit(const T& object) {
try {
Expand All @@ -131,13 +133,11 @@ class BackoffCommitter : public BackoffPerformer {
if (ex.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET) {
return true;
}
// If there's a callback and it returns false for this message, abort
if (callback_ && !callback_(ex.get_error())) {
return true;
}
// If there's a callback and it returns false for this message, abort.
// Otherwise keep committing.
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
return callback && !callback(ex.get_error());
}
// In any other case, we failed. Keep committing
return false;
}

Consumer& consumer_;
Expand Down
25 changes: 12 additions & 13 deletions include/cppkafka/utils/buffered_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,9 @@ void BufferedProducer<BufferType>::flush() {
produce_message(flush_queue.front());
}
catch (const HandleException& ex) {
if (flush_failure_callback_ &&
flush_failure_callback_(flush_queue.front(), ex.get_error())) {
// retry again later
// If we have a flush failure callback and it returns true, we retry producing this message later
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (callback && callback(flush_queue.front(), ex.get_error())) {
do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false);
}
}
Expand Down Expand Up @@ -519,19 +519,18 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
--pending_acks_;
assert(pending_acks_ != (size_t)-1); // Prevent underflow

// We should produce this message again if it has an error and we either don't have a
// produce failure callback or we have one but it returns true
bool should_produce = message.get_error() &&
(!produce_failure_callback_ || produce_failure_callback_(message));
if (should_produce) {
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
do_add_message(Builder(message), MessagePriority::High, false);
if (message.get_error()) {
// We should produce this message again if we don't have a produce failure callback
// or we have one but it returns true
CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_);
if (!callback || callback(message)) {
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
do_add_message(Builder(message), MessagePriority::High, false);
}
}
else {
// Successful delivery
if (produce_success_callback_) {
produce_success_callback_(message);
}
CallbackInvoker<ProduceSuccessCallback>("delivery success", produce_success_callback_, &producer_)(message);
// Increment the total successful transmissions
++total_messages_produced_;
}
Expand Down
52 changes: 21 additions & 31 deletions src/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,66 +52,56 @@ namespace cppkafka {
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
Producer* handle = static_cast<Producer*>(opaque);
Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
const auto& callback = handle->get_configuration().get_delivery_report_callback();
if (callback) {
callback(*handle, message);
}
CallbackInvoker<Configuration::DeliveryReportCallback>
("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
(*handle, message);
}

void offset_commit_callback_proxy(rd_kafka_t*, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets, void *opaque) {
Consumer* handle = static_cast<Consumer*>(opaque);
TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{};
const auto& callback = handle->get_configuration().get_offset_commit_callback();
if (callback) {
callback(*handle, err, list);
}
CallbackInvoker<Configuration::OffsetCommitCallback>
("offset commit", handle->get_configuration().get_offset_commit_callback(), handle)
(*handle, err, list);
}

void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
const auto& callback = handle->get_configuration().get_error_callback();
if (callback) {
callback(*handle, err, reason);
}
CallbackInvoker<Configuration::ErrorCallback>
("error", handle->get_configuration().get_error_callback(), handle)
(*handle, err, reason);
}

void throttle_callback_proxy(rd_kafka_t*, const char* broker_name,
int32_t broker_id, int throttle_time_ms, void *opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
const auto& callback = handle->get_configuration().get_throttle_callback();
if (callback) {
callback(*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
}
CallbackInvoker<Configuration::ThrottleCallback>
("throttle", handle->get_configuration().get_throttle_callback(), handle)
(*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
}

void log_callback_proxy(const rd_kafka_t* h, int level,
const char* facility, const char* message) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(rd_kafka_opaque(h));
const auto& callback = handle->get_configuration().get_log_callback();
if (callback) {
callback(*handle, level, facility, message);
}
CallbackInvoker<Configuration::LogCallback>
("log", handle->get_configuration().get_log_callback(), nullptr)
(*handle, level, facility, message);
}

int stats_callback_proxy(rd_kafka_t*, char *json, size_t json_len, void *opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
const auto& callback = handle->get_configuration().get_stats_callback();
if (callback) {
callback(*handle, string(json, json + json_len));
}
CallbackInvoker<Configuration::StatsCallback>
("statistics", handle->get_configuration().get_stats_callback(), handle)
(*handle, string(json, json + json_len));
return 0;
}

int socket_callback_proxy(int domain, int type, int protocol, void* opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
const auto& callback = handle->get_configuration().get_socket_callback();
if (callback) {
return callback(domain, type, protocol);
}
else {
return -1;
}
return CallbackInvoker<Configuration::SocketCallback>
("socket", handle->get_configuration().get_socket_callback(), handle)
(domain, type, protocol);
}

// Configuration
Expand Down
21 changes: 8 additions & 13 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "logging.h"
#include "configuration.h"
#include "topic_partition_list.h"
#include "detail/callback_invoker.h"

using std::vector;
using std::string;
Expand Down Expand Up @@ -79,12 +80,12 @@ Consumer::~Consumer() {
close();
}
catch (const Exception& ex) {
constexpr const char* library_name = "cppkafka";
const char* library_name = "cppkafka";
ostringstream error_msg;
error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what();
const auto& callback = get_configuration().get_log_callback();
if (callback) {
callback(*this, static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str());
CallbackInvoker<Configuration::LogCallback> logger("log", get_configuration().get_log_callback(), nullptr);
if (logger) {
logger(*this, static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str());
}
else {
rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str().c_str());
Expand Down Expand Up @@ -292,21 +293,15 @@ void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) {
void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
TopicPartitionList& topic_partitions) {
if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
if (assignment_callback_) {
assignment_callback_(topic_partitions);
}
CallbackInvoker<AssignmentCallback>("assignment", assignment_callback_, this)(topic_partitions);
assign(topic_partitions);
}
else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
if (revocation_callback_) {
revocation_callback_(topic_partitions);
}
CallbackInvoker<RevocationCallback>("revocation", revocation_callback_, this)(topic_partitions);
unassign();
}
else {
if (rebalance_error_callback_) {
rebalance_error_callback_(error);
}
CallbackInvoker<RebalanceErrorCallback>("rebalance error", rebalance_error_callback_, this)(error);
unassign();
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/topic_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "exceptions.h"
#include "topic.h"
#include "buffer.h"
#include "detail/callback_invoker.h"

using std::string;
using std::map;
Expand All @@ -49,7 +50,8 @@ int32_t partitioner_callback_proxy(const rd_kafka_topic_t* handle, const void *k
if (callback) {
Topic topic = Topic::make_non_owning(const_cast<rd_kafka_topic_t*>(handle));
Buffer key(static_cast<const char*>(key_ptr), key_size);
return callback(topic, key, partition_count);
return CallbackInvoker<TopicConfiguration::PartitionerCallback>("topic partitioner", callback, nullptr)
(topic, key, partition_count);
}
else {
return rd_kafka_msg_partitioner_consistent_random(handle, key_ptr, key_size,
Expand Down
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ add_executable(
# Main file
test_main.cpp
)
target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread)
target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread rt ssl crypto dl z)
add_dependencies(tests cppkafka_tests)
add_test(cppkafka cppkafka_tests)