Skip to content

Compressed messages are yielded with incorrect offsets #505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
1 task done
klippx opened this issue Dec 19, 2017 · 11 comments
Closed
1 task done

Compressed messages are yielded with incorrect offsets #505

klippx opened this issue Dec 19, 2017 · 11 comments
Labels

Comments

@klippx
Copy link
Contributor

klippx commented Dec 19, 2017

If this is a bug report, please fill out the following:

  • Version of Ruby: 2.4.1
  • Version of Kafka: 0.10.2.0
  • Version of ruby-kafka: 0.5.1
  • Please verify that the problem you're seeing hasn't been fixed by the current master of ruby-kafka.
Steps to reproduce

See failing tests and investigation below

Summary

We are investigating a case which causes our consumer to crash and reconsuming it's whole partition. It is about Ruby Kafka not returning the correct offsets for the fetched batch of compressed messages.

This happens dozens of times on a daily basis.

We are using Phobos. Here you can see our debug level log output:

https://gist.github.com/klippx/65c4ef06eb0e01cbde6d0640e3176f1f

Analysis

772033 is not compacted:

> kafka.fetch_messages(topic: 'eu-live.kred.account-events', partition: 7, offset: 772033, max_bytes: 1).map { |v| v.offset }
=> [772033]

772034 however, IS compacted:

> kafka.fetch_messages(topic: 'eu-live.kred.account-events', partition: 7, offset: 772034, max_bytes: 1).map { |v| v.offset }
=> [772043, 772044, 772045, 772046, 772047, 772048, 772049, 772050, 772051, 772052]

So, offset 772034 contains a compacted message with 10 messages inside, as you can see above. But the offsets are not handled correctly by ruby-kafka, it should be 10 items ending with 772043:

=> [772034, 772035, 772036, 772037, 772038, 772039, 772040, 772041, 772042, 772043]
Preparing an experiment

We got some hints from our kafka cluster team that our client may not handle compressed messages well. So, in order to see where things go wrong, I put some debug messages to try to find the issue.

In message.rb:

Kafka::Protocol::Message.class_eval do
      def initialize(value:, key: nil, create_time: Time.now, codec_id: 0, offset: -1)
        @key = key
        @value = value
        @codec_id = codec_id
        @offset = offset
        @create_time = create_time
        puts "Message> Initialized new Message offset=#{offset}"
        @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
      end

      def decompress
        puts 'Message> Decompress messages...'
        codec = Kafka::Compression.find_codec_by_id(@codec_id)

        # For some weird reason we need to cut out the first 20 bytes.
        data = codec.decompress(value)
        message_set_decoder = Kafka::Protocol::Decoder.from_string(data)
        puts "Message> BEGIN decode"
        message_set = Kafka::Protocol::MessageSet.decode(message_set_decoder)
        puts "Message> END decode"

        # The contained messages need to have their offset corrected.
        puts "Message> Checking if compressed = #{message_set.messages.map { |message| message.compressed? }}"
        messages = message_set.messages.each_with_index.map do |message, i|
          puts offset
          Kafka::Protocol::Message.new(
            offset: offset + i,
            value: message.value,
            key: message.key,
            create_time: message.create_time,
            codec_id: message.codec_id
          )
        end

        Kafka::Protocol::MessageSet.new(messages: messages)
      end
end

In message_set.rb:

Kafka::Protocol::MessageSet.class_eval do
      def self.decode(decoder)
        fetched_messages = []

        until decoder.eof?
          begin
            message = Kafka::Protocol::Message.decode(decoder)
            puts "MessageSet> Decoded message with offset #{message.offset}"
            if message.compressed?
              puts "MessageSet> It is compressed..."
              wrapped_message_set = message.decompress
              puts "MessageSet> Concating..."
              fetched_messages.concat(wrapped_message_set.messages)
            else
              puts "MessageSet> It is NOT compressed..."
              fetched_messages << message
            end
          rescue EOFError
            if fetched_messages.empty?
              # If the first message in the set is truncated, it's likely because the
              # message is larger than the maximum size that we have asked for.
              raise MessageTooLargeToRead
            else
              # We tried to decode a partial message at the end of the set; just skip it.
            end
          end
        end
        puts "MessageSet> Now message offsets are: #{fetched_messages.map {|m| m.offset}}"
        new(messages: fetched_messages)
      end
