Consumer gets stuck in loop fetching partial message #429
Comments
Do you have logs from the incident?
If the broker simply returns an empty message set there's not much we can do on the client side. Any logs would be helpful, but also a close reading of any recent changes to the protocol or Java client that would allow us to detect this issue.
In the logs, the offset-commit messages showed several partitions not moving for 8 days. When we started reading from those partitions again we skipped forward several thousand offsets. When we dug into it, our producer had generated a single message that was larger than the configured max_bytes_per_partition for the consumer. When experimenting locally, I produced a message that was 600KB and configured a consumer with a max_bytes_per_partition of 400KB. The decoder (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/protocol/decoder.rb#L81) would show that a 600KB message was there but would fail to decode it, because only 400KB was present in the io object. I can try to do some more digging into the documentation of this behavior.
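For what it's worth, a minimal script to reproduce what I saw locally looks roughly like this (the broker address, topic, group id, and sizes are placeholders; the only thing that matters is that max_bytes_per_partition is smaller than the produced message):

```ruby
require "kafka"

kafka = Kafka.new(seed_brokers: ["localhost:9092"], client_id: "partial-message-repro")

# Produce a single ~600KB message.
kafka.deliver_message("x" * 600_000, topic: "partial-message-test")

consumer = kafka.consumer(group_id: "partial-message-repro")

# A 400KB per-partition fetch limit against a 600KB message: the broker can
# only ever return a truncated message set for this partition.
consumer.subscribe("partial-message-test",
                   start_from_beginning: true,
                   max_bytes_per_partition: 400_000)

# The oversized message never decodes, so this block is never called for it
# and the committed offset for the partition never advances.
consumer.each_message do |message|
  puts "#{message.topic}/#{message.partition} @ #{message.offset}"
end
```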
That would be great. From your comment, it sounds like the broker does return some of the content. I'm not quite sure what the client is supposed to do in that situation – I would err on the side of an explicit error in the log, telling the user that she needs to increase max_bytes_per_partition.
Reading through https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchRequest doesn't give much guidance on what to do. Going through the source of the 0.9.0.1 Java consumer, they made the decision to raise an error in this scenario: https://github.com/apache/kafka/blob/0.9.0.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L365 and https://github.com/apache/kafka/blob/0.9.0.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L580.
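If I'm reading the Java code right, it raises a RecordTooLargeException naming the offending partition and offset. A rough Ruby translation of that check might look like the sketch below; the error class and helper are hypothetical names for illustration, not anything ruby-kafka defines today:

```ruby
# Hypothetical sketch mirroring the Java Fetcher's check;
# Kafka::MessageTooLargeError is a made-up class, not part of ruby-kafka.
module Kafka
  class MessageTooLargeError < StandardError; end
end

def check_for_oversized_message!(topic:, partition:, offset:, messages:, bytes_returned:)
  # The broker returned bytes for this partition, yet not a single message
  # could be fully decoded: the record at `offset` is larger than the fetch
  # size and will never be returned, so fail loudly instead of spinning forever.
  return unless messages.empty? && bytes_returned > 0

  raise Kafka::MessageTooLargeError,
        "message at #{topic}/#{partition} offset #{offset} is larger than " \
        "max_bytes_per_partition and can never be fetched; increase the limit"
end
```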
Yeah, that sounds like the only sensible option, although I'd prefer to keep the consumer running so that the other partitions can be processed...
Maybe this error is sufficiently exceptional to warrant crashing the consumer process with an error message. |
In our situation, I think crashing the consumer process would have been the preferred behavior, as this misconfiguration could eventually have led to no partitions being processed.
When the message fails to decode, is there no exception being raised? |
I would expect the consumer to crash, actually... or at least write an exception to the logs. |
I think it's because this partial message scenario manifests as an EOFError.
Ugh, that's bad – we also get EOFError during normal fetches, since Kafka just hands a slice directly from disk to the client, not worrying about including a whole number of messages... I guess we could check whether any other messages have been read, and raise an error if the first message results in EOFError.
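Something along these lines, roughly: treat an EOFError on the very first message as the oversized-message case, and an EOFError after at least one decoded message as the normal truncated tail of a fetch response. This is an illustrative sketch, not the library's actual MessageSet code path:

```ruby
require "kafka"

# Illustrative sketch of the heuristic above. `decoder` is expected to be a
# Kafka::Protocol::Decoder wrapping the raw bytes returned for one partition.
def decode_message_set(decoder)
  messages = []

  begin
    messages << Kafka::Protocol::Message.decode(decoder) until decoder.eof?
  rescue EOFError
    if messages.empty?
      # Not even the first message fit into max_bytes_per_partition; without an
      # explicit error the consumer re-fetches the same truncated bytes forever.
      raise Kafka::Error, "partial message received; increase max_bytes_per_partition"
    end
    # Otherwise the EOFError is just the truncated tail of a normal fetch
    # response, so return whatever decoded cleanly.
  end

  messages
end
```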
Can the EOFError also be raised in more exceptional circumstances, like the connection dying?
I’m not entirely sure, would have to dive into the IO docs. |
Recently, a consumer got silently stuck trying to pull a message that was larger than the max_bytes_per_partition we had configured for it. It would have been nice if our consumer had raised an error when this scenario occurred. This looks like something the Java consumer does in Kafka 0.9, based on https://issues.apache.org/jira/browse/KAFKA-3442.