Skip to content

Commit a6b5404

Browse files
committed
Allow specifying a create time for messages
1 parent 6b720d1 commit a6b5404

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

lib/kafka/producer.rb

+2-3
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,11 @@ def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, requ
178178
# @param topic [String] the topic that the message should be written to.
179179
# @param partition [Integer] the partition that the message should be written to.
180180
# @param partition_key [String] the key that should be used to assign a partition.
181+
# @param create_time [Time] the timestamp that should be set on the message.
181182
#
182183
# @raise [BufferOverflow] if the maximum buffer size has been reached.
183184
# @return [nil]
184-
def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
185-
create_time = Time.now
186-
185+
def produce(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now)
187186
message = PendingMessage.new(
188187
value && value.to_s,
189188
key && key.to_s,

spec/functional/producer_spec.rb

+11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@
77

88
let!(:topic) { create_random_topic(num_partitions: 3) }
99

10+
example "setting a create_time value" do
11+
timestamp = Time.now
12+
13+
producer.produce("hello", topic: topic, partition: 0, create_time: timestamp)
14+
producer.deliver_messages
15+
16+
message = kafka.fetch_messages(topic: topic, partition: 0, offset: :earliest).last
17+
18+
expect(message.create_time.to_i).to eq timestamp.to_i
19+
end
20+
1021
example "writing messages using the buffered producer" do
1122
value1 = rand(10_000).to_s
1223
value2 = rand(10_000).to_s

0 commit comments

Comments
 (0)