Skip to content

Commit 507a135

Browse files
author
accelerated
committed
initial version
1 parent f543810 commit 507a135

8 files changed

+183
-46
lines changed

include/cppkafka/callback_invoker.h

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_CALLBACK_INVOKER_H
31+
#define CPPKAFKA_CALLBACK_INVOKER_H
32+
33+
#include <sstream>
34+
#include <assert.h>
35+
#include "logging.h"
36+
#include "kafka_handle_base.h"
37+
38+
namespace cppkafka {
39+
40+
// Success values
41+
template <typename T>
42+
T success_value() { return T(); }
43+
44+
template<> inline
45+
void success_value<void>() {};
46+
47+
template<> inline
48+
bool success_value<bool>() { return true; }
49+
50+
// Error values
51+
template <typename T>
52+
T error_value() { return T(); }
53+
54+
template<> inline
55+
void error_value<void>() {};
56+
57+
template<> inline
58+
bool error_value<bool>() { return false; }
59+
60+
template<> inline
61+
int error_value<int>() { return -1; }
62+
63+
// CallbackInvoker
64+
template <typename RetType, typename ...Args>
65+
class CallbackInvoker;
66+
67+
template <typename RetType, typename ...Args>
68+
class CallbackInvoker<RetType(Args...)>
69+
{
70+
public:
71+
using Func = std::function<RetType(Args...)>;
72+
using LogCallback = std::function<void(KafkaHandleBase& handle,
73+
int level,
74+
const std::string& facility,
75+
const std::string& message)>;
76+
CallbackInvoker(const std::string& callback_name,
77+
const Func& callback,
78+
KafkaHandleBase* handle)
79+
: callback_name_(callback_name),
80+
callback_(callback),
81+
handle_(handle) {
82+
}
83+
RetType operator()(Args... args) const {
84+
static const char* library_name = "cppkafka";
85+
std::ostringstream error_msg;
86+
try {
87+
if (callback_) {
88+
return callback_(std::forward<Args>(args)...);
89+
}
90+
return success_value<RetType>();
91+
}
92+
catch (const std::exception& ex) {
93+
if (handle_) {
94+
error_msg << "Caught exception in " << callback_name_ << " callback: " << ex.what();
95+
}
96+
}
97+
catch (...) {
98+
if (handle_) {
99+
error_msg << "Caught unknown exception in " << callback_name_ << " callback";
100+
}
101+
}
102+
// Log error
103+
if (handle_) {
104+
if (handle_->get_configuration().get_log_callback()) {
105+
try {
106+
// Log it
107+
handle_->get_configuration().get_log_callback()(*handle_,
108+
static_cast<int>(LogLevel::LOG_ERR),
109+
library_name,
110+
error_msg.str());
111+
}
112+
catch (...) {} // sink everything
113+
}
114+
else {
115+
rd_kafka_log_print(handle_->get_handle(), static_cast<int>(LogLevel::LOG_ERR),
116+
library_name, error_msg.str().c_str());
117+
}
118+
}
119+
return error_value<RetType>();
120+
}
121+
private:
122+
const std::string& callback_name_;
123+
const Func& callback_;
124+
KafkaHandleBase* handle_;
125+
};
126+
127+
}
128+
129+
#endif

include/cppkafka/consumer.h

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "message.h"
3939
#include "macros.h"
4040
#include "error.h"
41+
#include "callback_invoker.h"
4142

