Skip to content

Various AsyncProducer improvements #855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 4, 2020
27 changes: 18 additions & 9 deletions lib/kafka/async_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ module Kafka
# producer.shutdown
#
class AsyncProducer
THREAD_MUTEX = Mutex.new

# Initializes a new AsyncProducer.
#
# @param sync_producer [Kafka::Producer] the synchronous producer that should
Expand Down Expand Up @@ -94,6 +92,8 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli

# The timer will no-op if the delivery interval is zero.
@timer = Timer.new(queue: @queue, interval: delivery_interval)

@thread_mutex = Mutex.new
end

# Produces a message to the specified topic.
Expand Down Expand Up @@ -131,6 +131,8 @@ def produce(value, topic:, **options)
# @see Kafka::Producer#deliver_messages
# @return [nil]
def deliver_messages
ensure_threads_running!

@queue << [:deliver_messages, nil]

nil
Expand All @@ -142,6 +144,8 @@ def deliver_messages
# @see Kafka::Producer#shutdown
# @return [nil]
def shutdown
ensure_threads_running!

@timer_thread && @timer_thread.exit
@queue << [:shutdown, nil]
@worker_thread && @worker_thread.join
Expand All @@ -152,17 +156,22 @@ def shutdown
private

def ensure_threads_running!
THREAD_MUTEX.synchronize do
@worker_thread = nil unless @worker_thread && @worker_thread.alive?
@worker_thread ||= Thread.new { @worker.run }
end
return if worker_thread_alive? && timer_thread_alive?

THREAD_MUTEX.synchronize do
@timer_thread = nil unless @timer_thread && @timer_thread.alive?
@timer_thread ||= Thread.new { @timer.run }
@thread_mutex.synchronize do
@worker_thread = Thread.new { @worker.run } unless worker_thread_alive?
@timer_thread = Thread.new { @timer.run } unless timer_thread_alive?
end
end

def worker_thread_alive?
!!@worker_thread && @worker_thread.alive?
end

def timer_thread_alive?
!!@timer_thread && @timer_thread.alive?
end

def buffer_overflow(topic, message)
@instrumenter.instrument("buffer_overflow.async_producer", {
topic: topic,
Expand Down