From ab0cca92b4c2e8e9d3fcf1e3925621818c11fcd8 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Thu, 10 Sep 2020 11:58:29 +0200 Subject: [PATCH 1/5] Refactor: keep same ts for serializer writes --- lib/filewatch/sincedb_collection.rb | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/filewatch/sincedb_collection.rb b/lib/filewatch/sincedb_collection.rb index a19ed3ca..c37d584b 100644 --- a/lib/filewatch/sincedb_collection.rb +++ b/lib/filewatch/sincedb_collection.rb @@ -183,8 +183,8 @@ def watched_file_unset?(key) private def flush_at_interval - now = Time.now.to_i - delta = now - @sincedb_last_write + now = Time.now + delta = now.to_i - @sincedb_last_write if delta >= @settings.sincedb_write_interval logger.debug("writing sincedb (delta since last write = #{delta})") sincedb_write(now) @@ -210,33 +210,33 @@ def set_key_value(key, value) end end - def sincedb_write(time = Time.now.to_i) - logger.trace("sincedb_write: to: #{path}") + def sincedb_write(time = Time.now) + logger.trace("sincedb_write: #{path} (time = #{time})") begin - @write_method.call + @write_method.call(time) @serializer.expired_keys.each do |key| @sincedb[key].unset_watched_file delete(key) logger.trace? && logger.trace("sincedb_write: cleaned", :key => key) end - @sincedb_last_write = time + @sincedb_last_write = time.to_i @write_requested = false rescue Errno::EACCES # no file handles free perhaps # maybe it will work next time - logger.trace("sincedb_write: error: #{path}: #{$!}") + logger.trace("sincedb_write: #{path} error: #{$!}") end end - def atomic_write + def atomic_write(time) FileHelper.write_atomically(@full_path) do |io| - @serializer.serialize(@sincedb, io) + @serializer.serialize(@sincedb, io, time.to_f) end end - def non_atomic_write + def non_atomic_write(time) IO.open(IO.sysopen(@full_path, "w+")) do |io| - @serializer.serialize(@sincedb, io) + @serializer.serialize(@sincedb, io, time.to_f) end end end From b767c0f4c6db418471929e5140cd164369376953 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Thu, 10 Sep 2020 13:15:44 +0200 Subject: [PATCH 2/5] Fix: sincedb not respecting clean_after for now the plugin relied on other files coming in or getting new content - would cause the sincedb flush to eventually trigger (due changes). however, if there isn't any activity going on we still need to cleanup sincedb periodically to respect the clean_after setting... --- lib/filewatch/sincedb_collection.rb | 4 +- lib/filewatch/watch.rb | 3 ++ spec/inputs/file_read_spec.rb | 68 +++++++++++++++++++++++------ 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/lib/filewatch/sincedb_collection.rb b/lib/filewatch/sincedb_collection.rb index c37d584b..2d3458ae 100644 --- a/lib/filewatch/sincedb_collection.rb +++ b/lib/filewatch/sincedb_collection.rb @@ -180,8 +180,6 @@ def watched_file_unset?(key) get(key).watched_file.nil? end - private - def flush_at_interval now = Time.now delta = now.to_i - @sincedb_last_write @@ -191,6 +189,8 @@ def flush_at_interval end end + private + def handle_association(sincedb_value, watched_file) watched_file.update_bytes_read(sincedb_value.position) sincedb_value.set_watched_file(watched_file) diff --git a/lib/filewatch/watch.rb b/lib/filewatch/watch.rb index 48e0e51b..8bee1208 100644 --- a/lib/filewatch/watch.rb +++ b/lib/filewatch/watch.rb @@ -51,7 +51,10 @@ def subscribe(observer, sincedb_collection) glob = 0 end break if quit? + # NOTE: maybe the plugin should validate stat_interval <= sincedb_write_interval <= sincedb_clean_after sleep(@settings.stat_interval) + # we need to check potential expired keys (sincedb_clean_after) periodically + sincedb_collection.flush_at_interval end sincedb_collection.write_if_requested # does nothing if no requests to write were lodged. @watched_files_collection.close_all diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 14fc9332..6f1debbe 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -301,25 +301,67 @@ watched_files = plugin.watcher.watch.watched_files_collection expect( watched_files ).to be_empty end + end - private + describe 'sincedb cleanup' do - def wait_for_start_processing(run_thread, timeout: 1.0) - begin - Timeout.timeout(timeout) do - sleep(0.01) while run_thread.status != 'sleep' - sleep(timeout) unless plugin.queue - end - rescue Timeout::Error - raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue - else - raise "plugin did not start processing" unless plugin.queue + let(:options) do + super.merge( + 'sincedb_path' => sincedb_path, + 'sincedb_clean_after' => '1.0 seconds', + 'sincedb_write_interval' => 0.5, + 'stat_interval' => 0.1, + ) + end + + let(:sincedb_path) { "#{temp_directory}/.sincedb" } + + let(:sample_file) { File.join(temp_directory, "sample.txt") } + + before do + plugin.register + @run_thread = Thread.new(plugin) do |plugin| + Thread.current.abort_on_exception = true + plugin.run queue end + + File.open(sample_file, 'w') { |fd| fd.write("line1\nline2\n") } + + wait_for_start_processing(@run_thread) + end + + after { plugin.stop } + + it 'cleans up sincedb entry' do + wait_for_file_removal(sample_file) # watched discovery + + sincedb_content = File.read(sincedb_path).strip + expect( sincedb_content ).to_not be_empty + + sleep(1.5) # > sincedb_clean_after + + sincedb_content = File.read(sincedb_path).strip + expect( sincedb_content ).to be_empty end - def wait_for_file_removal(path, timeout: 3 * interval) - wait(timeout).for { File.exist?(path) }.to be_falsey + end + + private + + def wait_for_start_processing(run_thread, timeout: 1.0) + begin + Timeout.timeout(timeout) do + sleep(0.01) while run_thread.status != 'sleep' + sleep(timeout) unless plugin.queue + end + rescue Timeout::Error + raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue + else + raise "plugin did not start processing" unless plugin.queue end + end + def wait_for_file_removal(path, timeout: 3 * interval) + wait(timeout).for { File.exist?(path) }.to be_falsey end end From 62f2f2d580930b5a05657492f0d9d21a74f25e81 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Thu, 10 Sep 2020 13:16:11 +0200 Subject: [PATCH 3/5] Refactor: remove (confusing) un-used method --- lib/filewatch/observing_base.rb | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/lib/filewatch/observing_base.rb b/lib/filewatch/observing_base.rb index ecdf7f7b..06cd3170 100644 --- a/lib/filewatch/observing_base.rb +++ b/lib/filewatch/observing_base.rb @@ -83,15 +83,5 @@ def quit # sincedb_write("shutting down") end - # close_file(path) is to be used by external code - # when it knows that it is completely done with a file. - # Other files or folders may still be being watched. - # Caution, once unwatched, a file can't be watched again - # unless a new instance of this class begins watching again. - # The sysadmin should rename, move or delete the file. - def close_file(path) - @watch.unwatch(path) - sincedb_write - end end end From 5484bf183bc1fae7a605a658d5f90b245457f7b4 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Thu, 10 Sep 2020 13:38:59 +0200 Subject: [PATCH 4/5] Test: try harder to verify (due slow CI) --- spec/inputs/file_read_spec.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 6f1debbe..d1533bd0 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -309,7 +309,7 @@ super.merge( 'sincedb_path' => sincedb_path, 'sincedb_clean_after' => '1.0 seconds', - 'sincedb_write_interval' => 0.5, + 'sincedb_write_interval' => 0.25, 'stat_interval' => 0.1, ) end @@ -338,10 +338,12 @@ sincedb_content = File.read(sincedb_path).strip expect( sincedb_content ).to_not be_empty - sleep(1.5) # > sincedb_clean_after + Stud.try(3.times) do + sleep(1.5) # > sincedb_clean_after - sincedb_content = File.read(sincedb_path).strip - expect( sincedb_content ).to be_empty + sincedb_content = File.read(sincedb_path).strip + expect( sincedb_content ).to be_empty + end end end From a5a002e33b482fb653e7f373c9e32626fa8b0dc0 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Thu, 10 Sep 2020 14:27:17 +0200 Subject: [PATCH 5/5] Changelog and version bump --- CHANGELOG.md | 3 +++ logstash-input-file.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7eeb3876..51f7abc2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.2.2 + - Fix: sincedb_clean_after not being respected [#276](https://github.com/logstash-plugins/logstash-input-file/pull/276) + ## 4.2.1 - Fix: skip sincedb eviction if read mode completion deletes file during flush [#273](https://github.com/logstash-plugins/logstash-input-file/pull/273) diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index 2a1d85b5..bc7afa67 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-file' - s.version = '4.2.1' + s.version = '4.2.2' s.licenses = ['Apache-2.0'] s.summary = "Streams events from files" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"