Skip to content

Commit 9af4330

Browse files
acceleratedmfontanini
authored andcommitted
Allocators (#118)
* Added allocator support for consumers and buffered producer * Changed MessageList back to std::vector<Message> for consistency with the allocator API
1 parent d77e746 commit 9af4330

10 files changed

+247
-165
lines changed

Diff for: include/cppkafka/consumer.h

+37-2
Original file line numberDiff line numberDiff line change
@@ -379,10 +379,14 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
379379
* This can return one or more messages
380380
*
381381
* \param max_batch_size The maximum amount of messages expected
382+
* \param alloc The optionally supplied allocator for allocating messages
382383
*
383384
* \return A list of messages
384385
*/
385-
MessageList poll_batch(size_t max_batch_size);
386+
template <typename Allocator>
387+
std::vector<Message, Allocator> poll_batch(size_t max_batch_size,
388+
const Allocator& alloc);
389+
std::vector<Message> poll_batch(size_t max_batch_size);
386390

387391
/**
388392
* \brief Polls for a batch of messages
@@ -391,10 +395,16 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
391395
*
392396
* \param max_batch_size The maximum amount of messages expected
393397
* \param timeout The timeout for this operation
398+
* \param alloc The optionally supplied allocator for allocating messages
394399
*
395400
* \return A list of messages
396401
*/
397-
MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
402+
template <typename Allocator>
403+
std::vector<Message, Allocator> poll_batch(size_t max_batch_size,
404+
std::chrono::milliseconds timeout,
405+
const Allocator& alloc);
406+
std::vector<Message> poll_batch(size_t max_batch_size,
407+
std::chrono::milliseconds timeout);
398408

399409
/**
400410
* \brief Get the global event queue servicing this consumer corresponding to
@@ -430,6 +440,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
430440
private:
431441
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
432442
rd_kafka_topic_partition_list_t *partitions, void *opaque);
443+
static Queue get_queue(rd_kafka_queue_t* handle);
433444
void close();
434445
void commit(const Message& msg, bool async);
435446
void commit(const TopicPartitionList* topic_partitions, bool async);
@@ -440,6 +451,30 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
440451
RebalanceErrorCallback rebalance_error_callback_;
441452
};
442453

454+
// Implementations
455+
template <typename Allocator>
456+
std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
457+
const Allocator& alloc) {
458+
return poll_batch(max_batch_size, get_timeout(), alloc);
459+
}
460+
461+
template <typename Allocator>
462+
std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
463+
std::chrono::milliseconds timeout,
464+
const Allocator& alloc) {
465+
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
466+
// Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
467+
Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle())));
468+
ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle() , timeout.count(), raw_messages.data(),
469+
raw_messages.size());
470+
if (result == -1) {
471+
check_error(rd_kafka_last_error());
472+
// on the off-chance that check_error() does not throw an error
473+
return std::vector<Message, Allocator>(alloc);
474+
}
475+
return std::vector<Message, Allocator>(raw_messages.begin(), raw_messages.begin() + result, alloc);
476+
}
477+
443478
} // cppkafka
444479

445480
#endif // CPP_KAFKA_CONSUMER_H

Diff for: include/cppkafka/queue.h

+40-2
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,14 @@ class CPPKAFKA_API Queue {
138138
*
139139
* \param max_batch_size The max number of messages to consume if available
140140
*
141+
* \param alloc The optionally supplied allocator for the message list
142+
*
141143
* \return A list of messages. Could be empty if there's nothing to consume
142144
*/
143-
MessageList consume_batch(size_t max_batch_size) const;
145+
template <typename Allocator>
146+
std::vector<Message, Allocator> consume_batch(size_t max_batch_size,
147+
const Allocator& alloc) const;
148+
std::vector<Message> consume_batch(size_t max_batch_size) const;
144149

145150
/**
146151
* \brief Consumes a batch of messages from this queue
@@ -151,9 +156,16 @@ class CPPKAFKA_API Queue {
151156
*
152157
* \param timeout The timeout to be used on this call
153158
*
159+
* \param alloc The optionally supplied allocator for the message list
160+
*
154161
* \return A list of messages. Could be empty if there's nothing to consume
155162
*/
156-
MessageList consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const;
163+
template <typename Allocator>
164+
std::vector<Message, Allocator> consume_batch(size_t max_batch_size,
165+
std::chrono::milliseconds timeout,
166+
const Allocator& alloc) const;
167+
std::vector<Message> consume_batch(size_t max_batch_size,
168+
std::chrono::milliseconds timeout) const;
157169

158170
/**
159171
* Indicates whether this queue is valid (not null)
@@ -178,6 +190,32 @@ class CPPKAFKA_API Queue {
178190

179191
using QueueList = std::vector<Queue>;
180192

193+
template <typename Allocator>
194+
std::vector<Message, Allocator> Queue::consume_batch(size_t max_batch_size,
195+
const Allocator& alloc) const {
196+
return consume_batch(max_batch_size, timeout_ms_, alloc);
197+
}
198+
199+
template <typename Allocator>
200+
std::vector<Message, Allocator> Queue::consume_batch(size_t max_batch_size,
201+
std::chrono::milliseconds timeout,
202+
const Allocator& alloc) const {
203+
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
204+
ssize_t result = rd_kafka_consume_batch_queue(handle_.get(),
205+
static_cast<int>(timeout.count()),
206+
raw_messages.data(),
207+
raw_messages.size());
208+
if (result == -1) {
209+
rd_kafka_resp_err_t error = rd_kafka_last_error();
210+
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
211+
throw QueueException(error);
212+
}
213+
return std::vector<Message, Allocator>(alloc);
214+
}
215+
// Build message list
216+
return std::vector<Message, Allocator>(raw_messages.begin(), raw_messages.begin() + result, alloc);
217+
}
218+
181219
} // cppkafka
182220

183221
#endif //CPPKAFKA_QUEUE_H

0 commit comments

Comments
 (0)