Skip to content

Commit 9b88b4e

Browse files
author
accelerated
committed
added retry logic for producers
1 parent 9714bec commit 9b88b4e

9 files changed

+436
-47
lines changed

Diff for: include/cppkafka/configuration.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
145145
Configuration& set_default_topic_configuration(TopicConfiguration config);
146146

147147
/**
148-
* Returns true iff the given property name has been set
148+
* Returns true if the given property name has been set
149149
*/
150150
bool has_property(const std::string& name) const;
151151

Diff for: include/cppkafka/message.h

+15-3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
namespace cppkafka {
4444

4545
class MessageTimestamp;
46+
struct Internal;
4647

4748
/**
4849
* \brief Thin wrapper over a rdkafka message handle
@@ -56,6 +57,8 @@ class MessageTimestamp;
5657
*/
5758
class CPPKAFKA_API Message {
5859
public:
60+
friend class MessageInternal;
61+
using InternalPtr = std::shared_ptr<Internal>;
5962
/**
6063
* Constructs a message that won't take ownership of the given pointer
6164
*/
@@ -134,14 +137,13 @@ class CPPKAFKA_API Message {
134137
}
135138

136139
/**
137-
* \brief Gets the private data.
140+
* \brief Gets the private user data.
138141
*
139142
* This should only be used on messages produced by a Producer that were set a private data
140143
* attribute
141144
*/
142145
void* get_user_data() const {
143-
assert(handle_);
144-
return handle_->_private;
146+
return user_data_;
145147
}
146148

147149
/**
@@ -164,17 +166,27 @@ class CPPKAFKA_API Message {
164166
rd_kafka_message_t* get_handle() const {
165167
return handle_.get();
166168
}
169+
170+
/**
171+
* Internal private const data accessor (internal use only)
172+
*/
173+
InternalPtr internal() const {
174+
return internal_;
175+
}
167176
private:
168177
using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;
169178

170179
struct NonOwningTag { };
171180

172181
Message(rd_kafka_message_t* handle, NonOwningTag);
173182
Message(HandlePtr handle);
183+
void load_internal(void* user_data, InternalPtr internal);
174184

175185
HandlePtr handle_;
176186
Buffer payload_;
177187
Buffer key_;
188+
void* user_data_;
189+
InternalPtr internal_;
178190
};
179191

180192
using MessageList = std::vector<Message>;

Diff for: include/cppkafka/message_builder.h

+25-5
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ class BasicMessageBuilder {
166166
* Gets the message's user data pointer
167167
*/
168168
void* user_data() const;
169+
170+
/**
171+
* Private data accessor (internal use only)
172+
*/
173+
Message::InternalPtr internal() const;
174+
Concrete& internal(Message::InternalPtr internal);
175+
169176
private:
170177
void construct_buffer(BufferType& lhs, const BufferType& rhs);
171178
Concrete& get_concrete();
@@ -176,11 +183,13 @@ class BasicMessageBuilder {
176183
BufferType payload_;
177184
std::chrono::milliseconds timestamp_{0};
178185
void* user_data_;
186+
Message::InternalPtr internal_;
179187
};
180188

181189
template <typename T, typename C>
182190
BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
183-
: topic_(std::move(topic)) {
191+
: topic_(std::move(topic)),
192+
user_data_(nullptr) {
184193
}
185194

186195
template <typename T, typename C>
@@ -190,16 +199,16 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
190199
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
191200
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
192201
std::chrono::milliseconds(0)),
193-
user_data_(message.get_user_data())
194-
{
195-
202+
user_data_(message.get_user_data()),
203+
internal_(message.internal()) {
196204
}
197205

198206
template <typename T, typename C>
199207
template <typename U, typename V>
200208
BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)
201209
: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()),
202-
user_data_(rhs.user_data()) {
210+
user_data_(rhs.user_data()),
211+
internal_(rhs.internal()) {
203212
get_concrete().construct_buffer(key_, rhs.key());
204213
get_concrete().construct_buffer(payload_, rhs.payload());
205214
}
@@ -292,6 +301,17 @@ void* BasicMessageBuilder<T, C>::user_data() const {
292301
return user_data_;
293302
}
294303

304+
template <typename T, typename C>
305+
Message::InternalPtr BasicMessageBuilder<T, C>::internal() const {
306+
return internal_;
307+
}
308+
309+
template <typename T, typename C>
310+
C& BasicMessageBuilder<T, C>::internal(Message::InternalPtr internal) {
311+
internal_ = internal;
312+
return get_concrete();
313+
}
314+
295315
template <typename T, typename C>
296316
void BasicMessageBuilder<T, C>::construct_buffer(T& lhs, const T& rhs) {
297317
lhs = rhs;

Diff for: include/cppkafka/message_internal.h

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_MESSAGE_INTERNAL_H
31+
#define CPPKAFKA_MESSAGE_INTERNAL_H
32+
33+
#include <memory>
34+
#include "message.h"
35+
36+
namespace cppkafka {
37+
38+
struct Internal {
39+
virtual ~Internal() = default;
40+
};
41+
using InternalPtr = std::shared_ptr<Internal>;
42+
43+
/**
44+
* \brief Private message data structure
45+
*/
46+
class MessageInternal {
47+
friend class Producer;
48+
49+
public:
50+
static std::unique_ptr<MessageInternal> load(Message& message) {
51+
if (message.get_user_data()) {
52+
// Unpack internal data
53+
std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
54+
message.load_internal(internal_data->user_data_, internal_data->internal_);
55+
return internal_data;
56+
}
57+
return nullptr;
58+
}
59+
60+
private:
61+
MessageInternal(void* user_data, std::shared_ptr<Internal> internal)
62+
: user_data_(user_data),
63+
internal_(internal) {
64+
}
65+
66+
void* user_data_;
67+
InternalPtr internal_;
68+
};
69+
70+
}
71+
72+
#endif //CPPKAFKA_MESSAGE_INTERNAL_H

0 commit comments

Comments
 (0)