Cannot get topic list in ruby-kafka v0.5.1 #493
Comments
I should add a comment in the README – Kafka 0.10.0 is not supported, you need to upgrade to Kafka 0.10.1 or higher. |
I am having the exact problem too. I also get an empty list when calling
|
@mhakeem can you paste some DEBUG logs here? |
I have pasted my code which is very similar to
My code:
require 'dotenv/load'
require 'kafka'
logger = Logger.new(STDERR)
# logger = Logger.new(StringIO.new)
brokers = ENV.fetch('KAFKA_BROKERS')
topic = 'simple-test'
kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: 'simple_consumer',
  logger: logger
)
logger.debug "Kafka topics: #{kafka.topics}"
consumer = kafka.consumer(group_id: 'simple-consumer-1')
consumer.subscribe(topic, start_from_beginning: true)
logger.debug "Kafka topics: #{kafka.topics}"
trap('TERM') { consumer.stop }
consumer.each_message do |msg|
  puts "topic: #{msg.topic}, partition: #{msg.partition}"
  puts "offset: #{msg.offset}, key: #{msg.key}, value: #{msg.value}"
  puts ''
end
The logs:
ruby simple_consumer.rb
I, [2017-12-18T03:21:28.204147 #29332] INFO -- : Fetching cluster metadata from kafka://localhost:9092
D, [2017-12-18T03:21:28.204280 #29332] DEBUG -- : Opening connection to localhost:9092 with client id simple_consumer...
D, [2017-12-18T03:21:28.204788 #29332] DEBUG -- : Sending topic_metadata API request 1 to localhost:9092
D, [2017-12-18T03:21:28.204928 #29332] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2017-12-18T03:21:28.205886 #29332] DEBUG -- : Received response 1 from localhost:9092
I, [2017-12-18T03:21:28.205970 #29332] INFO -- : Discovered cluster metadata; nodes: ubuntu:9092 (node_id=0)
D, [2017-12-18T03:21:28.206004 #29332] DEBUG -- : Closing socket to localhost:9092
D, [2017-12-18T03:21:28.206320 #29332] DEBUG -- : Kafka topics: []
I, [2017-12-18T03:21:28.206489 #29332] INFO -- : New topics added to target list: simple-test
I, [2017-12-18T03:21:28.206557 #29332] INFO -- : Fetching cluster metadata from kafka://localhost:9092
D, [2017-12-18T03:21:28.206615 #29332] DEBUG -- : Opening connection to localhost:9092 with client id simple_consumer...
D, [2017-12-18T03:21:28.206866 #29332] DEBUG -- : Sending topic_metadata API request 1 to localhost:9092
D, [2017-12-18T03:21:28.206968 #29332] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2017-12-18T03:21:28.207903 #29332] DEBUG -- : Received response 1 from localhost:9092
I, [2017-12-18T03:21:28.207947 #29332] INFO -- : Discovered cluster metadata; nodes: ubuntu:9092 (node_id=0)
D, [2017-12-18T03:21:28.207969 #29332] DEBUG -- : Closing socket to localhost:9092
I, [2017-12-18T03:21:28.208300 #29332] INFO -- : Fetching cluster metadata from kafka://localhost:9092
D, [2017-12-18T03:21:28.208393 #29332] DEBUG -- : Opening connection to localhost:9092 with client id simple_consumer...
D, [2017-12-18T03:21:28.208573 #29332] DEBUG -- : Sending topic_metadata API request 1 to localhost:9092
D, [2017-12-18T03:21:28.208661 #29332] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2017-12-18T03:21:28.209988 #29332] DEBUG -- : Received response 1 from localhost:9092
I, [2017-12-18T03:21:28.210064 #29332] INFO -- : Discovered cluster metadata; nodes: ubuntu:9092 (node_id=0)
D, [2017-12-18T03:21:28.210093 #29332] DEBUG -- : Closing socket to localhost:9092
D, [2017-12-18T03:21:28.210148 #29332] DEBUG -- : Kafka topics: []
I, [2017-12-18T03:21:28.210189 #29332] INFO -- : Joining group `simple-consumer-1`
D, [2017-12-18T03:21:28.210217 #29332] DEBUG -- : Getting group coordinator for `simple-consumer-1`
D, [2017-12-18T03:21:28.210285 #29332] DEBUG -- : Opening connection to ubuntu:9092 with client id simple_consumer...
D, [2017-12-18T03:21:28.210501 #29332] DEBUG -- : Sending group_coordinator API request 1 to ubuntu:9092
D, [2017-12-18T03:21:28.210640 #29332] DEBUG -- : Waiting for response 1 from ubuntu:9092
D, [2017-12-18T03:21:28.211694 #29332] DEBUG -- : Received response 1 from ubuntu:9092
D, [2017-12-18T03:21:28.211768 #29332] DEBUG -- : Coordinator for group `simple-consumer-1` is 0. Connecting...
D, [2017-12-18T03:21:28.211809 #29332] DEBUG -- : Connected to coordinator: ubuntu:9092 (node_id=0) for group `simple-consumer-1`
D, [2017-12-18T03:21:28.211870 #29332] DEBUG -- : Sending join_group API request 2 to ubuntu:9092
D, [2017-12-18T03:21:28.212059 #29332] DEBUG -- : Waiting for response 2 from ubuntu:9092
D, [2017-12-18T03:21:28.214247 #29332] DEBUG -- : Received response 2 from ubuntu:9092
I, [2017-12-18T03:21:28.214339 #29332] INFO -- : Joined group `simple-consumer-1` with member id `simple_consumer-589f6a57-bf15-4ca0-a9c7-ada07bab50b0`
I, [2017-12-18T03:21:28.214371 #29332] INFO -- : Chosen as leader of group `simple-consumer-1`
D, [2017-12-18T03:21:28.214500 #29332] DEBUG -- : Sending sync_group API request 3 to ubuntu:9092
D, [2017-12-18T03:21:28.214706 #29332] DEBUG -- : Waiting for response 3 from ubuntu:9092
D, [2017-12-18T03:21:28.217944 #29332] DEBUG -- : Received response 3 from ubuntu:9092
I, [2017-12-18T03:21:28.218635 #29332] INFO -- : Partitions assigned for `simple-test`: 0
D, [2017-12-18T03:21:28.218817 #29332] DEBUG -- : Sending offset_fetch API request 4 to ubuntu:9092
D, [2017-12-18T03:21:28.219338 #29332] DEBUG -- : Waiting for response 4 from ubuntu:9092
D, [2017-12-18T03:21:28.220129 #29332] DEBUG -- : Received response 4 from ubuntu:9092
D, [2017-12-18T03:21:28.220326 #29332] DEBUG -- : Sending list_offset API request 5 to ubuntu:9092
D, [2017-12-18T03:21:28.220526 #29332] DEBUG -- : Waiting for response 5 from ubuntu:9092
D, [2017-12-18T03:21:28.221499 #29332] DEBUG -- : Received response 5 from ubuntu:9092
D, [2017-12-18T03:21:28.221694 #29332] DEBUG -- : Fetching batch from simple-test/0 starting at offset 0
D, [2017-12-18T03:21:28.221877 #29332] DEBUG -- : Sending fetch API request 6 to ubuntu:9092
D, [2017-12-18T03:21:28.222141 #29332] DEBUG -- : Waiting for response 6 from ubuntu:9092
D, [2017-12-18T03:21:28.223499 #29332] DEBUG -- : Received response 6 from ubuntu:9092
topic: simple-test, partition: 0
offset: 0, key: , value: 1
D, [2017-12-18T03:21:28.223792 #29332] DEBUG -- : Marking simple-test/0:0 as processed
I, [2017-12-18T03:21:28.223919 #29332] INFO -- : Committing offsets with recommit: simple-test/0:1
D, [2017-12-18T03:21:28.224046 #29332] DEBUG -- : Sending offset_commit API request 7 to ubuntu:9092
D, [2017-12-18T03:21:28.224345 #29332] DEBUG -- : Waiting for response 7 from ubuntu:9092
D, [2017-12-18T03:21:28.229286 #29332] DEBUG -- : Received response 7 from ubuntu:9092
topic: simple-test, partition: 0
offset: 1, key: , value: 2
D, [2017-12-18T03:21:28.229524 #29332] DEBUG -- : Marking simple-test/0:1 as processed
topic: simple-test, partition: 0
offset: 2, key: , value: 3
D, [2017-12-18T03:21:28.230253 #29332] DEBUG -- : Marking simple-test/0:2 as processed
topic: simple-test, partition: 0
offset: 3, key: , value: 4
D, [2017-12-18T03:21:28.230338 #29332] DEBUG -- : Marking simple-test/0:3 as processed
topic: simple-test, partition: 0
offset: 4, key: , value: 5
D, [2017-12-18T03:21:28.230407 #29332] DEBUG -- : Marking simple-test/0:4 as processed
topic: simple-test, partition: 0
offset: 5, key: , value: 6
D, [2017-12-18T03:21:28.230481 #29332] DEBUG -- : Marking simple-test/0:5 as processed
topic: simple-test, partition: 0
offset: 6, key: , value: 7
D, [2017-12-18T03:21:28.230527 #29332] DEBUG -- : Marking simple-test/0:6 as processed
topic: simple-test, partition: 0
offset: 7, key: , value: 8
D, [2017-12-18T03:21:28.230562 #29332] DEBUG -- : Marking simple-test/0:7 as processed
topic: simple-test, partition: 0
offset: 8, key: , value: 9
D, [2017-12-18T03:21:28.230604 #29332] DEBUG -- : Marking simple-test/0:8 as processed
topic: simple-test, partition: 0
offset: 9, key: , value: 10
D, [2017-12-18T03:21:28.230639 #29332] DEBUG -- : Marking simple-test/0:9 as processed
D, [2017-12-18T03:21:28.230693 #29332] DEBUG -- : Fetching batch from simple-test/0 starting at offset 10
D, [2017-12-18T03:21:28.230810 #29332] DEBUG -- : Sending fetch API request 8 to ubuntu:9092
D, [2017-12-18T03:21:28.230992 #29332] DEBUG -- : Waiting for response 8 from ubuntu:9092
^CW, [2017-12-18T03:21:29.125595 #29332] WARN -- : Received signal , shutting down
I, [2017-12-18T03:21:29.125763 #29332] INFO -- : Committing offsets: simple-test/0:10
D, [2017-12-18T03:21:29.125863 #29332] DEBUG -- : Sending offset_commit API request 9 to ubuntu:9092
D, [2017-12-18T03:21:29.126194 #29332] DEBUG -- : Waiting for response 9 from ubuntu:9092
D, [2017-12-18T03:21:29.233907 #29332] DEBUG -- : Received response 8 from ubuntu:9092
E, [2017-12-18T03:21:29.234030 #29332] ERROR -- : Received out-of-order response id 8, was expecting 9
D, [2017-12-18T03:21:29.234065 #29332] DEBUG -- : Waiting for response 9 from ubuntu:9092
D, [2017-12-18T03:21:29.236104 #29332] DEBUG -- : Received response 9 from ubuntu:9092
I, [2017-12-18T03:21:29.236209 #29332] INFO -- : Leaving group `simple-consumer-1`
D, [2017-12-18T03:21:29.236303 #29332] DEBUG -- : Sending leave_group API request 10 to ubuntu:9092
D, [2017-12-18T03:21:29.236476 #29332] DEBUG -- : Waiting for response 10 from ubuntu:9092
D, [2017-12-18T03:21:29.238929 #29332] DEBUG -- : Received response 10 from ubuntu:9092 |
@mhakeem can you try this?
require 'kafka'
logger = Logger.new(STDERR)
brokers = ENV.fetch('KAFKA_BROKERS')
kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: 'simple_consumer',
  logger: logger
)
puts kafka.topics.inspect |
Here is the log. Still an empty list, sadly.
ruby simple_consumer.rb
I, [2017-12-19T02:58:09.745552 #4947] INFO -- : Fetching cluster metadata from kafka://localhost:9092
D, [2017-12-19T02:58:09.745686 #4947] DEBUG -- : Opening connection to localhost:9092 with client id simple_consumer...
D, [2017-12-19T02:58:09.746237 #4947] DEBUG -- : Sending topic_metadata API request 1 to localhost:9092
D, [2017-12-19T02:58:09.746370 #4947] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2017-12-19T02:58:09.747543 #4947] DEBUG -- : Received response 1 from localhost:9092
I, [2017-12-19T02:58:09.747699 #4947] INFO -- : Discovered cluster metadata; nodes: ubuntu:9092 (node_id=0)
D, [2017-12-19T02:58:09.747736 #4947] DEBUG -- : Closing socket to localhost:9092
[] I also tried printing #<Kafka::Client:0x000056462b6dd658 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @instrumenter=#<Kafka::Instrumenter:0x000056462b6dd5b8 @default_payload={:client_id=>"simple_consumer"}, @backend=nil>, @seed_brokers=[#<URI::Generic kafka://localhost:9092>], @connection_builder=#<Kafka::ConnectionBuilder:0x000056462b6dcf50 @client_id="simple_consumer", @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @instrumenter=#<Kafka::Instrumenter:0x000056462b6dd5b8 @default_payload={:client_id=>"simple_consumer"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x000056462b6dd180 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @plain=#<Kafka::Sasl::Plain:0x000056462b6dd0e0 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, 
@shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x000056462b6dd068 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x000056462b6dcfc8 @username=nil, @password=nil, @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>>>>, @cluster=#<Kafka::Cluster:0x000056462b6dce10 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @seed_brokers=[#<URI::Generic kafka://localhost:9092>], @broker_pool=#<Kafka::BrokerPool:0x000056462b6dced8 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, 
@mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @connection_builder=#<Kafka::ConnectionBuilder:0x000056462b6dcf50 @client_id="simple_consumer", @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @instrumenter=#<Kafka::Instrumenter:0x000056462b6dd5b8 @default_payload={:client_id=>"simple_consumer"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x000056462b6dd180 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @plain=#<Kafka::Sasl::Plain:0x000056462b6dd0e0 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x000056462b6dd068 @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, 
@mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x000056462b6dcfc8 @username=nil, @password=nil, @logger=#<Logger:0x000056462b6ddb80 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x000056462b6dda90 @datetime_format=nil>, @formatter=nil, @logdev=#<Logger::LogDevice:0x000056462b6dd9f0 @shift_period_suffix=nil, @shift_size=nil, @shift_age=nil, @filename=nil, @dev=#<IO:<STDERR>>, @mon_owner=nil, @mon_count=0, @mon_mutex=#<Thread::Mutex:0x000056462b6dd928>>>>>>, @brokers={}>, @cluster_info=nil, @stale=true, @target_topics=#<Set: {}>>> |
|
Yes because 1) the consumer works fine when reading data from any given topics, but no topics are shown when running |
In the protocol docs there's a subtle change between version 0 and 1 of the metadata request format, which is used to discover the topics in a cluster. Before:
After:
It could be that it's now possible to pass in |
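The version 0 versus version 1 difference mentioned above can be illustrated with a small encoder. This is only a sketch based on my reading of the Kafka protocol guide, not ruby-kafka's actual code: in version 0 of the metadata request an empty topics array asks for all topics, while from version 1 onward a null array (element count -1) asks for all topics and an empty array asks for none. The name `encode_topics` is a hypothetical helper made up for this example.

```ruby
# Hypothetical helper (not part of ruby-kafka): encodes the topics array
# of a metadata request using Kafka's wire format.
#   array  = int32 element count (-1 for a null array), then the elements
#   string = int16 byte length, then the bytes
def encode_topics(topics)
  return [-1].pack("l>") if topics.nil?

  body = topics.map { |name| [name.bytesize].pack("s>") + name.b }.join
  [topics.size].pack("l>") + body
end

all_topics_v0 = encode_topics([])   # v0: an empty array means "all topics"
all_topics_v1 = encode_topics(nil)  # v1+: a null array means "all topics"

puts all_topics_v0.unpack("l>").first  # element count 0
puts all_topics_v1.unpack("l>").first  # element count -1
```

If a client kept sending the version 0 style empty array in a version 1 request, the broker would presumably answer with no topics at all, which would be consistent with the `[]` in the logs above. That is an inference from the protocol description, not something confirmed in this thread.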
Found a fix. |
Awesome! Thanks for fixing it. It lists all topics in Kafka now. Just FYI, I had to install snappy & extlz4 gems because the new beta version of the gem raised exceptions for not having them. Besides that, thanks a lot :) |
Ah, that's a bug – I'll take a look at that. |
If this is a bug report, please fill out the following:
Please verify that the problem you're seeing hasn't been fixed by the current master of ruby-kafka.
Steps to reproduce
Expected outcome
When I use ruby-kafka v0.5.0 or v0.4+ and run the same code, those return
But the v0.5.1 client returns nothing.
I got the compatibility information from the v0.5.1 README.md.
Does ruby-kafka v0.5.1 not support Kafka v0.10.0?