Skip to content

Commit 0b686a3

Browse files
author
accelerated
committed
intial polling version
1 parent ae74814 commit 0b686a3

13 files changed

+735
-51
lines changed

Diff for: include/cppkafka/consumer.h

+49-17
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
#include <chrono>
3636
#include <functional>
3737
#include "kafka_handle_base.h"
38-
#include "message.h"
38+
#include "queue.h"
3939
#include "macros.h"
4040
#include "error.h"
4141

@@ -54,7 +54,7 @@ class TopicConfiguration;
5454
* Semi-simple code showing how to use this class
5555
*
5656
* \code
57-
* // Create a configuration and set the group.id and broker list fields
57+
* // Create a configuration and set the group.id and broker list fields
5858
* Configuration config = {
5959
* { "metadata.broker.list", "127.0.0.1:9092" },
6060
* { "group.id", "foo" }
@@ -74,13 +74,13 @@ class TopicConfiguration;
7474
* consumer.set_revocation_callback([&](const TopicPartitionList& topic_partitions) {
7575
* cout << topic_partitions.size() << " partitions revoked!" << endl;
7676
* });
77-
*
78-
* // Subscribe
77+
*
78+
* // Subscribe
7979
* consumer.subscribe({ "my_topic" });
8080
* while (true) {
8181
* // Poll. This will optionally return a message. It's necessary to check if it's a valid
8282
* // one before using it
83-
* Message msg = consumer.poll();
83+
* Message msg = consumer.poll();
8484
* if (msg) {
8585
* if (!msg.get_error()) {
8686
* // It's an actual message. Get the payload and print it to stdout
@@ -103,12 +103,12 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
103103

104104
/**
105105
* \brief Creates an instance of a consumer.
106-
*
107-
* Note that the configuration *must contain* the group.id attribute set or this
106+
*
107+
* Note that the configuration *must contain* the group.id attribute set or this
108108
* will throw.
109109
*
110110
* \param config The configuration to be used
111-
*/
111+
*/
112112
Consumer(Configuration config);
113113
Consumer(const Consumer&) = delete;
114114
Consumer(Consumer&&) = delete;
@@ -124,7 +124,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
124124

125125
/**
126126
* \brief Sets the topic/partition assignment callback
127-
*
127+
*
128128
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
129129
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
130130
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -138,7 +138,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
138138

139139
/**
140140
* \brief Sets the topic/partition revocation callback
141-
*
141+
*
142142
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
143143
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
144144
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -153,7 +153,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
153153

154154
/**
155155
* \brief Sets the rebalance error callback
156-
*
156+
*
157157
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
158158
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
159159
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -188,9 +188,9 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
188188
/**
189189
* \brief Unassigns the current topic/partition assignment
190190
*
191-
* This translates into a call to rd_kafka_assign using a null as the topic partition list
191+
* This translates into a call to rd_kafka_assign using a null as the topic partition list
192192
* parameter
193-
*/
193+
*/
194194
void unassign();
195195

196196
/**
@@ -252,7 +252,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
252252
*
253253
* This translates into a call to rd_kafka_get_watermark_offsets
254254
*
255-
* \param topic_partition The topic/partition to get the offsets from
255+
* \param topic_partition The topic/partition to get the offsets from
256256
*/
257257
OffsetTuple get_offsets(const TopicPartition& topic_partition) const;
258258

@@ -316,16 +316,16 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
316316
* This will call rd_kafka_consumer_poll.
317317
*
318318
* Note that you need to call poll periodically as a keep alive mechanism, otherwise the broker
319-
* will think this consumer is down and will trigger a rebalance (if using dynamic
319+
* will think this consumer is down and will trigger a rebalance (if using dynamic
320320
* subscription).
321321
*
322322
* The timeout used on this call will be the one configured via Consumer::set_timeout.
323323
*
324324
* The returned message *might* be empty. If's necessary to check that it's a valid one before
325325
* using it:
326-
*
326+
*
327327
* \code
328-
* Message msg = consumer.poll();
328+
* Message msg = consumer.poll();
329329
* if (msg) {
330330
* // It's a valid message!
331331
* }
@@ -361,6 +361,38 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
361361
* \param timeout The timeout for this operation
362362
*/
363363
std::vector<Message> poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
364+
365+
/**
366+
* \brief Get the global event queue servicing this consumer corresponding to
367+
* rd_kafka_queue_get_main and which is polled via rd_kafka_poll
368+
*
369+
* \return A Queue object
370+
*
371+
* \remark Note that this call will disable forwarding to the consumer_queue.
372+
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
373+
*/
374+
Queue get_main_queue() const;
375+
376+
/**
377+
* \brief Get the consumer group queue servicing corresponding to
378+
* rd_kafka_queue_get_consumer and which is polled via rd_kafka_consumer_poll
379+
*
380+
* \return A Queue object
381+
*/
382+
Queue get_consumer_queue() const;
383+
384+
/**
385+
* \brief Get the queue belonging to this partition. If the consumer is not assigned to this
386+
* partition, an empty queue will be returned
387+
*
388+
* \param partition The partition object
389+
*
390+
* \return A Queue object
391+
*
392+
* \remark Note that this call will disable forwarding to the consumer_queue.
393+
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
394+
*/
395+
Queue get_partition_queue(const TopicPartition& partition) const;
364396
private:
365397
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
366398
rd_kafka_topic_partition_list_t *partitions, void *opaque);

Diff for: include/cppkafka/cppkafka.h

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include <cppkafka/message_builder.h>
4747
#include <cppkafka/metadata.h>
4848
#include <cppkafka/producer.h>
49+
#include <cppkafka/queue.h>
4950
#include <cppkafka/topic.h>
5051
#include <cppkafka/topic_configuration.h>
5152
#include <cppkafka/topic_partition.h>
@@ -55,5 +56,6 @@
5556
#include <cppkafka/utils/buffered_producer.h>
5657
#include <cppkafka/utils/compacted_topic_processor.h>
5758
#include <cppkafka/utils/consumer_dispatcher.h>
59+
#include <cppkafka/utils/roundrobin_poll_adapter.h>
5860

5961
#endif

Diff for: include/cppkafka/exceptions.h

+12
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,18 @@ class CPPKAFKA_API ConsumerException : public Exception {
122122
Error error_;
123123
};
124124

125+
/**
126+
* Queue exception for rd_kafka_queue_t errors
127+
*/
128+
class CPPKAFKA_API QueueException : public Exception {
129+
public:
130+
QueueException(Error error);
131+
132+
Error get_error() const;
133+
private:
134+
Error error_;
135+
};
136+
125137
} // cppkafka
126138

127139
#endif // CPPKAFKA_EXCEPTIONS_H

Diff for: include/cppkafka/message.h

+2
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ class CPPKAFKA_API Message {
177177
Buffer key_;
178178
};
179179

180+
using MessageList = std::vector<Message>;
181+
180182
/**
181183
* Represents a message's timestamp
182184
*/

Diff for: include/cppkafka/queue.h

+183
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#include <vector>
31+
#include <memory>
32+
#include <boost/optional.hpp>
33+
#include <librdkafka/rdkafka.h>
34+
#include "macros.h"
35+
#include "message.h"
36+
37+
#ifndef CPPKAFKA_QUEUE_H
38+
#define CPPKAFKA_QUEUE_H
39+
40+
namespace cppkafka {
41+
/**
42+
* \brief Represents a rdkafka queue
43+
*
44+
* This is a simple wrapper over a rd_kafka_queue_t*
45+
*/
46+
class CPPKAFKA_API Queue {
47+
public:
48+
/**
49+
* \brief Creates a Queue object that doesn't take ownership of the handle
50+
*
51+
* \param handle The handle to be used
52+
*/
53+
static Queue make_non_owning(rd_kafka_queue_t* handle);
54+
55+
/**
56+
* \brief Constructs an empty queue
57+
*
58+
* Note that using any methods except Queue::get_handle on an empty queue is undefined
59+
* behavior
60+
*/
61+
Queue();
62+
63+
/**
64+
* \brief Constructs a queue using a handle
65+
*
66+
* This will take ownership of the handle
67+
*
68+
* \param handle The handle to be used
69+
*/
70+
Queue(rd_kafka_queue_t* handle);
71+
72+
/**
73+
* Returns the rdkakfa handle
74+
*/
75+
rd_kafka_queue_t* get_handle() const;
76+
77+
/**
78+
* \brief Returns the length of the queue
79+
*
80+
* This translates to a call to rd_kafka_queue_length
81+
*/
82+
size_t get_length() const;
83+
84+
/**
85+
* \brief Forward to another queue
86+
*
87+
* This translates to a call to rd_kafka_queue_forward
88+
*/
89+
void forward_to_queue(const Queue& forward_queue) const;
90+
91+
/**
92+
* \brief Disable forwarding to another queue
93+
*
94+
* This translates to a call to rd_kafka_queue_forward(NULL)
95+
*/
96+
void disable_queue_forwarding() const;
97+
98+
/**
99+
* \brief Sets the timeout for consume operations
100+
*
101+
* This timeout is applied when calling consume()
102+
*
103+
* \param timeout The timeout to be set
104+
*/
105+
void set_consume_timeout(std::chrono::milliseconds timeout);
106+
107+
/**
108+
* Gets the configured timeout.
109+
*
110+
* \sa Queue::set_timeout
111+
*/
112+
std::chrono::milliseconds get_consume_timeout() const;
113+
114+
/**
115+
* \brief Consume a message from this queue
116+
*
117+
* This translates to a call to rd_kafka_consume_queue using the configured timeout for this object
118+
*
119+
* \return A message
120+
*/
121+
Message consume() const;
122+
123+
/**
124+
* \brief Consume a message from this queue
125+
*
126+
* Same as consume() but the specified timeout will be used instead of the configured one
127+
*
128+
* \param timeout The timeout to be used on this call
129+
*
130+
* \return A message
131+
*/
132+
Message consume(std::chrono::milliseconds timeout) const;
133+
134+
/**
135+
* \brief Consumes a batch of messages from this queue
136+
*
137+
* This translates to a call to rd_kafka_consume_batch_queue using the configured timeout for this object
138+
*
139+
* \param max_batch_size The max number of messages to consume if available
140+
*
141+
* \return A list of messages. Could be empty if there's nothing to consume
142+
*/
143+
MessageList consume_batch(size_t max_batch_size) const;
144+
145+
/**
146+
* \brief Consumes a batch of messages from this queue
147+
*
148+
* Same as Queue::consume_batch(size_t) but the specified timeout will be used instead of the configured one
149+
*
150+
* \param max_batch_size The max number of messages to consume if available
151+
*
152+
* \param timeout The timeout to be used on this call
153+
*
154+
* \return A list of messages. Could be empty if there's nothing to consume
155+
*/
156+
MessageList consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const;
157+
158+
/**
159+
* Indicates whether this queue is valid (not null)
160+
*/
161+
explicit operator bool() const {
162+
return handle_ != nullptr;
163+
}
164+
165+
private:
166+
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
167+
168+
using HandlePtr = std::unique_ptr<rd_kafka_queue_t, decltype(&rd_kafka_queue_destroy)>;
169+
170+
struct NonOwningTag { };
171+
172+
Queue(rd_kafka_queue_t* handle, NonOwningTag);
173+
174+
// Members
175+
HandlePtr handle_;
176+
std::chrono::milliseconds timeout_ms_;
177+
};
178+
179+
using QueueList = std::vector<Queue>;
180+
181+
} // cppkafka
182+
183+
#endif //CPPKAFKA_QUEUE_H

0 commit comments

Comments
 (0)