Skip to content

Commit b8fdb4c

Browse files
author
accelerated
committed
Changes per code review
1 parent 951b0d5 commit b8fdb4c

8 files changed

+29
-46
lines changed

Diff for: include/cppkafka/utils/poll_interface.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ struct PollInterface {
7272
*
7373
* Each call to poll() will first consume from the global event queue and if there are
7474
* no pending events, will attempt to consume from all partitions until a valid message is found.
75-
* The timeout used on this call will be the one configured via RoundRobinPollStrategy::set_timeout.
75+
* The timeout used on this call will be the one configured via PollInterface::set_timeout.
7676
*
7777
* \return A message. The returned message *might* be empty. It's necessary to check
7878
* that it's a valid one before using it (see example above).
@@ -86,7 +86,7 @@ struct PollInterface {
8686
/**
8787
* \brief Polls for new messages
8888
*
89-
* Same as the other overload of RoundRobinPollStrategy::poll but the provided
89+
* Same as the other overload of PollInterface::poll but the provided
9090
* timeout will be used instead of the one configured on this Consumer.
9191
*
9292
* \param timeout The timeout to be used on this call
@@ -113,7 +113,7 @@ struct PollInterface {
113113
/**
114114
* \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
115115
*
116-
* Same as the other overload of RoundRobinPollStrategy::poll_batch but the provided
116+
* Same as the other overload of PollInterface::poll_batch but the provided
117117
* timeout will be used instead of the one configured on this Consumer.
118118
*
119119
* \param max_batch_size The maximum amount of messages expected

Diff for: include/cppkafka/utils/poll_strategy_base.h

+3-16
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,16 @@ namespace cppkafka {
4343
* related (user-specific) information.
4444
*/
4545
struct QueueData {
46-
Queue queue_;
47-
boost::any metadata_;
46+
Queue queue;
47+
boost::any metadata;
4848
};
4949

5050
/**
5151
* \class PollStrategyBase
5252
*
5353
* \brief Base implementation of the PollInterface
5454
*/
55-
class PollStrategyBase : public PollInterface
56-
{
55+
class PollStrategyBase : public PollInterface {
5756
public:
5857
using QueueMap = std::map<TopicPartition, QueueData>;
5958

@@ -99,18 +98,6 @@ class PollStrategyBase : public PollInterface
9998
*/
10099
QueueData& get_consumer_queue();
101100

102-
/**
103-
* \brief Return the next queue to be processed
104-
*
105-
* Depending on the polling strategy, each implementation must define it's own algorithm for
106-
* determining the next queue to poll.
107-
*
108-
* \param opaque Application specific data which can help determine the next queue.
109-
*
110-
* \return A partition queue
111-
*/
112-
virtual QueueData& get_next_queue(void* opaque = nullptr) = 0;
113-
114101
/**
115102
* \brief Reset the internal state of the queues.
116103
*

Diff for: include/cppkafka/utils/roundrobin_poll_strategy.h

+3-7
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,7 @@ namespace cppkafka {
8383
* the Consumer instance it owns.
8484
*/
8585

86-
class RoundRobinPollStrategy : public PollStrategyBase
87-
{
86+
class RoundRobinPollStrategy : public PollStrategyBase {
8887
public:
8988
RoundRobinPollStrategy(Consumer& consumer);
9089

@@ -112,16 +111,13 @@ class RoundRobinPollStrategy : public PollStrategyBase
112111
std::chrono::milliseconds timeout) override;
113112

114113
protected:
115-
/**
116-
* \sa PollStrategyBase::get_next_queue
117-
*/
118-
QueueData& get_next_queue(void* opaque = nullptr) final;
119-
120114
/**
121115
* \sa PollStrategyBase::reset_state
122116
*/
123117
void reset_state() final;
124118

119+
QueueData& get_next_queue();
120+
125121
private:
126122
void consume_batch(Queue& queue,
127123
MessageList& messages,

Diff for: src/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ set(SOURCES
1919

2020
utils/backoff_performer.cpp
2121
utils/backoff_committer.cpp
22-
utils/poll_strategy_base.cpp
22+
utils/poll_strategy_base.cpp
2323
utils/roundrobin_poll_strategy.cpp
2424
)
2525

Diff for: src/utils/roundrobin_poll_strategy.cpp

+9-10
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,20 @@ Message RoundRobinPollStrategy::poll() {
5151

5252
Message RoundRobinPollStrategy::poll(milliseconds timeout) {
5353
// Always give priority to group and global events
54-
Message message = get_consumer_queue().queue_.consume(milliseconds(0));
54+
Message message = get_consumer_queue().queue.consume(milliseconds(0));
5555
if (message) {
5656
return message;
5757
}
5858
size_t num_queues = get_partition_queues().size();
5959
while (num_queues--) {
6060
//consume the next partition (non-blocking)
61-
message = get_next_queue().queue_.consume(milliseconds(0));
61+
message = get_next_queue().queue.consume(milliseconds(0));
6262
if (message) {
6363
return message;
6464
}
6565
}
6666
// We still don't have a valid message so we block on the event queue
67-
return get_consumer_queue().queue_.consume(timeout);
67+
return get_consumer_queue().queue.consume(timeout);
6868
}
6969

7070
MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size) {
@@ -76,25 +76,24 @@ MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, millisecon
7676
ssize_t count = max_batch_size;
7777

7878
// batch from the group event queue first (non-blocking)
79-
consume_batch(get_consumer_queue().queue_, messages, count, milliseconds(0));
79+
consume_batch(get_consumer_queue().queue, messages, count, milliseconds(0));
8080
size_t num_queues = get_partition_queues().size();
8181
while ((count > 0) && (num_queues--)) {
8282
// batch from the next partition (non-blocking)
83-
consume_batch(get_next_queue().queue_, messages, count, milliseconds(0));
83+
consume_batch(get_next_queue().queue, messages, count, milliseconds(0));
8484
}
8585
// we still have space left in the buffer
8686
if (count > 0) {
8787
// wait on the event queue until timeout
88-
consume_batch(get_consumer_queue().queue_, messages, count, timeout);
88+
consume_batch(get_consumer_queue().queue, messages, count, timeout);
8989
}
9090
return messages;
9191
}
9292

9393
void RoundRobinPollStrategy::consume_batch(Queue& queue,
9494
MessageList& messages,
9595
ssize_t& count,
96-
milliseconds timeout)
97-
{
96+
milliseconds timeout) {
9897
MessageList queue_messages = queue.consume_batch(count, timeout);
9998
if (queue_messages.empty()) {
10099
return;
@@ -111,11 +110,11 @@ void RoundRobinPollStrategy::consume_batch(Queue& queue,
111110
void RoundRobinPollStrategy::restore_forwarding() {
112111
// forward all partition queues
113112
for (const auto& toppar : get_partition_queues()) {
114-
toppar.second.queue_.forward_to_queue(get_consumer_queue().queue_);
113+
toppar.second.queue.forward_to_queue(get_consumer_queue().queue);
115114
}
116115
}
117116

118-
QueueData& RoundRobinPollStrategy::get_next_queue(void* opaque) {
117+
QueueData& RoundRobinPollStrategy::get_next_queue() {
119118
if (get_partition_queues().empty()) {
120119
throw QueueException(RD_KAFKA_RESP_ERR__STATE);
121120
}

Diff for: tests/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ add_executable(
1818
kafka_handle_base_test.cpp
1919
producer_test.cpp
2020
consumer_test.cpp
21-
roundrobin_poll_test.cpp
21+
roundrobin_poll_test.cpp
2222

2323
# Main file
2424
test_main.cpp

Diff for: tests/roundrobin_poll_test.cpp

+8-6
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,7 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
108108

109109
// Create a consumer and subscribe to the topic
110110
PollStrategyAdapter consumer(make_consumer_config());
111-
TopicPartitionList partitions;
112-
for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace_back(KAFKA_TOPICS[0], i++));
113-
consumer.assign(partitions);
111+
consumer.subscribe({ KAFKA_TOPICS[0] });
114112
consumer.add_polling_strategy(unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer)));
115113

116114
PollConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
@@ -130,10 +128,14 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
130128
REQUIRE(runner.get_messages().size() == total_messages);
131129

132130
// Check that we have one message from each partition in desired order
133-
vector<int> partition_order = make_roundrobin_partition_vector(total_messages);
134-
131+
vector<int> partition_order = make_roundrobin_partition_vector(total_messages+KAFKA_NUM_PARTITIONS);
132+
int partition_idx;
135133
for (int i = 0; i < total_messages; ++i) {
136-
REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+1]);
134+
if (i == 0) {
135+
// find first polled partition index
136+
partition_idx = runner.get_messages()[i].get_partition();
137+
}
138+
REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+partition_idx]);
137139
REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
138140
}
139141

Diff for: tests/test_utils.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ class BasicConsumerRunner {
4141
* \brief Specific implementation which can be used with other
4242
* util classes such as BasicConsumerDispatcher.
4343
*/
44-
class PollStrategyAdapter : public Consumer
45-
{
44+
class PollStrategyAdapter : public Consumer {
4645
public:
4746
PollStrategyAdapter(Configuration config);
4847
void add_polling_strategy(std::unique_ptr<PollInterface> poll_strategy);

0 commit comments

Comments
 (0)