diff --git a/CHANGELOG.md b/CHANGELOG.md index 32f6e71bc..1583086f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ Changes and additions to the library will be listed here. ## Unreleased +- Fix bug when using compression (#458). + ## v0.5.0 - Drops support for Kafka 0.9 in favor of Kafka 0.10 (#381)! diff --git a/lib/kafka/protocol/message.rb b/lib/kafka/protocol/message.rb index 7f6181e50..ed950e38a 100644 --- a/lib/kafka/protocol/message.rb +++ b/lib/kafka/protocol/message.rb @@ -56,8 +56,20 @@ def decompress # For some weird reason we need to cut out the first 20 bytes. data = codec.decompress(value) message_set_decoder = Decoder.from_string(data) + message_set = MessageSet.decode(message_set_decoder) + + # The contained messages need to have their offset corrected. + messages = message_set.messages.each_with_index.map do |message, i| + Message.new( + offset: offset + i, + value: message.value, + key: message.key, + create_time: message.create_time, + codec_id: message.codec_id + ) + end - MessageSet.decode(message_set_decoder) + MessageSet.new(messages: messages) end def self.decode(decoder) diff --git a/spec/compressor_spec.rb b/spec/compressor_spec.rb index 9e0037980..bd11ea7d6 100644 --- a/spec/compressor_spec.rb +++ b/spec/compressor_spec.rb @@ -15,8 +15,14 @@ decoder = Kafka::Protocol::Decoder.from_string(data) decoded_message = Kafka::Protocol::Message.decode(decoder) decoded_message_set = decoded_message.decompress + messages = decoded_message_set.messages - expect(decoded_message_set.messages.map(&:value)).to eq ["hello1", "hello2"] + expect(messages.map(&:value)).to eq ["hello1", "hello2"] + + # When decoding a compressed message, the offsets are calculated relative to that + # of the container message. The broker will set the offset in normal operation, + # but at the client-side we set it to -1. + expect(messages.map(&:offset)).to eq [-1, 0] end it "only compresses the messages if there are at least the configured threshold" do