Skip to content

Commit 7acbd7b

Browse files
committed
Restore pull request #376 from zendesk/dasch/message-timestamps
This reverts commit d03eae0.
1 parent 2abec4c commit 7acbd7b

11 files changed

+40
-13
lines changed

Diff for: circle.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ machine:
99
dependencies:
1010
pre:
1111
- docker -v
12-
- docker pull ches/kafka:0.9.0.1
12+
- docker pull ches/kafka:0.10.0.0
1313
- docker pull jplock/zookeeper:3.4.6
1414
- gem install bundler -v 1.9.5
1515

Diff for: lib/kafka/fetch_operation.rb

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def execute
9090
topic: fetched_topic.name,
9191
partition: fetched_partition.partition,
9292
offset: message.offset,
93+
create_time: message.create_time,
9394
)
9495
}
9596

Diff for: lib/kafka/fetched_message.rb

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@ class FetchedMessage
1616
# @return [Integer] the offset of the message in the partition.
1717
attr_reader :offset
1818

19-
def initialize(value:, key:, topic:, partition:, offset:)
19+
# @return [Time] the timestamp of the message.
20+
attr_reader :create_time
21+
22+
def initialize(value:, key:, topic:, partition:, offset:, create_time:)
2023
@value = value
2124
@key = key
2225
@topic = topic
2326
@partition = partition
2427
@offset = offset
28+
@create_time = create_time
2529
end
2630
end
2731
end

Diff for: lib/kafka/protocol/fetch_request.rb

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ def api_key
3030
FETCH_API
3131
end
3232

33+
def api_version
34+
2
35+
end
36+
3337
def response_class
3438
Protocol::FetchResponse
3539
end

Diff for: lib/kafka/protocol/fetch_response.rb

+5-2
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,14 @@ def initialize(name:, partitions:)
3838

3939
attr_reader :topics
4040

41-
def initialize(topics: [])
41+
def initialize(topics: [], throttle_time_ms: 0)
4242
@topics = topics
43+
@throttle_time_ms = throttle_time_ms
4344
end
4445

4546
def self.decode(decoder)
47+
throttle_time_ms = decoder.int32
48+
4649
topics = decoder.array do
4750
topic_name = decoder.string
4851

@@ -68,7 +71,7 @@ def self.decode(decoder)
6871
)
6972
end
7073

71-
new(topics: topics)
74+
new(topics: topics, throttle_time_ms: throttle_time_ms)
7275
end
7376
end
7477
end

Diff for: lib/kafka/protocol/message.rb

+6-3
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ module Protocol
66

77
# ## API Specification
88
#
9-
# Message => Crc MagicByte Attributes Key Value
9+
# Message => Crc MagicByte Attributes Timestamp Key Value
1010
# Crc => int32
1111
# MagicByte => int8
1212
# Attributes => int8
13+
# Timestamp => int64, in ms
1314
# Key => bytes
1415
# Value => bytes
1516
#
1617
class Message
17-
MAGIC_BYTE = 0
18+
MAGIC_BYTE = 1
1819

1920
attr_reader :key, :value, :codec_id, :offset
2021

@@ -71,14 +72,15 @@ def self.decode(decoder)
7172
end
7273

7374
attributes = message_decoder.int8
75+
timestamp = message_decoder.int64
7476
key = message_decoder.bytes
7577
value = message_decoder.bytes
7678

7779
# The codec id is encoded in the three least significant bits of the
7880
# attributes.
7981
codec_id = attributes & 0b111
8082

81-
new(key: key, value: value, codec_id: codec_id, offset: offset)
83+
new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: Time.at(timestamp/1000.0))
8284
end
8385

8486
private
@@ -102,6 +104,7 @@ def encode_without_crc
102104

103105
encoder.write_int8(MAGIC_BYTE)
104106
encoder.write_int8(@codec_id)
107+
encoder.write_int64((@create_time.to_f*1000).to_i)
105108
encoder.write_bytes(@key)
106109
encoder.write_bytes(@value)
107110

Diff for: lib/kafka/protocol/produce_request.rb

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ def api_key
4040
PRODUCE_API
4141
end
4242

43+
def api_version
44+
2
45+
end
46+
4347
def response_class
4448
requires_acks? ? Protocol::ProduceResponse : nil
4549
end

Diff for: lib/kafka/protocol/produce_response.rb

+10-5
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,21 @@ def initialize(topic:, partitions:)
1111
end
1212

1313
class PartitionInfo
14-
attr_reader :partition, :error_code, :offset
14+
attr_reader :partition, :error_code, :offset, :timestamp
1515

16-
def initialize(partition:, error_code:, offset:)
16+
def initialize(partition:, error_code:, offset:, timestamp:)
1717
@partition = partition
1818
@error_code = error_code
1919
@offset = offset
20+
@timestamp = timestamp
2021
end
2122
end
2223

23-
attr_reader :topics
24+
attr_reader :topics, :throttle_time_ms
2425

25-
def initialize(topics: [])
26+
def initialize(topics: [], throttle_time_ms: 0)
2627
@topics = topics
28+
@throttle_time_ms = throttle_time_ms
2729
end
2830

2931
def each_partition
@@ -43,13 +45,16 @@ def self.decode(decoder)
4345
partition: decoder.int32,
4446
error_code: decoder.int16,
4547
offset: decoder.int64,
48+
timestamp: Time.at(decoder.int64/1000.0),
4649
)
4750
end
4851

4952
TopicInfo.new(topic: topic, partitions: partitions)
5053
end
5154

52-
new(topics: topics)
55+
throttle_time_ms = decoder.int32
56+
57+
new(topics: topics, throttle_time_ms: throttle_time_ms)
5358
end
5459
end
5560
end

Diff for: spec/consumer_spec.rb

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
topic: "greetings",
5959
partition: 0,
6060
offset: 13,
61+
create_time: Time.now,
6162
)
6263
]
6364
}
@@ -182,6 +183,7 @@
182183
topic: "greetings",
183184
partition: 0,
184185
offset: 13,
186+
create_time: Time.now,
185187
)
186188
]
187189
}

Diff for: spec/fake_broker.rb

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def produce(messages_for_topics:, required_acks:, timeout:)
3535
partition: partition,
3636
error_code: error_code_for_partition(topic, partition),
3737
offset: message_set.messages.size,
38+
timestamp: (Time.now.to_f*1000).to_i,
3839
)
3940
}
4041
)

Diff for: spec/test_cluster.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class TestCluster
1818
}
1919

2020
DOCKER_HOSTNAME = URI(DOCKER_HOST).host
21-
KAFKA_IMAGE = "ches/kafka:0.9.0.1"
21+
KAFKA_IMAGE = "ches/kafka:0.10.0.0"
2222
ZOOKEEPER_IMAGE = "jplock/zookeeper:3.4.6"
2323
KAFKA_CLUSTER_SIZE = 3
2424

0 commit comments

Comments
 (0)