
[Bug?] Trying to join a consumer group indefinitely #854


Closed
PChambino opened this issue Jul 27, 2020 · 3 comments

Comments


PChambino commented Jul 27, 2020

If this is a bug report, please fill out the following:

  • Version of Ruby: 2.6.3p62
  • Version of Kafka: 2.4.1 (heroku-kafka:standard-0)
  • Version of ruby-kafka: 1.1.0

Please verify that the problem you're seeing hasn't been fixed by the current master of ruby-kafka.
Doesn't seem like it has been fixed.

Steps to reproduce

Restart a consumer process (on Heroku). It only happens sometimes, and so far we have only noticed it when the restart is initiated by Heroku rather than by a normal deploy, but that could just be chance.

Expected outcome

The consumer would start, join the group, and start processing messages.

Actual outcome

The consumer stays in a loop trying to join the consumer group but fails indefinitely until it is restarted manually.

It starts by retrying like this:

2020-07-19T00:27:31.74778+00:00 host app kafka_consumer.1 - Joining group `performance_stats_es`
2020-07-19T00:27:31.747875+00:00 host app kafka_consumer.1 - Fetching cluster metadata from kafka+ssl://ec2-redacted.compute-1.amazonaws.com:9096
2020-07-19T00:27:32.038725+00:00 host app kafka_consumer.1 - Discovered cluster metadata; nodes: ec2-redacted.compute-1.amazonaws.com:9096 (node_id=0), ec2-redacted.compute-1.amazonaws.com:9096 (node_id=2), ec2-redacted.compute-1.amazonaws.com:9096 (node_id=3)
2020-07-19T00:27:33.864019+00:00 host app kafka_consumer.1 - There are no partitions to fetch from, sleeping for 5s
2020-07-19T00:27:38.864331+00:00 host app kafka_consumer.1 - There are no partitions to fetch from, sleeping for 5s
2020-07-19T00:27:42.356492+00:00 host app kafka_consumer.1 - [join_group] Timed out while waiting for response 2
2020-07-19T00:27:42.356803+00:00 host app kafka_consumer.1 - Connection error while trying to join group `performance_stats_es`; retrying...
2020-07-19T00:27:43.358034+00:00 host app kafka_consumer.1 - Joining group `performance_stats_es`

Then like this:

2020-07-19T00:28:18.136462+00:00 host app kafka_consumer.1 - Joining group `performance_stats_es`
2020-07-19T00:28:18.136569+00:00 host app kafka_consumer.1 - Fetching cluster metadata from kafka+ssl://ec2-redacted.compute-1.amazonaws.com:9096
2020-07-19T00:28:18.413302+00:00 host app kafka_consumer.1 - Discovered cluster metadata; nodes: ec2-redacted.compute-1.amazonaws.com:9096 (node_id=0), ec2-redacted.compute-1.amazonaws.com:9096 (node_id=2), ec2-redacted.compute-1.amazonaws.com:9096 (node_id=3)
2020-07-19T00:28:18.866894+00:00 host app kafka_consumer.1 - There are no partitions to fetch from, sleeping for 5s
2020-07-19T00:28:20.818791+00:00 host app kafka_consumer.1 - Joined group `performance_stats_es` with member id `ruby-kafka-06797ce5-c8eb-4ae1-b560-b271cea0599a`
2020-07-19T00:28:23.867229+00:00 host app kafka_consumer.1 - There are no partitions to fetch from, sleeping for 5s
2020-07-19T00:28:28.867534+00:00 host app kafka_consumer.1 - There are no partitions to fetch from, sleeping for 5s
2020-07-19T00:28:30.819315+00:00 host app kafka_consumer.1 - [sync_group] Timed out while waiting for response 3
2020-07-19T00:28:30.819636+00:00 host app kafka_consumer.1 - Connection error while trying to join group `performance_stats_es`; retrying...
2020-07-19T00:28:31.820809+00:00 host app kafka_consumer.1 - Joining group `performance_stats_es`

Later on it also starts printing the following, while still trying to join the group:

2020-07-19T00:35:28.896276+00:00 host app kafka_consumer.1 - Reached max fetcher queue size (100), sleeping 1s

That is odd, because I wouldn't expect it to fetch messages when it has failed to join the consumer group. Regardless, no messages are processed.

When we eventually restart the process, the SIGTERM signal terminates the fetcher thread, but the consumer keeps trying to join the group until the process is SIGKILL'ed.
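(For reference, our shutdown handling is essentially the pattern from the ruby-kafka README: trap the signal and ask the consumer to stop. This is a rough sketch rather than our exact code:)

# Heroku sends SIGTERM on restart and SIGKILLs the dyno about 30 seconds later.
# Asking the consumer to stop should make each_batch return after the current batch.
trap("TERM") { consumer.stop }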

The new process only manages to join the consumer group a couple of minutes after the previous process is SIGKILL'ed. Here is an example of when that happens; from that point on it behaves as expected.

2020-07-19T00:40:48.629825+00:00 host app kafka_consumer.1 - Joining group `performance_stats_es`
2020-07-19T00:40:48.629905+00:00 host app kafka_consumer.1 - Fetching cluster metadata from kafka+ssl://ec2-redacted.compute-1.amazonaws.com:9096
2020-07-19T00:40:48.90397+00:00 host app kafka_consumer.1 - Discovered cluster metadata; nodes: ec2-redacted.compute-1.amazonaws.com:9096 (node_id=0), ec2-redacted.compute-1.amazonaws.com:9096 (node_id=2), ec2-redacted.compute-1.amazonaws.com:9096 (node_id=3)
2020-07-19T00:40:50.938604+00:00 host app kafka_consumer.1 - There are no partitions to fetch from, sleeping for 5s
2020-07-19T00:40:55.938981+00:00 host app kafka_consumer.1 - There are no partitions to fetch from, sleeping for 5s
2020-07-19T00:40:59.204298+00:00 host app kafka_consumer.1 - [join_group] Timed out while waiting for response 2
2020-07-19T00:40:59.204653+00:00 host app kafka_consumer.1 - Connection error while trying to join group `performance_stats_es`; retrying...
2020-07-19T00:41:00.204938+00:00 host app kafka_consumer.1 - Joining group `performance_stats_es`
2020-07-19T00:41:00.205048+00:00 host app kafka_consumer.1 - Fetching cluster metadata from kafka+ssl://ec2-redacted.compute-1.amazonaws.com:9096
2020-07-19T00:41:00.498095+00:00 host app kafka_consumer.1 - Discovered cluster metadata; nodes: ec2-redacted.compute-1.amazonaws.com:9096 (node_id=0), ec2-redacted.compute-1.amazonaws.com:9096 (node_id=2), ec2-redacted.compute-1.amazonaws.com:9096 (node_id=3)
2020-07-19T00:41:00.939355+00:00 host app kafka_consumer.1 - There are no partitions to fetch from, sleeping for 5s
2020-07-19T00:41:01.267279+00:00 host app kafka_consumer.1 - Joined group `performance_stats_es` with member id `ruby-kafka-71107678-ed6c-47cd-85a5-d6f08f8d761b`
2020-07-19T00:41:01.267292+00:00 host app kafka_consumer.1 - Chosen as leader of group `performance_stats_es`

(I will check how a normal restart behaves and see if there is anything interesting in the logs to compare with these.)

Should the client have a configuration option to terminate itself if it can't join a group after a number of attempts? Is there anything else I can do to help understand what is happening here?

It appears to be a concurrency issue but I can't pinpoint where or why.

PChambino changed the title from "Retrying to join a consumer group indefinitely" to "[Bug?] Trying to join a consumer group indefinitely" on Jul 27, 2020

dasch (Contributor) commented Aug 3, 2020

Are you doing something funky with threads?

PChambino (Author) commented

We initialize a consumer, subscribe to a few topics, and then call each_batch and schedule Sidekiq jobs from the messages. So nothing funky with threads.
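Roughly, the setup looks like the sketch below (simplified; the topic names, the worker class, and the KAFKA_URL handling are made up for illustration, and the SSL options are omitted):

require "kafka"

# Heroku-style broker list; the real setup also passes the SSL certificates.
kafka = Kafka.new(ENV.fetch("KAFKA_URL").split(","), client_id: "performance-stats")
consumer = kafka.consumer(group_id: "performance_stats_es")

# Subscribe to a few topics, then hand each message off to Sidekiq.
consumer.subscribe("events")
consumer.subscribe("page_views")

consumer.each_batch do |batch|
  batch.messages.each do |message|
    # ProcessEventWorker is a made-up Sidekiq worker class.
    ProcessEventWorker.perform_async(message.value)
  end
end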

I think this started to happen after we upgraded to 1.1.0. There are at least two PRs that make changes around that area, e.g. #818 and #817. That said, it is possible it was a Heroku issue, since it seems to fail due to timeouts and we haven't had any issues for a full week now.

Nevertheless, the client behaviour is not great. The thread that joins the consumer group just loops forever, and when consumer.stop is called the thread still doesn't exit, so the process needs to be SIGKILL'ed.

Since it is impossible to avoid network errors, I was wondering whether a maximum number of attempts when trying to join a consumer group would be a good addition. In our use case, if the process died after failing to join the consumer group, it would restart and hopefully join the group successfully afterwards. That would avoid having to restart it manually.

PChambino (Author) commented

We effectively "solved" this by terminating a consumer that fails to join a group within a certain amount of time, so that it auto-restarts. Not pretty, but it does the trick.

# Watchdog: if the consumer still hasn't joined its group after 10 minutes,
# stop it and exit non-zero so the process manager restarts the dyno.
# (10.minutes needs ActiveSupport; plain Ruby would use sleep 600. The member?
# check reaches into ruby-kafka internals via @group, so it may break between versions.)
Thread.new do
  sleep 10.minutes
  next if consumer.instance_variable_get(:@group).member?

  consumer.stop
  exit 1
end
