From 42821e9fc58d63185f13b6524dcd5bc7a112d0f3 Mon Sep 17 00:00:00 2001 From: Daniel Schierbeck Date: Mon, 30 Oct 2017 16:07:07 +0100 Subject: [PATCH] Correct offsets in nested message sets Message sets are nested before compression. Kafka used to decompress and fix the offsets, but that no longer seems to happen, so the offsets are all wrong. We now calculate the correct offset based on the "container" message's offset and the contained message's position in the message set. --- CHANGELOG.md | 2 ++ lib/kafka/protocol/message.rb | 14 +++++++++++++- spec/compressor_spec.rb | 8 +++++++- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 32f6e71bc..1583086f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ Changes and additions to the library will be listed here. ## Unreleased +- Fix bug when using compression (#458). + ## v0.5.0 - Drops support for Kafka 0.9 in favor of Kafka 0.10 (#381)! diff --git a/lib/kafka/protocol/message.rb b/lib/kafka/protocol/message.rb index 7f6181e50..ed950e38a 100644 --- a/lib/kafka/protocol/message.rb +++ b/lib/kafka/protocol/message.rb @@ -56,8 +56,20 @@ def decompress # For some weird reason we need to cut out the first 20 bytes. data = codec.decompress(value) message_set_decoder = Decoder.from_string(data) + message_set = MessageSet.decode(message_set_decoder) + + # The contained messages need to have their offset corrected. + messages = message_set.messages.each_with_index.map do |message, i| + Message.new( + offset: offset + i, + value: message.value, + key: message.key, + create_time: message.create_time, + codec_id: message.codec_id + ) + end - MessageSet.decode(message_set_decoder) + MessageSet.new(messages: messages) end def self.decode(decoder) diff --git a/spec/compressor_spec.rb b/spec/compressor_spec.rb index 9e0037980..bd11ea7d6 100644 --- a/spec/compressor_spec.rb +++ b/spec/compressor_spec.rb @@ -15,8 +15,14 @@ decoder = Kafka::Protocol::Decoder.from_string(data) decoded_message = Kafka::Protocol::Message.decode(decoder) decoded_message_set = decoded_message.decompress + messages = decoded_message_set.messages - expect(decoded_message_set.messages.map(&:value)).to eq ["hello1", "hello2"] + expect(messages.map(&:value)).to eq ["hello1", "hello2"] + + # When decoding a compressed message, the offsets are calculated relative to that + # of the container message. The broker will set the offset in normal operation, + # but at the client-side we set it to -1. + expect(messages.map(&:offset)).to eq [-1, 0] end it "only compresses the messages if there are at least the configured threshold" do