Produce operation fails on retry #544
This is just an illustration of where I found the problems.

What happened: when rebooting a Kafka broker, the sync producer fails to retry. (I assume it only fails for clusters that require authentication.) This was the first exception I got when producing a message:

```
Kafka::ConnectionError: Connection refused - connect(2) for {}
/gems/ruby-kafka-0.5.2/lib/kafka/connection.rb:139 in rescue in open
/gems/ruby-kafka-0.5.2/lib/kafka/connection.rb:118 in open
/gems/ruby-kafka-0.5.2/lib/kafka/connection.rb:95 in block in send_request
/gems/activesupport-5.1.4/lib/active_support/notifications.rb:168 in instrument
/gems/ruby-kafka-0.5.2/lib/kafka/instrumenter.rb:19 in instrument
/gems/ruby-kafka-0.5.2/lib/kafka/connection.rb:94 in send_request
/gems/ruby-kafka-0.5.2/lib/kafka/sasl_authenticator.rb:39 in authenticate!
/gems/ruby-kafka-0.5.2/lib/kafka/connection_builder.rb:25 in build_connection
/gems/ruby-kafka-0.5.2/lib/kafka/broker.rb:141 in connection
/gems/ruby-kafka-0.5.2/lib/kafka/broker.rb:22 in to_s
/gems/ruby-kafka-0.5.2/lib/kafka/produce_operation.rb:103 in rescue in block in send_buffered_messages
/gems/ruby-kafka-0.5.2/lib/kafka/produce_operation.rb:82 in block in send_buffered_messages
/gems/ruby-kafka-0.5.2/lib/kafka/produce_operation.rb:81 in each
/gems/ruby-kafka-0.5.2/lib/kafka/produce_operation.rb:81 in send_buffered_messages
/gems/ruby-kafka-0.5.2/lib/kafka/produce_operation.rb:47 in block in execute
/gems/activesupport-5.1.4/lib/active_support/notifications.rb:168 in instrument
/gems/ruby-kafka-0.5.2/lib/kafka/instrumenter.rb:19 in instrument
/gems/ruby-kafka-0.5.2/lib/kafka/produce_operation.rb:41 in execute
/gems/ruby-kafka-0.5.2/lib/kafka/producer.rb:303 in block in deliver_messages_with_retries
/gems/ruby-kafka-0.5.2/lib/kafka/producer.rb:291 in loop
/gems/ruby-kafka-0.5.2/lib/kafka/producer.rb:291 in deliver_messages_with_retries
/gems/ruby-kafka-0.5.2/lib/kafka/producer.rb:241 in block in deliver_messages
/gems/activesupport-5.1.4/lib/active_support/notifications.rb:168 in instrument
/gems/ruby-kafka-0.5.2/lib/kafka/instrumenter.rb:19 in instrument
/gems/ruby-kafka-0.5.2/lib/kafka/producer.rb:234 in deliver_messages
```

Initial investigation suggested the problem was with `broker.to_s`, which was raising and preventing the cluster from being marked as stale. But once I fixed that, there was also an issue with the `cluster.fetch_cluster_info` method, which was failing on the broker that was currently shut down. This PR is only meant to illustrate the problems.
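The stack trace shows the secondary failure originating in `broker.to_s` inside a `rescue` block. A minimal, self-contained sketch of that failure mode (hypothetical stand-in classes, not the actual ruby-kafka API): the original error is rescued, but the handler itself touches the broker connection while building a log label, raises a second `ConnectionError`, and the mark-as-stale recovery step is never reached.

```ruby
class ConnectionError < StandardError; end

# Stand-in for a broker whose #to_s lazily opens a connection and can raise.
class FlakyBroker
  def to_s
    raise ConnectionError, "Connection refused"
  end
end

# Stand-in for the cluster state we want updated during error recovery.
class Cluster
  attr_reader :stale

  def initialize
    @stale = false
  end

  def mark_as_stale!
    @stale = true
  end
end

# Sketch of the produce path: the original failure is rescued, but when
# guard_to_s is false the handler calls broker.to_s, which raises a second
# ConnectionError before the cluster can be marked stale.
def handle_failure(broker, cluster, guard_to_s:)
  raise ConnectionError, "broker is down"
rescue ConnectionError
  label = guard_to_s ? "(broker unavailable)" : broker.to_s
  cluster.mark_as_stale!
  label
end

unguarded = Cluster.new
begin
  handle_failure(FlakyBroker.new, unguarded, guard_to_s: false)
rescue ConnectionError
  # the secondary error escapes; unguarded.stale stays false
end

guarded = Cluster.new
handle_failure(FlakyBroker.new, guarded, guard_to_s: true)
# guarded.stale is now true: recovery ran to completion
```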
Can you try to come up with a functional test that reproduces the problem?
As I mentioned, this might only be happening with clusters that require authentication :/ So it might be quite challenging to build a functional test that reproduces it. (I definitely won't have time today, but I might take a stab at it tomorrow.)
How many retries have you configured, and do you have DEBUG logs?
```diff
@@ -357,7 +357,14 @@ def fetch_cluster_info
        @logger.error "Failed to fetch metadata from #{node}: #{e}"
        errors << [node, e]
      ensure
-       broker.disconnect unless broker.nil?
+       begin
+         broker.disconnect unless broker.nil?
```
This seems to be a bug nevertheless. We could have a broker instance that doesn't have an open connection; have a look here:
https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker_pool.rb#L11-L30
and here: https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker.rb#L7-L14
When we call `disconnect`, it tries to open a connection, and that's when it fails.
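A simplified sketch of that lazy-connection pattern (a hypothetical `LazyBroker`, condensed from the linked files, not the real implementation): the constructor only records state, the socket is opened memoized-on-first-use, so a naive `disconnect` on a never-connected broker forces a connection attempt.

```ruby
class ConnectionError < StandardError; end

# Simplified stand-in for Broker: construction is cheap; the socket
# (and, in the real gem, SASL authentication) happens lazily in #connection.
class LazyBroker
  def initialize(reachable:)
    @reachable = reachable
    @connection = nil
  end

  def connection
    @connection ||= begin
      raise ConnectionError, "connect(2) refused" unless @reachable
      :open_socket # stands in for a real Connection object
    end
  end

  # Naive disconnect: touching #connection on a never-connected broker
  # forces a connection attempt, which raises when the node is down.
  def disconnect
    connection
    @connection = nil
  end
end

raised = begin
  LazyBroker.new(reachable: false).disconnect
  false
rescue ConnectionError
  true
end
# raised is true: disconnecting an unused broker attempted a connect
```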
So unless I'm really missing something, `@broker_pool.connect(node.hostname, node.port)` doesn't actually connect; it just creates a broker instance. `broker.fetch_metadata` is what creates the actual socket connection.
Can you create a separate PR where Broker only tries to disconnect if there's an open connection?
yeah, that makes sense.
tbh, it's a bit tricky: when calling `connection`, it already fails with `ConnectionError`. That's because in the connection builder we try to authenticate, which opens a socket to a service that is unavailable.
https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/connection_builder.rb#L25
https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl_authenticator.rb#L39-L45
I could do this:

```ruby
def disconnect
  connection.close
rescue ConnectionError
  nil
end
```
but there are other areas in the code that call `connection` and expect it not to fail, like `broker.to_s`.
Try adding this to `Broker`:

```ruby
def connected?
  !@connection.nil?
end
```

Then in `disconnect` do:

```ruby
def disconnect
  connection.close if connected?
end
```
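Put together on a minimal stand-in class (a sketch, not the real `Broker`), the guard makes `disconnect` a no-op when no connection was ever opened, so it cannot raise on an unreachable node:

```ruby
# Minimal stand-in Broker demonstrating the suggested guard.
class GuardedBroker
  def initialize
    @connection = nil # no socket is opened at construction time
  end

  def connected?
    !@connection.nil?
  end

  def disconnect
    # Only close if a connection actually exists; never trigger a
    # lazy connection attempt just to tear one down.
    @connection.close if connected?
    @connection = nil
  end
end

broker = GuardedBroker.new
broker.disconnect # never connected: no-op instead of ConnectionError
```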
2, but it never gets to execute the 2nd retry, as it fails earlier. I can provide the debug logs.
I opened a new PR: #545
What happened:
When rebooting a Kafka broker, the sync producer was failing to retry sending messages. (I assume it only fails for clusters that require authentication.)

FYI: this is just an illustration of where I found the problems while manually testing that scenario against an integration cluster.

The first exception I got when producing a message (the stack trace above) indicated that the problem was with `broker.to_s`, which was failing and prohibiting marking the cluster as stale. Once I "fixed" that, there was also an issue with the `cluster.fetch_cluster_info` method, which was failing on the broker that was being rebooted (in the `ensure` close). This PR is only to indicate the problems, and to get some guidance on how to fix them properly.