end
Running the experiment

After modifying Ruby Kafka as per above, I ran the fetch_message again and this is the result:

First, the working not compacted message:

> kafka.fetch_messages(topic: 'eu-live.kred.account-events', partition: 7, offset: 772033, max_bytes: 1).map { |v| v.offset }
Message> Creating a message with offset 772033
Message> Initialized new Message offset=772033
MessageSet> Decoded message with offset 772033
MessageSet> It is NOT compressed...
MessageSet> Now message offsets are: [772033]
=> [772033]

Second, the compacted message:

> kafka.fetch_messages(topic: 'eu-live.kred.account-events', partition: 7, offset: 772034, max_bytes: 1).map { |v| v.offset }
Message> Creating a message with offset 772043
Message> Initialized new Message offset=772043
MessageSet> Decoded message with offset 772043
MessageSet> It is compressed...
Message> Decompress messages...
Message> BEGIN decode
Message> Creating a message with offset 772034
Message> Initialized new Message offset=772034
MessageSet> Decoded message with offset 772034
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772035
Message> Initialized new Message offset=772035
MessageSet> Decoded message with offset 772035
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772036
Message> Initialized new Message offset=772036
MessageSet> Decoded message with offset 772036
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772037
Message> Initialized new Message offset=772037
MessageSet> Decoded message with offset 772037
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772038
Message> Initialized new Message offset=772038
MessageSet> Decoded message with offset 772038
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772039
Message> Initialized new Message offset=772039
MessageSet> Decoded message with offset 772039
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772040
Message> Initialized new Message offset=772040
MessageSet> Decoded message with offset 772040
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772041
Message> Initialized new Message offset=772041
MessageSet> Decoded message with offset 772041
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772042
Message> Initialized new Message offset=772042
MessageSet> Decoded message with offset 772042
MessageSet> It is NOT compressed...
Message> Creating a message with offset 772043
Message> Initialized new Message offset=772043
MessageSet> Decoded message with offset 772043
MessageSet> It is NOT compressed...
MessageSet> Now message offsets are: [772034, 772035, 772036, 772037, 772038, 772039, 772040, 772041, 772042, 772043]
Message> END decode
Message> Checking if compressed = [false, false, false, false, false, false, false, false, false, false]
772043
Message> Initialized new Message offset=772043
772043
Message> Initialized new Message offset=772044
772043
Message> Initialized new Message offset=772045
772043
Message> Initialized new Message offset=772046
772043
Message> Initialized new Message offset=772047
772043
Message> Initialized new Message offset=772048
772043
Message> Initialized new Message offset=772049
772043
Message> Initialized new Message offset=772050
772043
Message> Initialized new Message offset=772051
772043
Message> Initialized new Message offset=772052
MessageSet> Concating...
MessageSet> Now message offsets are: [772043, 772044, 772045, 772046, 772047, 772048, 772049, 772050, 772051, 772052]
=> [772043, 772044, 772045, 772046, 772047, 772048, 772049, 772050, 772051, 772052]
Conclusions

What we can see here is that the code that was introduced in bugfix 42821e9 is the offender here. The messages are already correct in terms of offsets, but the bugfix code is RE-generating offsets which are, as a result, off by 10 causing our next fetch operation to crash.

We have added a test to expose the problem we are seeing.

We have verified that this works in v0.5.0

@dasch
Copy link
Contributor

dasch commented Dec 19, 2017

The problem is that this breaks Kafka 0.11, which is the reason for the original bugfix: #457

The JVM Kafka client has a different algorithm for getting the correct offsets – could you look into that and see if you can port that algorithm over?

