forked from mfontanini/cppkafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_handle_base.h
277 lines (245 loc) · 8.35 KB
/
kafka_handle_base.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_KAFKA_HANDLE_BASE_H
#define CPPKAFKA_KAFKA_HANDLE_BASE_H
#include <string>
#include <memory>
#include <chrono>
#include <unordered_map>
#include <map>
#include <mutex>
#include <tuple>
#include <chrono>
#include <librdkafka/rdkafka.h>
#include "group_information.h"
#include "topic_partition.h"
#include "topic_partition_list.h"
#include "topic_configuration.h"
#include "configuration.h"
#include "macros.h"
namespace cppkafka {
class Topic;
class Metadata;
class TopicMetadata;
class GroupInformation;
/**
* Base class for kafka consumer/producer
*/
class CPPKAFKA_API KafkaHandleBase {
public:
// {low watermark, high watermark} offsets, as returned by query_offsets()
using OffsetTuple = std::tuple<int64_t, int64_t>;
// Maps each topic/partition to the timestamp used by get_offsets_for_times()
using TopicPartitionsTimestampsMap = std::map<TopicPartition, std::chrono::milliseconds>;
virtual ~KafkaHandleBase() = default;
// Non-copyable and non-movable: this object owns the underlying rd_kafka_t
// handle and a mutex, neither of which can be safely duplicated or moved.
KafkaHandleBase(const KafkaHandleBase&) = delete;
KafkaHandleBase(KafkaHandleBase&&) = delete;
KafkaHandleBase& operator=(const KafkaHandleBase&) = delete;
KafkaHandleBase& operator=(KafkaHandleBase&&) = delete;
/**
* \brief Pauses consumption/production from the given topic/partition list
*
* This translates into a call to rd_kafka_pause_partitions
*
* \param topic_partitions The topic/partition list to pause consuming/producing from/to
*/
void pause_partitions(const TopicPartitionList& topic_partitions);
/**
* \brief Pauses consumption/production for this topic
*
* \param topic The topic name
*/
void pause(const std::string& topic);
/**
* \brief Resumes consumption/production from the given topic/partition list
*
* This translates into a call to rd_kafka_resume_partitions
*
* \param topic_partitions The topic/partition list to resume consuming/producing from/to
*/
void resume_partitions(const TopicPartitionList& topic_partitions);
/**
* \brief Resumes consumption/production for this topic
*
* \param topic The topic name
*/
void resume(const std::string& topic);
/**
* \brief Sets the timeout for operations that require a timeout
*
* This timeout is applied to operations like polling, querying for offsets, etc
*
* \param timeout The timeout to be set
*/
void set_timeout(std::chrono::milliseconds timeout);
/**
* \brief Adds one or more brokers to this handle's broker list
*
* This calls rd_kafka_brokers_add using the provided broker list.
*
* \param brokers The broker list endpoint string
*/
void add_brokers(const std::string& brokers);
/**
* \brief Queries the offset for the given topic/partition
*
* This translates into a call to rd_kafka_query_watermark_offsets
*
* \param topic_partition The topic/partition to be queried
*
* \return A pair of watermark offsets {low, high}
*/
OffsetTuple query_offsets(const TopicPartition& topic_partition) const;
/**
* \brief Gets the rdkafka handle
*
* \return The rdkafka handle
*/
rd_kafka_t* get_handle() const;
/**
* \brief Creates a topic handle
*
* This translates into a call to rd_kafka_topic_new. This will use the default topic
* configuration provided in the Configuration object for this consumer/producer handle,
* if any.
*
* \param name The name of the topic to be created
*
* \return A topic
*/
Topic get_topic(const std::string& name);
/**
* \brief Creates a topic handle
*
* This translates into a call to rd_kafka_topic_new.
*
* \param name The name of the topic to be created
* \param config The configuration to be used for the new topic
*
* \return A topic
*/
Topic get_topic(const std::string& name, TopicConfiguration config);
/**
* \brief Gets metadata for brokers, topics, partitions, etc
*
* This translates into a call to rd_kafka_metadata
*
* \param all_topics Whether to fetch metadata about all topics or only locally known ones
*
* \return The metadata
*/
Metadata get_metadata(bool all_topics = true) const;
/**
* \brief Gets general metadata but only fetches metadata for the given topic rather than
* all of them
*
* This translates into a call to rd_kafka_metadata
*
* \param topic The topic to fetch information for
*
* \return The topic metadata
*/
TopicMetadata get_metadata(const Topic& topic) const;
/**
* \brief Gets the consumer group information
*
* \param name The name of the consumer group to look up
*
* \return The group information
*/
GroupInformation get_consumer_group(const std::string& name);
/**
* \brief Gets all consumer groups
*
* \return A list of consumer groups
*/
GroupInformationList get_consumer_groups();
/**
* \brief Gets topic/partition offsets based on timestamps
*
* This translates into a call to rd_kafka_offsets_for_times
*
* \param queries A map from topic/partition to the timestamp to be used
*
* \return A topic partition list
*/
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;
/**
* \brief Get the kafka handle name
*
* \return The handle name
*/
std::string get_name() const;
/**
* \brief Gets the configured timeout.
*
* \return The configured timeout
*
* \sa KafkaHandleBase::set_timeout
*/
std::chrono::milliseconds get_timeout() const;
/**
* \brief Gets the handle's configuration
*
* \return A reference to the configuration object
*/
const Configuration& get_configuration() const;
/**
* \brief Gets the length of the out queue
*
* This calls rd_kafka_outq_len
*
* \return The length of the queue
*/
int get_out_queue_length() const;
/**
* \brief Cancels the current callback dispatcher
*
* This calls rd_kafka_yield
*/
void yield() const;
protected:
// Constructed only by derived consumer/producer classes; takes the
// configuration by value so the handle keeps its own copy.
KafkaHandleBase(Configuration config);
// Attaches the raw rd_kafka_t handle; ownership is transferred to handle_
// (see HandlePtr below, which destroys it via rd_kafka_destroy).
void set_handle(rd_kafka_t* handle);
// Throws on a non-OK librdkafka error code (presumably a cppkafka
// exception type -- the definition is not visible in this header).
void check_error(rd_kafka_resp_err_t error) const;
rd_kafka_conf_t* get_configuration_handle();
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
// unique_ptr with a custom deleter so the C handle is released with
// rd_kafka_destroy (RAII over the librdkafka C API).
using HandlePtr = std::unique_ptr<rd_kafka_t, decltype(&rd_kafka_destroy)>;
// Per-topic configurations saved via save_topic_config, keyed by topic name
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
// Internal overload taking a raw rd_kafka_topic_conf_t, shared by the
// public get_topic overloads
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
// Internal overload backing both public get_metadata variants; topic_ptr
// restricts the query to a single topic when non-null
Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
// Shared implementation for get_consumer_group/get_consumer_groups;
// name selects a single group (semantics of null/empty defined in the .cpp)
GroupInformationList fetch_consumer_groups(const char* name);
void save_topic_config(const std::string& topic_name, TopicConfiguration config);
// Timeout applied to blocking librdkafka calls (see set_timeout/get_timeout)
std::chrono::milliseconds timeout_ms_;
Configuration config_;
TopicConfigurationMap topic_configurations_;
// Guards topic_configurations_ against concurrent access
std::mutex topic_configurations_mutex_;
HandlePtr handle_;
};
} // cppkafka
#endif // CPPKAFKA_KAFKA_HANDLE_BASE_H