Skip to content

Produce operation fails on retry #544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/kafka/broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def address_match?(host, port)

# @return [String]
def to_s
"#{connection} (node_id=#{@node_id.inspect})"
#"#{connection} (node_id=#{@node_id.inspect})"
"(node_id=#{@node_id.inspect})"
end

# @return [nil]
Expand Down
9 changes: 8 additions & 1 deletion lib/kafka/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,14 @@ def fetch_cluster_info
@logger.error "Failed to fetch metadata from #{node}: #{e}"
errors << [node, e]
ensure
broker.disconnect unless broker.nil?
begin
broker.disconnect unless broker.nil?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a bug nevertheless.

we could have a broker instance, that doesn't have an open connection. have a look here:
https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker_pool.rb#L11-L30
and here: https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker.rb#L7-L14

and when we call disconnect then it tries to open a connection, and that's when it fails.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if I'm not really missing something, then @broker_pool.connect(node.hostname, node.port) doen't really connect, just creates an instance of broker.

broker.fetch_metadata - creates the acctual socket connection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create a separate PR where Broker only tries to disconnect if there's an open connection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh, It's a bit tricky. when calling connection, it already fails with ConnectionError. It's all b/c in connection builder we try to authenticate, and it's trying to open a socket to service that is unavailable.
https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/connection_builder.rb#L25
https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl_authenticator.rb#L39-L45

I could do this:

    def disconnect
      connection.close
    rescue ConnectionError
      nil
    end

but there are other areas in the code that calls connection and expect it not to fail, like broker.to_s

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try adding this to Broker:

def connected?
  !@connection.nil?
end

Then in disconnect do:

def disconnect
  connection.close if connected?
end

rescue => e
# Failure #2
# borker wansn't nil
# Connection refused - connect(2) for
# And it wasn't trying to connnect to the rest of the seed brokers
end
end
end

Expand Down
13 changes: 13 additions & 0 deletions lib/kafka/produce_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,19 @@ def send_buffered_messages

handle_response(broker, response) if response
rescue ConnectionError => e
# Failure #1 - broker.to_s on connection
# Connection refused - connect(2) for
#
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/connection.rb:139:in `rescue in open'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/connection.rb:118:in `open'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/connection.rb:95:in `block in send_request'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/instrumenter.rb:21:in `instrument'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/connection.rb:94:in `send_request'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/sasl_authenticator.rb:39:in `authenticate!'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/connection_builder.rb:25:in `build_connection'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/broker.rb:159:in `connection'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/broker.rb:22:in `to_s'",
# "/Users/michal.wrobel/.rbenv/versions/2.4.3/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.5.3/lib/kafka/produce_operation.rb:103:in `rescue in block in send_buffered_messages'",
@logger.error "Could not connect to broker #{broker}: #{e}"

# Mark the cluster as stale in order to force a cluster metadata refresh.
Expand Down