Skip to content

Commit 57268e6

Browse files
acceleratedmfontanini
authored andcommitted
Added time_point overloads for creating timestamps. (#128)
* Added time_point overloads for creating timestamps. * aliased std::chrono types
1 parent ad9a1e4 commit 57268e6

File tree

4 files changed

+41
-7
lines changed

4 files changed

+41
-7
lines changed

Diff for: include/cppkafka/message.h

+9-2
Original file line numberDiff line numberDiff line change
@@ -240,12 +240,19 @@ class CPPKAFKA_API MessageTimestamp {
240240
};
241241

242242
/**
243-
* Constructs a timestamp object
243+
* Constructs a timestamp object using a 'duration'.
244244
*/
245245
MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type);
246+
247+
/**
248+
* Constructs a timestamp object using a 'time_point'.
249+
*/
250+
template <typename Clock, typename Duration = typename Clock::duration>
251+
MessageTimestamp(std::chrono::time_point<Clock, Duration> timestamp, TimestampType type);
246252

247253
/**
248-
* Gets the timestamp value
254+
* Gets the timestamp value. If the timestamp was created with a 'time_point',
255+
* the duration represents the number of milliseconds since epoch.
249256
*/
250257
std::chrono::milliseconds get_timestamp() const;
251258

Diff for: include/cppkafka/message_builder.h

+19-2
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,19 @@ class BasicMessageBuilder {
128128
Concrete& payload(BufferType&& value);
129129

130130
/**
131-
* Sets the message's timestamp
131+
* Sets the message's timestamp with a 'duration'
132132
*
133133
* \param value The timestamp to be used
134134
*/
135135
Concrete& timestamp(std::chrono::milliseconds value);
136+
137+
/**
138+
* Sets the message's timestamp with a 'time_point'.
139+
*
140+
* \param value The timestamp to be used
141+
*/
142+
template <typename Clock, typename Duration = typename Clock::duration>
143+
Concrete& timestamp(std::chrono::time_point<Clock, Duration> value);
136144

137145
/**
138146
* Sets the message's user data pointer
@@ -184,7 +192,8 @@ class BasicMessageBuilder {
184192
BufferType& payload();
185193

186194
/**
187-
* Gets the message's timestamp
195+
* Gets the message's timestamp as a duration. If the timestamp was created with a 'time_point',
196+
* the duration represents the number of milliseconds since epoch.
188197
*/
189198
std::chrono::milliseconds timestamp() const;
190199

@@ -295,6 +304,14 @@ C& BasicMessageBuilder<T, C>::timestamp(std::chrono::milliseconds value) {
295304
return get_concrete();
296305
}
297306

307+
template <typename T, typename C>
308+
template <typename Clock, typename Duration>
309+
C& BasicMessageBuilder<T, C>::timestamp(std::chrono::time_point<Clock, Duration> value)
310+
{
311+
timestamp_ = std::chrono::duration_cast<std::chrono::milliseconds>(value.time_since_epoch());
312+
return get_concrete();
313+
}
314+
298315
template <typename T, typename C>
299316
C& BasicMessageBuilder<T, C>::user_data(void* value) {
300317
user_data_ = value;

Diff for: src/message.cpp

+9-1
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,18 @@ Message& Message::load_internal() {
8787
// MessageTimestamp
8888

8989
MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type)
90-
: timestamp_(timestamp), type_(type) {
90+
: timestamp_(timestamp),
91+
type_(type) {
9192

9293
}
9394

95+
template <typename Clock, typename Duration>
96+
MessageTimestamp::MessageTimestamp(std::chrono::time_point<Clock, Duration> timestamp, TimestampType type)
97+
: timestamp_(std::chrono::duration_cast<std::chrono::milliseconds>(timestamp.time_since_epoch())),
98+
type_(type) {
99+
100+
}
101+
94102
milliseconds MessageTimestamp::get_timestamp() const {
95103
return timestamp_;
96104
}

Diff for: tests/producer_test.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ using std::condition_variable;
2424
using std::chrono::system_clock;
2525
using std::chrono::seconds;
2626
using std::chrono::milliseconds;
27+
using std::chrono::time_point;
28+
using std::chrono::duration_cast;
2729
using std::ref;
2830

2931
using namespace cppkafka;
@@ -164,7 +166,7 @@ TEST_CASE("simple production", "[producer]") {
164166
SECTION("message with key") {
165167
const string payload = "Hello world! 2";
166168
const string key = "such key";
167-
const milliseconds timestamp{15};
169+
auto timestamp = system_clock::now();
168170
Producer producer(config);
169171
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
170172
.key(key)
@@ -181,7 +183,7 @@ TEST_CASE("simple production", "[producer]") {
181183
CHECK(message.get_partition() == partition);
182184
CHECK(!!message.get_error() == false);
183185
REQUIRE(!!message.get_timestamp() == true);
184-
CHECK(message.get_timestamp()->get_timestamp() == timestamp);
186+
CHECK(message.get_timestamp()->get_timestamp() == duration_cast<milliseconds>(timestamp.time_since_epoch()));
185187
}
186188

187189
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)

0 commit comments

Comments
 (0)