diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb index 7b575a994..a2b1f58cf 100644 --- a/lib/kafka/async_producer.rb +++ b/lib/kafka/async_producer.rb @@ -59,8 +59,6 @@ module Kafka # producer.shutdown # class AsyncProducer - THREAD_MUTEX = Mutex.new - # Initializes a new AsyncProducer. # # @param sync_producer [Kafka::Producer] the synchronous producer that should @@ -94,6 +92,8 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli # The timer will no-op if the delivery interval is zero. @timer = Timer.new(queue: @queue, interval: delivery_interval) + + @thread_mutex = Mutex.new end # Produces a message to the specified topic. @@ -131,6 +131,8 @@ def produce(value, topic:, **options) # @see Kafka::Producer#deliver_messages # @return [nil] def deliver_messages + ensure_threads_running! + @queue << [:deliver_messages, nil] nil @@ -142,6 +144,8 @@ def deliver_messages # @see Kafka::Producer#shutdown # @return [nil] def shutdown + ensure_threads_running! + @timer_thread && @timer_thread.exit @queue << [:shutdown, nil] @worker_thread && @worker_thread.join @@ -152,17 +156,22 @@ def shutdown private def ensure_threads_running! - THREAD_MUTEX.synchronize do - @worker_thread = nil unless @worker_thread && @worker_thread.alive? - @worker_thread ||= Thread.new { @worker.run } - end + return if worker_thread_alive? && timer_thread_alive? - THREAD_MUTEX.synchronize do - @timer_thread = nil unless @timer_thread && @timer_thread.alive? - @timer_thread ||= Thread.new { @timer.run } + @thread_mutex.synchronize do + @worker_thread = Thread.new { @worker.run } unless worker_thread_alive? + @timer_thread = Thread.new { @timer.run } unless timer_thread_alive? end end + def worker_thread_alive? + !!@worker_thread && @worker_thread.alive? + end + + def timer_thread_alive? + !!@timer_thread && @timer_thread.alive? + end + def buffer_overflow(topic, message) @instrumenter.instrument("buffer_overflow.async_producer", { topic: topic,