Skip to content

Commit 6a2145b

Browse files
committed
logstash-plugins#270 Support periodic manual commits
1 parent 0dc3405 commit 6a2145b

File tree

1 file changed

+17
-2
lines changed

1 file changed

+17
-2
lines changed

lib/logstash/inputs/kafka.rb

+17-2
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
212212
# `key`: A ByteBuffer containing the message key
213213
# `timestamp`: The timestamp of this message
214214
config :decorate_events, :validate => :boolean, :default => false
215-
215+
config :manual_commit_interval_ms, :validate => :string
216216

217217
public
218218
def register
@@ -221,6 +221,7 @@ def register
221221

222222
public
223223
def run(logstash_queue)
224+
@manual_commit_interval_ms = manual_commit_interval_ms.to_i
224225
@runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") }
225226
@runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
226227
@runner_threads.each { |t| t.join }
@@ -247,6 +248,7 @@ def thread_runner(logstash_queue, consumer)
247248
else
248249
consumer.subscribe(topics);
249250
end
251+
last_commit_time = timestamp_ms
250252
codec_instance = @codec.clone
251253
while !stop?
252254
records = consumer.poll(poll_timeout_ms)
@@ -266,8 +268,9 @@ def thread_runner(logstash_queue, consumer)
266268
end
267269
end
268270
# Manual offset commit
269-
if @enable_auto_commit == "false"
271+
if has_to_commit?(last_commit_time)
270272
consumer.commitSync
273+
last_commit_time = timestamp_ms
271274
end
272275
end
273276
rescue org.apache.kafka.common.errors.WakeupException => e
@@ -354,4 +357,16 @@ def set_sasl_config(props)
354357

355358
props.put("sasl.kerberos.service.name",sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
356359
end
360+
361+
def timestamp_ms
362+
(Time.now.to_f * 1000).to_i
363+
end
364+
365+
def has_to_commit?(last_commit_time)
366+
# If auto_commit is enable we just leave the commit to the client library on poll and close actions
367+
return false if @enable_auto_commit == "false"
368+
369+
# If auto_commit is disable, we need to commit, we will do it depending on the manual_commit_interval option
370+
@manual_commit_interval_ms <= 0 || (last_commit_time + @manual_commit_interval_ms) < timestamp_ms
371+
end
357372
end #class LogStash::Inputs::Kafka

0 commit comments

Comments
 (0)