Skip to content

Commit 68b8761

Browse files
committed
Fix multiple [Producer name] tags on failures
On `rescue Kafka::Error` we're rerunning the whole method block including `@logger.push_tags(@producer.to_s)`, but clearing tags only in `ensure` which is not run on `retry`
1 parent 8a18f9e commit 68b8761

File tree

1 file changed

+37
-31
lines changed

1 file changed

+37
-31
lines changed

lib/kafka/async_producer.rb

+37-31
Original file line numberDiff line numberDiff line change
@@ -212,31 +212,45 @@ def run
212212
@logger.push_tags(@producer.to_s)
213213
@logger.info "Starting async producer in the background..."
214214

215+
do_loop
216+
rescue Exception => e
217+
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
218+
@logger.error "Async producer crashed!"
219+
ensure
220+
@producer.shutdown
221+
@logger.pop_tags
222+
end
223+
224+
private
225+
226+
def do_loop
215227
loop do
216-
operation, payload = @queue.pop
217-
218-
case operation
219-
when :produce
220-
produce(payload[0], **payload[1])
221-
deliver_messages if threshold_reached?
222-
when :deliver_messages
223-
deliver_messages
224-
when :shutdown
225-
begin
226-
# Deliver any pending messages first.
227-
@producer.deliver_messages
228-
rescue Error => e
229-
@logger.error("Failed to deliver messages during shutdown: #{e.message}")
230-
231-
@instrumenter.instrument("drop_messages.async_producer", {
232-
message_count: @producer.buffer_size + @queue.size,
233-
})
228+
begin
229+
operation, payload = @queue.pop
230+
231+
case operation
232+
when :produce
233+
produce(payload[0], **payload[1])
234+
deliver_messages if threshold_reached?
235+
when :deliver_messages
236+
deliver_messages
237+
when :shutdown
238+
begin
239+
# Deliver any pending messages first.
240+
@producer.deliver_messages
241+
rescue Error => e
242+
@logger.error("Failed to deliver messages during shutdown: #{e.message}")
243+
244+
@instrumenter.instrument("drop_messages.async_producer", {
245+
message_count: @producer.buffer_size + @queue.size,
246+
})
247+
end
248+
249+
# Stop the run loop.
250+
break
251+
else
252+
raise "Unknown operation #{operation.inspect}"
234253
end
235-
236-
# Stop the run loop.
237-
break
238-
else
239-
raise "Unknown operation #{operation.inspect}"
240254
end
241255
end
242256
rescue Kafka::Error => e
@@ -245,16 +259,8 @@ def run
245259

246260
sleep 10
247261
retry
248-
rescue Exception => e
249-
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
250-
@logger.error "Async producer crashed!"
251-
ensure
252-
@producer.shutdown
253-
@logger.pop_tags
254262
end
255263

256-
private
257-
258264
def produce(value, **kwargs)
259265
retries = 0
260266
begin

0 commit comments

Comments
 (0)