Skip to content

Commit 5f22fbd

Browse files
committed
Create a unique Mutex for each AsyncProducer
Currently we reuse the same mutex for every instance of `AsyncProducer`. This means that one producer can block another producer from producing if it holds the mutex. This is unnecessary because the details of whether a particular producer's threads are running are irrelevant to every other producer.
1 parent 25e24f9 commit 5f22fbd

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

Diff for: lib/kafka/async_producer.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ module Kafka
5959
# producer.shutdown
6060
#
6161
class AsyncProducer
62-
THREAD_MUTEX = Mutex.new
63-
6462
# Initializes a new AsyncProducer.
6563
#
6664
# @param sync_producer [Kafka::Producer] the synchronous producer that should
@@ -94,6 +92,8 @@ def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, deli
9492

9593
# The timer will no-op if the delivery interval is zero.
9694
@timer = Timer.new(queue: @queue, interval: delivery_interval)
95+
96+
@thread_mutex = Mutex.new
9797
end
9898

9999
# Produces a message to the specified topic.
@@ -152,12 +152,12 @@ def shutdown
152152
private
153153

154154
def ensure_threads_running!
155-
THREAD_MUTEX.synchronize do
155+
@thread_mutex.synchronize do
156156
@worker_thread = nil unless @worker_thread && @worker_thread.alive?
157157
@worker_thread ||= Thread.new { @worker.run }
158158
end
159159

160-
THREAD_MUTEX.synchronize do
160+
@thread_mutex.synchronize do
161161
@timer_thread = nil unless @timer_thread && @timer_thread.alive?
162162
@timer_thread ||= Thread.new { @timer.run }
163163
end

0 commit comments

Comments
 (0)