diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 745460a..0aec29d 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -221,7 +221,7 @@ def register public def run(logstash_queue) - @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") } + @runner_consumers = consumer_threads.times.map { |i| create_consumer(client_id) } @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) } @runner_threads.each { |t| t.join } end # def run diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index 1fa3015..ffec2c4 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -82,7 +82,7 @@ def thread_it(kafka_input, queue) wait(timeout_seconds).for {queue.length}.to eq(num_events) expect(queue.length).to eq(num_events) kafka_input.kafka_consumers.each_with_index do |consumer, i| - expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}") + expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec") end ensure t.kill