-
Notifications
You must be signed in to change notification settings - Fork 339
/
Copy pathmessage.rb
138 lines (109 loc) · 3.71 KB
/
message.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
require "stringio"
require "zlib"
module Kafka
  module Protocol
    # ## API Specification
    #
    #     Message => Crc MagicByte Attributes Timestamp Key Value
    #         Crc => int32
    #         MagicByte => int8
    #         Attributes => int8
    #         Timestamp => int64, in ms
    #         Key => bytes
    #         Value => bytes
    #
    # A single message in the legacy (magic byte 0/1) message format. Encoding
    # always writes magic byte 1; decoding accepts both 0 and 1.
    class Message
      MAGIC_BYTE = 1

      attr_reader :key, :value, :codec_id, :offset
      attr_reader :bytesize, :create_time

      # @param value [String, nil] the message payload.
      # @param key [String, nil] the message key.
      # @param create_time [Time, nil] timestamp written into the message (ms precision).
      # @param codec_id [Integer] compression codec id; 0 means uncompressed.
      # @param offset [Integer] the message offset; -1 when not yet assigned.
      def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
        @key = key
        @value = value
        @codec_id = codec_id
        @offset = offset
        @create_time = create_time

        # Combined byte length of key and value (nil counts as 0).
        @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
      end

      # Writes the offset followed by the CRC-prefixed message bytes.
      #
      # @param encoder [Encoder]
      def encode(encoder)
        data = encode_with_crc

        encoder.write_int64(offset)
        encoder.write_bytes(data)
      end

      # Two messages are equal when key, value, codec id and offset all match.
      # The create time is deliberately not compared. Returns false (instead of
      # raising NoMethodError) when `other` is not a Message.
      #
      # @param other [Object]
      # @return [Boolean]
      def ==(other)
        other.is_a?(Message) &&
          @key == other.key &&
          @value == other.value &&
          @codec_id == other.codec_id &&
          @offset == other.offset
      end

      # @return [Boolean] true when the message uses a non-zero codec id.
      def compressed?
        @codec_id != 0
      end

      # Decompresses `value` with this message's codec and decodes the result
      # as a message set.
      #
      # @return [Kafka::Protocol::MessageSet]
      def decompress
        codec = Compression.find_codec_by_id(@codec_id)

        data = codec.decompress(value)
        message_set_decoder = Decoder.from_string(data)
        message_set = MessageSet.decode(message_set_decoder)

        # The inner messages are re-numbered consecutively, starting at this
        # (wrapper) message's offset.
        # NOTE(review): assumes the wrapper offset is that of the first inner
        # message — confirm against the broker's message format version.
        messages = message_set.messages.map.with_index do |message, i|
          Message.new(
            offset: offset + i,
            value: message.value,
            key: message.key,
            create_time: message.create_time,
            codec_id: message.codec_id
          )
        end

        MessageSet.new(messages: messages)
      end

      # Reads one offset-prefixed message from `decoder`.
      #
      # @param decoder [Decoder]
      # @return [Message]
      # @raise [Kafka::Error] if the magic byte is neither 0 nor 1.
      def self.decode(decoder)
        offset = decoder.int64
        message_decoder = Decoder.from_string(decoder.bytes)

        # Skip the int32 CRC; it is consumed here but not verified.
        message_decoder.int32
        magic_byte = message_decoder.int8
        attributes = message_decoder.int8

        # The magic byte indicates the message format version. There are situations
        # where an old message format can be returned from a newer version of Kafka,
        # because old messages are not necessarily rewritten on upgrades.
        case magic_byte
        when 0
          # No timestamp in the pre-0.10 message format.
          timestamp = nil
        when 1
          timestamp = message_decoder.int64
        else
          raise Kafka::Error, "Invalid magic byte: #{magic_byte}"
        end

        key = message_decoder.bytes
        value = message_decoder.bytes

        # The codec id is encoded in the three least significant bits of the
        # attributes.
        codec_id = attributes & 0b111

        # The timestamp will be nil if the message was written in the Kafka 0.9 log format.
        create_time = timestamp && Time.at(timestamp / 1000.0)

        new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
      end

      private

      # @return [String] the CRC32 of the message body followed by the body itself.
      def encode_with_crc
        buffer = StringIO.new
        encoder = Encoder.new(buffer)

        data = encode_without_crc
        crc = Zlib.crc32(data)

        encoder.write_int32(crc)
        encoder.write(data)

        buffer.string
      end

      # @return [String] magic byte, attributes, ms timestamp, key and value.
      def encode_without_crc
        buffer = StringIO.new
        encoder = Encoder.new(buffer)

        encoder.write_int8(MAGIC_BYTE)
        encoder.write_int8(@codec_id)
        # Milliseconds since the epoch, truncated; a nil create_time encodes as 0.
        encoder.write_int64((@create_time.to_f * 1000).to_i)
        encoder.write_bytes(@key)
        encoder.write_bytes(@value)

        buffer.string
      end
    end
  end
end