We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent f799f61 commit 42c6c22Copy full SHA for 42c6c22
lib/fluent/plugin/kafka_producer_ext.rb
@@ -254,9 +254,8 @@ def assign_partitions!
254
255
@pending_message_queue.each do |message|
256
partition = message.partition
257
-
+ partition_count = @cluster.partitions_for(message.topic).count
258
begin
259
- partition_count = @cluster.partitions_for(message.topic).count
260
261
if partition.nil?
262
partition = @partitioner.call(partition_count, message)
lib/fluent/plugin/out_kafka2.rb
@@ -403,6 +403,7 @@ def write(chunk)
403
rescue Kafka::UnknownTopicOrPartition
404
if @use_default_for_unknown_topic && topic != @default_topic
405
log.warn "'#{topic}' topic not found. Retry with '#{default_topic}' topic"
406
+ producer.clear_buffer
407
topic = @default_topic
408
retry
409
end
0 commit comments