Skip to content

Commit 577bbb0

Browse files
acceleratedmfontanini
authored andcommitted
added error check for partition list (#90)
1 parent 6158d93 commit 577bbb0

File tree

3 files changed

+41
-14
lines changed

3 files changed

+41
-14
lines changed

Diff for: include/cppkafka/kafka_handle_base.h

+2
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ class CPPKAFKA_API KafkaHandleBase {
253253

254254
void set_handle(rd_kafka_t* handle);
255255
void check_error(rd_kafka_resp_err_t error) const;
256+
void check_error(rd_kafka_resp_err_t error,
257+
const rd_kafka_topic_partition_list_t* list_ptr) const;
256258
rd_kafka_conf_t* get_configuration_handle();
257259
private:
258260
static const std::chrono::milliseconds DEFAULT_TIMEOUT;

Diff for: src/consumer.cpp

+21-11
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,16 @@ void Consumer::unsubscribe() {
121121
}
122122

123123
void Consumer::assign(const TopicPartitionList& topic_partitions) {
124-
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
125-
// If the list is empty, then we need to use a null pointer
126-
auto handle = topic_partitions.empty() ? nullptr : topic_list_handle.get();
127-
rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), handle);
128-
check_error(error);
124+
rd_kafka_resp_err_t error;
125+
if (topic_partitions.empty()) {
126+
error = rd_kafka_assign(get_handle(), nullptr);
127+
check_error(error);
128+
}
129+
else {
130+
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
131+
error = rd_kafka_assign(get_handle(), topic_list_handle.get());
132+
check_error(error, topic_list_handle.get());
133+
}
129134
}
130135

131136
void Consumer::unassign() {
@@ -181,15 +186,15 @@ Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) cons
181186
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
182187
rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
183188
static_cast<int>(get_timeout().count()));
184-
check_error(error);
189+
check_error(error, topic_list_handle.get());
185190
return convert(topic_list_handle);
186191
}
187192

188193
TopicPartitionList
189194
Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const {
190195
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
191196
rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get());
192-
check_error(error);
197+
check_error(error, topic_list_handle.get());
193198
return convert(topic_list_handle);
194199
}
195200

@@ -287,10 +292,15 @@ void Consumer::commit(const Message& msg, bool async) {
287292

288293
void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) {
289294
rd_kafka_resp_err_t error;
290-
error = rd_kafka_commit(get_handle(),
291-
!topic_partitions ? nullptr : convert(*topic_partitions).get(),
292-
async ? 1 : 0);
293-
check_error(error);
295+
if (topic_partitions == nullptr) {
296+
error = rd_kafka_commit(get_handle(), nullptr, async ? 1 : 0);
297+
check_error(error);
298+
}
299+
else {
300+
TopicPartitionsListPtr topic_list_handle = convert(*topic_partitions);
301+
error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0);
302+
check_error(error, topic_list_handle.get());
303+
}
294304
}
295305

296306
void Consumer::handle_rebalance(rd_kafka_resp_err_t error,

Diff for: src/kafka_handle_base.cpp

+18-3
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partition
6161
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
6262
rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(),
6363
topic_list_handle.get());
64-
check_error(error);
64+
check_error(error, topic_list_handle.get());
6565
}
6666

6767
void KafkaHandleBase::pause(const std::string& topic) {
@@ -72,7 +72,7 @@ void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitio
7272
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
7373
rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(),
7474
topic_list_handle.get());
75-
check_error(error);
75+
check_error(error, topic_list_handle.get());
7676
}
7777

7878
void KafkaHandleBase::resume(const std::string& topic) {
@@ -153,7 +153,7 @@ KafkaHandleBase::get_offsets_for_times(const TopicPartitionsTimestampsMap& queri
153153
const int timeout = static_cast<int>(timeout_ms_.count());
154154
rd_kafka_resp_err_t result = rd_kafka_offsets_for_times(handle_.get(), topic_list_handle.get(),
155155
timeout);
156-
check_error(result);
156+
check_error(result, topic_list_handle.get());
157157
return convert(topic_list_handle);
158158
}
159159

@@ -228,6 +228,21 @@ void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) const {
228228
}
229229
}
230230

231+
void KafkaHandleBase::check_error(rd_kafka_resp_err_t error,
232+
const rd_kafka_topic_partition_list_t* list_ptr) const {
233+
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
234+
throw HandleException(error);
235+
}
236+
if (list_ptr) {
237+
//check if any partition has errors
238+
for (int i = 0; i < list_ptr->cnt; ++i) {
239+
if (list_ptr->elems[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) {
240+
throw HandleException(error);
241+
}
242+
}
243+
}
244+
}
245+
231246
rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() {
232247
return config_.get_handle();
233248
}

0 commit comments

Comments
 (0)