https://github.com/apache/kafka/blob/88c2b6849a5af2af74972c8b2e8431473542ca83/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java#L340-L386

@zmstone
Copy link

zmstone commented Dec 19, 2017

What we can see here is that the code that was introduced in bugfix 42821e9 is the offender here. The messages are already correct in terms of offsets, but the bugfix code is RE-generating offsets which are, as a result, off by 10 causing our next fetch operation to crash.

This is because the messages were in 0.9 format in kafka.

It's not just 0.11. Relative offset feature was introduced in 0.10:
KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
JIRA: https://issues.apache.org/jira/browse/KAFKA-2511

Cases when client will receive corrected offsets:

  1. When fetch request is version 0, kafka will correct relative offset on broker side before replying fetch response
  2. When messages is stored in 0.9 format on disk (broker configured to do so).

All other cases, compressed inner messages should have relative offset, with below attributes:

  1. The container message should have the 'real' offset
  2. The container message's offset should be the 'real' offset of the last message in the compressed batch
  3. The first inner message should always have offset = 0

To resolve relative offsets:

base_offset = container_offset - message_set.last.offset
for message in message_set:
  message.offset = base_offset + message.offset

You may check if container_offset == message_set.last.offset, true to skip this 'correction',
but the above logic should be generic enough for offsets corrected by broker already,
because base_offset will be 0 in that case.

@dasch
Copy link
Contributor

dasch commented Dec 20, 2017

@zmstone can you write a PR that implements this?

@zmstone
Copy link

zmstone commented Dec 20, 2017

I believe @klippx is working on it.

@klippx
Copy link
Contributor Author

klippx commented Dec 20, 2017

We verified that with the fix in #506 the result of the test is

> kafka.fetch_messages(topic: 'eu-live.kred.account-events', partition: 7, offset: 772033, max_bytes: 1024).map { |v| v.offset }
=> [772033]

And

> kafka.fetch_messages(topic: 'eu-live.kred.account-events', partition: 7, offset: 772034, max_bytes: 1024).map { |v| v.offset }
=> [772034, 772035, 772036, 772037, 772038, 772039, 772040, 772041, 772042, 772043]

It should be noted that we are testing our scenario, we have no way of testing all possible real life scenarios (especially in production).

@dasch
Copy link
Contributor

dasch commented Dec 20, 2017

I'd have to do at least one pre-release and ask contributors to deploy it and test.

@piavka
Copy link

piavka commented Dec 20, 2017

we are suffering from same issue after upgrading from 0.4.3 to 0.5.1 yesterday and had to change all consumers to be reset to :latest offsets to avoid full partition re-consumption all the time
at the cost of loosing the unconsumed messages lag on reset
(this is happening at 4 different consumers with different workloads, with heavy workloads this happens very often)

we are running kafka 0.10.1.1 with log.message.format.version=0.9.0.1
now from @zmstone message is understand all the offsets should be correct? yet then why is this happening in our case? can you pls clarify?

Anyway i can tests this patch and see if this fixes the issue, we are consuming billions of messages and will quickly see if this is resolved.

Assuming that it is resolved next step we want to bump the message format to 0.10.x
should this be safe thing to do with the fix?

@dasch can you pls push a pre release to rubygems so we could test asap?

@dasch
Copy link
Contributor

dasch commented Dec 20, 2017

@piavka I've just released v0.5.2.beta1 – can you test that and report back? I won't be able to test in a real-life workload until January.

@dasch
Copy link
Contributor

dasch commented Dec 20, 2017

Should be fixed by #506.

@dasch dasch closed this as completed Dec 20, 2017
@dasch dasch added the 🐞 bug label Dec 20, 2017
@piavka
Copy link

piavka commented Dec 23, 2017

@dasch it's running stable with v0.5.2.beta2 for couple days already
tomorrow going to change log.message.format.versionfrom 0.9.0.1 to 0.10.1
and see if it triggers any issue

@dasch
Copy link
Contributor

dasch commented Dec 27, 2017

Great!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants