Infinite reprocessing of messages from Snappy compressed producer #457
I've been able to recreate this issue using a Ruby producer. Strangely, when using the Ruby producer the repetition is harder to reproduce and the loop appears to happen at a slower rate. It also appears that the producer below may need to be run twice before the infinite loop starts in the consumer.

```ruby
# Place this in the project directory
$:.unshift File.expand_path("../lib", __FILE__)

require "ruby-kafka"
require "snappy"

kafka = Kafka.new(seed_brokers: "172.18.0.3:9092")
producer = kafka.producer(compression_codec: :snappy)

30.times do |i|
  producer.produce(i.to_s, topic: "signals", partition_key: "abcdefgh12")
end

producer.deliver_messages
producer.shutdown
```
I saw #429 and I don't believe this is a duplicate. The messages in the test are much smaller than the default. It's worth pointing out that this is happening without using a consumer group (simple consumer), so we don't have any of the additional commit tracking that happens in that case.
Can you paste some logs at DEBUG level?
@gaffneyc can you try with the consumer group API, i.e.

```ruby
consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("signals")

consumer.each_message do |message|
  p message
end
```

and paste the logs if it's still not working?
Sure thing, here are the logs and the consumer: https://gist.github.com/gaffneyc/7fea8395235c07d5c0f7619570c3aac9

As part of this test I've also reduced the number of Kafka brokers from 5 to 2 and partitions from 64 to 4 to try to reduce the log output.
That's weird. Can you investigate whether there have been any changes to how compression is done in Kafka?
Yeah, it is weird. In further testing, with Snappy disabled in the producer, everything worked as expected on 0.5.0. As soon as I send a message with Snappy enabled, it enters the infinite processing loop. I'm not sure if you saw, but it looked like 7acbd7b was the commit that introduced the problem (found via git bisect). My first thought was that the updated API wasn't quite correct with compression, or that the version upgrade handshake might not be working as expected. Honestly, I'm not familiar enough with the code or the Kafka protocol to know where to begin. I'll test against Kafka 0.10.2.1 to see if I can reproduce it there; jumping into Kafka's changelog and code is probably out of my depth.
It could be that v2 of the fetch API introduces some change in how compressed messages work. It's just weird that you're not getting an error then, as the message is correctly processed...
Tested it again on 0.10.2.1 and was able to reproduce it (added the logs to the gist). Each test is run against a fresh Kafka cluster.
It sounds like a change in API semantics for the fetch API, although I'm baffled that the problem seems to be at the processing level; I would assume that we'd maybe get back the same message from the broker repeatedly.
In the logs it stands out that the offset is 0 for each of the messages. The value is an incrementing number, so the values should map (roughly) to the offsets in Kafka. If the offsets aren't being parsed correctly, that could explain why the consumer is never moving forward.
If you can run with a local version of ruby-kafka, can you just pretty print all the batches returned from the broker to the terminal?
Ah, they're different messages! I thought the same message was being re-processed.
Ah yeah, the producer is pushing 30 messages to the topic.
Hmm, it could be that the client has to calculate the offset itself when decompressing the messages; compression is sort of weird in Kafka versions < 0.11. Basically, you jam all the messages together in a message set, then compress and place those bytes in the value of a new Kafka message, which is the one that is actually written to the broker...
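To make that wrapping concrete, here's a purely illustrative Ruby sketch of the idea. It is not ruby-kafka's actual wire format: `Marshal` stands in for Kafka's binary message-set encoding, and the `snappy` gem (already used by the producer above) is assumed.

```ruby
require "snappy" # assumes the snappy gem used in the producer above

# Pretend these are the individual messages in the set.
messages = (0...30).map { |i| { offset: i, value: i.to_s } }

# "Serialize" the message set. Real Kafka uses a binary format; Marshal is a stand-in.
message_set = Marshal.dump(messages)

# The wrapper message that is actually written to the broker carries the compressed bytes.
wrapper = { codec: :snappy, value: Snappy.deflate(message_set) }

# On fetch, the consumer has to inflate the wrapper and unpack the inner message set.
inner = Marshal.load(Snappy.inflate(wrapper[:value]))
puts inner.size # => 30
```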
My guess would be that in Kafka 0.10 the brokers no longer decompress those message sets in order to set the correct offsets, instead relying on the clients to calculate them based on their relative offset from the "container" message.
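If that guess is right, the fix-up would look roughly like the sketch below. This is only an illustration of the idea, assuming the wrapper message carries the absolute offset of the last inner message and the inner messages carry offsets relative to the start of the set; the names are hypothetical, not ruby-kafka internals.

```ruby
# Illustrative only: compute absolute offsets for decompressed inner messages,
# assuming the wrapper's offset is the absolute offset of the *last* inner message
# and each inner message's offset is relative (0, 1, 2, ...).
InnerMessage = Struct.new(:offset, :value)

def absolute_offsets(wrapper_offset, inner_messages)
  base_offset = wrapper_offset - inner_messages.last.offset
  inner_messages.map { |m| base_offset + m.offset }
end

inner = (0...30).map { |i| InnerMessage.new(i, i.to_s) }
p absolute_offsets(29, inner).first(3) # => [0, 1, 2]
```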
Can you see if this fixes the problem? #458
Force-pushed a new version that should work.
Yep! That looks to have fixed the issue. I'm getting the correct offsets in the logs and messages are only being processed once.
🎉 I'll try to add a test and will merge it tomorrow.
Awesome! Thank you for getting a fix in there and building ruby-kafka in the first place.
We have a message producer written in Go and a consumer written in Ruby. It appears that enabling snappy compression on the producer causes messages to be infinitely reprocessed in the Ruby consumer.
The example below works fine on 0.4.3, and a git bisect shows that the problem may have been introduced by 7acbd7b. I've tried to reduce the problem to the smallest reproducible version (consumer groups are not necessary), though I have not tried to recreate it with a Ruby producer.

Steps to reproduce
In the code below I have Kafka running locally in Docker at 172.18.0.3:9092; you may need to change the broker list to get it working locally.
log: https://gist.github.com/gaffneyc/f2de66eceb7a4c2f9967c0ba4acda402
producer.go
consumer.rb
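The original producer.go and consumer.rb attachments aren't reproduced here. Based on the description above (a simple consumer on the "signals" topic, no consumer group), the consumer would have been roughly along these lines; this is a hypothetical reconstruction, not the original file:

```ruby
# consumer.rb (hypothetical reconstruction): a simple consumer without a
# consumer group, reading the "signals" topic from the local Docker broker.
require "ruby-kafka"
require "snappy"

kafka = Kafka.new(seed_brokers: "172.18.0.3:9092")

kafka.each_message(topic: "signals", start_from_beginning: true) do |message|
  puts "offset=#{message.offset} value=#{message.value}"
end
```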
Expected outcome
Messages are processed once and the consumer waits for the next available message.
Actual outcome
Messages are processed in an infinite loop. For a topic with a large number of messages it appears that only a subset may be processed.