4243
namespace cppkafka {
4344

include/cppkafka/cppkafka.h

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#define CPPKAFKA_H
3232

3333
#include <cppkafka/buffer.h>
34+
#include <cppkafka/callback_invoker.h>
3435
#include <cppkafka/clonable_ptr.h>
3536
#include <cppkafka/configuration.h>
3637
#include <cppkafka/configuration_base.h>

include/cppkafka/utils/backoff_committer.h

+6-6
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <chrono>
3434
#include <functional>
3535
#include <thread>
36+
#include <string>
3637
#include "../consumer.h"
3738
#include "backoff_performer.h"
3839

@@ -118,6 +119,7 @@ class BackoffCommitter : public BackoffPerformer {
118119
*/
119120
void commit(const TopicPartitionList& topic_partitions);
120121
private:
122+
// Return true to abort and false to continue committing
121123
template <typename T>
122124
bool do_commit(const T& object) {
123125
try {
@@ -126,18 +128,16 @@ class BackoffCommitter : public BackoffPerformer {
126128
return true;
127129
}
128130
catch (const HandleException& ex) {
131+
static const std::string callback_name("backoff committer");
129132
// If there were actually no offsets to commit, return. Retrying won't solve
130133
// anything here
131134
if (ex.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET) {
132135
return true;
133136
}
134-
// If there's a callback and it returns false for this message, abort
135-
if (callback_ && !callback_(ex.get_error())) {
136-
return true;
137-
}
137+
// If there's a callback and it returns false for this message, abort.
138+
// Otherwise keep committing.
139+
return !CallbackInvoker<bool(Error)>(callback_name, callback_, &consumer_)(ex.get_error());
138140
}
139-
// In any other case, we failed. Keep committing
140-
return false;
141141
}
142142

143143
Consumer& consumer_;

include/cppkafka/utils/buffered_producer.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,18 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
357357

358358
template <typename BufferType>
359359
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
360+
static const std::string dr_callback_name("delivery report");
361+
static const std::string pf_callback_name("produce failure");
362+
360363
// Call the user-supplied delivery report callback if any
361-
if (delivery_report_callback_) {
362-
delivery_report_callback_(producer_, message);
363-
}
364+
CallbackInvoker<void(Producer& producer, const Message&)>
365+
(dr_callback_name, delivery_report_callback_, &producer_)(producer_, message);
366+
364367
// We should produce this message again if it has an error and we either don't have a
365368
// produce failure callback or we have one but it returns true
366369
bool should_produce = message.get_error() &&
367-
(!produce_failure_callback_ || produce_failure_callback_(message));
370+
CallbackInvoker<bool(const Message&)>
371+
(pf_callback_name, produce_failure_callback_, &producer_)(message);
368372
if (should_produce) {
369373
produce_message(message);
370374
return;

src/configuration.cpp

+26-26
Original file line numberDiff line numberDiff line change
@@ -50,39 +50,39 @@ namespace cppkafka {
5050
// Callback proxies
5151

5252
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
53+
static const string callback_name("delivery report");
5354
Producer* handle = static_cast<Producer*>(opaque);
5455
Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
55-
const auto& callback = handle->get_configuration().get_delivery_report_callback();
56-
if (callback) {
57-
callback(*handle, message);
58-
}
56+
CallbackInvoker<void(Producer&, const Message&)>
57+
(callback_name, handle->get_configuration().get_delivery_report_callback(), handle)
58+
(*handle, message);
5959
}
6060

6161
void offset_commit_callback_proxy(rd_kafka_t*, rd_kafka_resp_err_t err,
6262
rd_kafka_topic_partition_list_t *offsets, void *opaque) {
63+
static const string callback_name("offset commit");
6364
Consumer* handle = static_cast<Consumer*>(opaque);
6465
TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{};
65-
const auto& callback = handle->get_configuration().get_offset_commit_callback();
66-
if (callback) {
67-
callback(*handle, err, list);
68-
}
66+
CallbackInvoker<void(Consumer&, Error, const TopicPartitionList&)>
67+
(callback_name, handle->get_configuration().get_offset_commit_callback(), handle)
68+
(*handle, err, list);
6969
}
7070

7171
void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque) {
72+
static const string callback_name("error");
7273
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
73-
const auto& callback = handle->get_configuration().get_error_callback();
74-
if (callback) {
75-
callback(*handle, err, reason);
76-
}
74+
CallbackInvoker<void(KafkaHandleBase&, int, const std::string&)>
75+
(callback_name, handle->get_configuration().get_error_callback(), handle)
76+
(*handle, err, reason);
7777
}
7878

7979
void throttle_callback_proxy(rd_kafka_t*, const char* broker_name,
8080
int32_t broker_id, int throttle_time_ms, void *opaque) {
81+
static const string callback_name("throttle");
8182
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
82-
const auto& callback = handle->get_configuration().get_throttle_callback();
83-
if (callback) {
84-
callback(*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
85-
}
83+
CallbackInvoker<void(KafkaHandleBase&, const std::string&, int32_t, std::chrono::milliseconds)>
84+
(callback_name, handle->get_configuration().get_throttle_callback(), handle)
85+
(*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
8686
}
8787

8888
void log_callback_proxy(const rd_kafka_t* h, int level,
@@ -95,23 +95,23 @@ void log_callback_proxy(const rd_kafka_t* h, int level,
9595
}
9696

9797
int stats_callback_proxy(rd_kafka_t*, char *json, size_t json_len, void *opaque) {
98+
static const string callback_name("statistics");
9899
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
99-
const auto& callback = handle->get_configuration().get_stats_callback();
100-
if (callback) {
101-
callback(*handle, string(json, json + json_len));
102-
}
100+
CallbackInvoker<void(KafkaHandleBase&, const std::string&)>
101+
(callback_name, handle->get_configuration().get_stats_callback(), handle)
102+
(*handle, string(json, json + json_len));
103103
return 0;
104104
}
105105

106106
int socket_callback_proxy(int domain, int type, int protocol, void* opaque) {
107+
static const string callback_name("socket");
107108
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
108-
const auto& callback = handle->get_configuration().get_socket_callback();
109-
if (callback) {
110-
return callback(domain, type, protocol);
111-
}
112-
else {
113-
return -1;
109+
if (handle->get_configuration().get_socket_callback()) {
110+
return CallbackInvoker<int(int, int, int)>
111+
(callback_name, handle->get_configuration().get_socket_callback(), handle)
112+
(domain, type, protocol);
114113
}
114+
return -1;
115115
}
116116

117117
// Configuration

src/consumer.cpp

+7-9
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "logging.h"
3535
#include "configuration.h"
3636
#include "topic_partition_list.h"
37+
#include "callback_invoker.h"
3738

3839
using std::vector;
3940
using std::string;
@@ -282,21 +283,18 @@ void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) {
282283
void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
283284
TopicPartitionList& topic_partitions) {
284285
if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
285-
if (assignment_callback_) {
286-
assignment_callback_(topic_partitions);
287-
}
286+
static const string callback_name("assignment");
287+
CallbackInvoker<void(TopicPartitionList&)>(callback_name, assignment_callback_, this)(topic_partitions);
288288
assign(topic_partitions);
289289
}
290290
else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
291-
if (revocation_callback_) {
292-
revocation_callback_(topic_partitions);
293-
}
291+
static const string callback_name("revocation");
292+
CallbackInvoker<void(const TopicPartitionList&)>(callback_name, revocation_callback_, this)(topic_partitions);
294293
unassign();
295294
}
296295
else {
297-
if (rebalance_error_callback_) {
298-
rebalance_error_callback_(error);
299-
}
296+
static const string callback_name("rebalance error");
297+
CallbackInvoker<void(Error)>(callback_name, rebalance_error_callback_, this)(error);
300298
unassign();
301299
}
302300
}

src/topic_configuration.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "exceptions.h"
3434
#include "topic.h"
3535
#include "buffer.h"
36+
#include "callback_invoker.h"
3637

3738
using std::string;
3839
using std::map;
@@ -47,9 +48,12 @@ int32_t partitioner_callback_proxy(const rd_kafka_topic_t* handle, const void *k
4748
const TopicConfiguration* config = static_cast<TopicConfiguration*>(topic_opaque);
4849
const auto& callback = config->get_partitioner_callback();
4950
if (callback) {
51+
static const string callback_name("topic partitioner");
5052
Topic topic = Topic::make_non_owning(const_cast<rd_kafka_topic_t*>(handle));
5153
Buffer key(static_cast<const char*>(key_ptr), key_size);
52-
return callback(topic, key, partition_count);
54+
return CallbackInvoker<int32_t(const Topic&, const Buffer&, int32_t)>
55+
(callback_name, callback, nullptr)
56+
(topic, key, partition_count);
5357
}
5458
else {
5559
return rd_kafka_msg_partitioner_consistent_random(handle, key_ptr, key_size,

0 commit comments

Comments
 (0)