Skip to content

Commit 9121246

Browse files
committed
Fix multiple [Producer name] tags on failures
On `rescue Kafka::Error` we're rerunning the whole method block including `@logger.push_tags(@producer.to_s)`, but clearing tags only in `ensure` which is not run on `retry`
1 parent 8a18f9e commit 9121246

File tree

1 file changed

+36
-34
lines changed

1 file changed

+36
-34
lines changed

lib/kafka/async_producer.rb

+36-34
Original file line numberDiff line numberDiff line change
@@ -213,44 +213,46 @@ def run
213213
@logger.info "Starting async producer in the background..."
214214

215215
loop do
216-
operation, payload = @queue.pop
217-
218-
case operation
219-
when :produce
220-
produce(payload[0], **payload[1])
221-
deliver_messages if threshold_reached?
222-
when :deliver_messages
223-
deliver_messages
224-
when :shutdown
225-
begin
226-
# Deliver any pending messages first.
227-
@producer.deliver_messages
228-
rescue Error => e
229-
@logger.error("Failed to deliver messages during shutdown: #{e.message}")
230-
231-
@instrumenter.instrument("drop_messages.async_producer", {
232-
message_count: @producer.buffer_size + @queue.size,
233-
})
216+
begin
217+
operation, payload = @queue.pop
218+
219+
case operation
220+
when :produce
221+
produce(payload[0], **payload[1])
222+
deliver_messages if threshold_reached?
223+
when :deliver_messages
224+
deliver_messages
225+
when :shutdown
226+
begin
227+
# Deliver any pending messages first.
228+
@producer.deliver_messages
229+
rescue Error => e
230+
@logger.error("Failed to deliver messages during shutdown: #{e.message}")
231+
232+
@instrumenter.instrument("drop_messages.async_producer", {
233+
message_count: @producer.buffer_size + @queue.size,
234+
})
235+
end
236+
237+
# Stop the run loop.
238+
break
239+
else
240+
raise "Unknown operation #{operation.inspect}"
234241
end
242+
rescue Kafka::Error => e
243+
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
244+
@logger.info "Restarting in 10 seconds..."
235245

236-
# Stop the run loop.
237-
break
238-
else
239-
raise "Unknown operation #{operation.inspect}"
246+
sleep 10
247+
retry
248+
rescue Exception => e
249+
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
250+
@logger.error "Async producer crashed!"
251+
ensure
252+
@producer.shutdown
253+
@logger.pop_tags
240254
end
241255
end
242-
rescue Kafka::Error => e
243-
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
244-
@logger.info "Restarting in 10 seconds..."
245-
246-
sleep 10
247-
retry
248-
rescue Exception => e
249-
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
250-
@logger.error "Async producer crashed!"
251-
ensure
252-
@producer.shutdown
253-
@logger.pop_tags
254256
end
255257

256258
private

0 commit comments

Comments
 (0)