Skip to content

Commit fadd2ff

Browse files
committed
Correct offsets in nested message sets
Message sets are nested before compression. Kafka used to decompress and fix the offsets, but that no longer seems to happen, so the offsets are all wrong. We now calculate the correct offset based on the "container" message's offset and the contained message's position in the message set.
1 parent b9dfe03 commit fadd2ff

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

lib/kafka/protocol/message.rb

+13-1
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,20 @@ def decompress
5656
# For some weird reason we need to cut out the first 20 bytes.
5757
data = codec.decompress(value)
5858
message_set_decoder = Decoder.from_string(data)
59+
message_set = MessageSet.decode(message_set_decoder)
60+
61+
# The contained messages need to have their offset corrected.
62+
messages = message_set.messages.each_with_index.map do |message, i|
63+
Message.new(
64+
offset: offset + i,
65+
value: message.value,
66+
key: message.key,
67+
create_time: message.create_time,
68+
codec_id: message.codec_id
69+
)
70+
end
5971

60-
MessageSet.decode(message_set_decoder)
72+
MessageSet.new(messages: messages)
6173
end
6274

6375
def self.decode(decoder)

0 commit comments

Comments
 (0)