diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 75d053ec..ad9a9806 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -240,12 +240,19 @@ class CPPKAFKA_API MessageTimestamp { }; /** - * Constructs a timestamp object + * Constructs a timestamp object using a 'duration'. */ MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type); + + /** + * Constructs a timestamp object using a 'time_point'. + */ + template + MessageTimestamp(std::chrono::time_point timestamp, TimestampType type); /** - * Gets the timestamp value + * Gets the timestamp value. If the timestamp was created with a 'time_point', + * the duration represents the number of milliseconds since epoch. */ std::chrono::milliseconds get_timestamp() const; diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 6f8f1234..df216d91 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -128,11 +128,19 @@ class BasicMessageBuilder { Concrete& payload(BufferType&& value); /** - * Sets the message's timestamp + * Sets the message's timestamp with a 'duration' * * \param value The timestamp to be used */ Concrete& timestamp(std::chrono::milliseconds value); + + /** + * Sets the message's timestamp with a 'time_point'. + * + * \param value The timestamp to be used + */ + template + Concrete& timestamp(std::chrono::time_point value); /** * Sets the message's user data pointer @@ -184,7 +192,8 @@ class BasicMessageBuilder { BufferType& payload(); /** - * Gets the message's timestamp + * Gets the message's timestamp as a duration. If the timestamp was created with a 'time_point', + * the duration represents the number of milliseconds since epoch. */ std::chrono::milliseconds timestamp() const; @@ -295,6 +304,14 @@ C& BasicMessageBuilder::timestamp(std::chrono::milliseconds value) { return get_concrete(); } +template +template +C& BasicMessageBuilder::timestamp(std::chrono::time_point value) +{ + timestamp_ = std::chrono::duration_cast(value.time_since_epoch()); + return get_concrete(); +} + template C& BasicMessageBuilder::user_data(void* value) { user_data_ = value; diff --git a/src/message.cpp b/src/message.cpp index 798642da..77c09892 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -87,10 +87,18 @@ Message& Message::load_internal() { // MessageTimestamp MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type) -: timestamp_(timestamp), type_(type) { +: timestamp_(timestamp), + type_(type) { } +template +MessageTimestamp::MessageTimestamp(std::chrono::time_point timestamp, TimestampType type) +: timestamp_(std::chrono::duration_cast(timestamp.time_since_epoch())), + type_(type) { + +} + milliseconds MessageTimestamp::get_timestamp() const { return timestamp_; } diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 693954ca..64f70b82 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -24,6 +24,8 @@ using std::condition_variable; using std::chrono::system_clock; using std::chrono::seconds; using std::chrono::milliseconds; +using std::chrono::time_point; +using std::chrono::duration_cast; using std::ref; using namespace cppkafka; @@ -164,7 +166,7 @@ TEST_CASE("simple production", "[producer]") { SECTION("message with key") { const string payload = "Hello world! 2"; const string key = "such key"; - const milliseconds timestamp{15}; + auto timestamp = system_clock::now(); Producer producer(config); producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition) .key(key) @@ -181,7 +183,7 @@ TEST_CASE("simple production", "[producer]") { CHECK(message.get_partition() == partition); CHECK(!!message.get_error() == false); REQUIRE(!!message.get_timestamp() == true); - CHECK(message.get_timestamp()->get_timestamp() == timestamp); + CHECK(message.get_timestamp()->get_timestamp() == duration_cast(timestamp.time_since_epoch())); } #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)