Skip to content

Commit a7584cd

Browse files
authored
Merge pull request #458 from zendesk/dasch/fix-nested-offsets
Correct offsets in nested message sets
2 parents 9b83275 + 42821e9 commit a7584cd

File tree

3 files changed

+22
-2
lines changed

3 files changed

+22
-2
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ Changes and additions to the library will be listed here.
44

55
## Unreleased
66

7+
- Fix bug when using compression (#458).
8+
79
## v0.5.0
810

911
- Drops support for Kafka 0.9 in favor of Kafka 0.10 (#381)!

lib/kafka/protocol/message.rb

+13-1
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,20 @@ def decompress
5656
# For some weird reason we need to cut out the first 20 bytes.
5757
data = codec.decompress(value)
5858
message_set_decoder = Decoder.from_string(data)
59+
message_set = MessageSet.decode(message_set_decoder)
60+
61+
# The contained messages need to have their offset corrected.
62+
messages = message_set.messages.each_with_index.map do |message, i|
63+
Message.new(
64+
offset: offset + i,
65+
value: message.value,
66+
key: message.key,
67+
create_time: message.create_time,
68+
codec_id: message.codec_id
69+
)
70+
end
5971

60-
MessageSet.decode(message_set_decoder)
72+
MessageSet.new(messages: messages)
6173
end
6274

6375
def self.decode(decoder)

spec/compressor_spec.rb

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@
1515
decoder = Kafka::Protocol::Decoder.from_string(data)
1616
decoded_message = Kafka::Protocol::Message.decode(decoder)
1717
decoded_message_set = decoded_message.decompress
18+
messages = decoded_message_set.messages
1819

19-
expect(decoded_message_set.messages.map(&:value)).to eq ["hello1", "hello2"]
20+
expect(messages.map(&:value)).to eq ["hello1", "hello2"]
21+
22+
# When decoding a compressed message, the offsets are calculated relative to that
23+
# of the container message. The broker will set the offset in normal operation,
24+
# but at the client-side we set it to -1.
25+
expect(messages.map(&:offset)).to eq [-1, 0]
2026
end
2127

2228
it "only compresses the messages if there are at least the configured threshold" do

0 commit comments

Comments
 (0)