From 309cdd8888ac442a4df995c0eb4eef5f816ab741 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Tue, 28 Apr 2020 14:06:30 +0200 Subject: [PATCH 1/4] Fix: release watched files on complete (in read-mode) not doing so leads to a steady increase in watched collection's size over time (esp. in use-cases where user is pulling in new files). the left-over file is never to be processed again - it's being deleted anyway using the completion handler. --- CHANGELOG.md | 3 + lib/filewatch/watch.rb | 2 +- lib/filewatch/watched_files_collection.rb | 14 ++-- .../inputs/delete_completed_file_handler.rb | 5 ++ lib/logstash/inputs/file.rb | 22 ++++-- logstash-input-file.gemspec | 2 +- .../watched_files_collection_spec.rb | 4 +- spec/inputs/file_read_spec.rb | 74 +++++++++++++++++++ 8 files changed, 109 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44e9c2b..81f3843 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.1.18 + - Fix: release watched files on completion (in read-mode) [#271](https://github.com/logstash-plugins/logstash-input-file/pull/271) + ## 4.1.17 - Added configuration setting `check_archive_validity` settings to enable gzipped files verification, issue diff --git a/lib/filewatch/watch.rb b/lib/filewatch/watch.rb index 3c88dd1..6cd4f9d 100644 --- a/lib/filewatch/watch.rb +++ b/lib/filewatch/watch.rb @@ -67,7 +67,7 @@ def iterate_on_state watched_files = @watched_files_collection.values @processor.process_all_states(watched_files) ensure - @watched_files_collection.delete(@processor.deletable_filepaths) + @watched_files_collection.remove_paths(@processor.deletable_filepaths) @processor.deletable_filepaths.clear end end # def each diff --git a/lib/filewatch/watched_files_collection.rb b/lib/filewatch/watched_files_collection.rb index 475d836..153cd48 100644 --- a/lib/filewatch/watched_files_collection.rb +++ b/lib/filewatch/watched_files_collection.rb @@ -15,13 +15,17 @@ def add(watched_file) @sort_method.call end - def delete(paths) - Array(paths).each do |f| - index = @pointers.delete(f) - @files.delete_at(index) - refresh_pointers + def remove_paths(paths) + removed_files = Array(paths).map do |path| + index = @pointers.delete(path) + if index + watched_file = @files.delete_at(index) + refresh_pointers + watched_file + end end @sort_method.call + removed_files end def close_all diff --git a/lib/logstash/inputs/delete_completed_file_handler.rb b/lib/logstash/inputs/delete_completed_file_handler.rb index c6a3e91..6f8e8d0 100644 --- a/lib/logstash/inputs/delete_completed_file_handler.rb +++ b/lib/logstash/inputs/delete_completed_file_handler.rb @@ -2,8 +2,13 @@ module LogStash module Inputs class DeleteCompletedFileHandler + def initialize(watch) + @watch = watch + end + def handle(path) Pathname.new(path).unlink rescue nil + @watch.watched_files_collection.remove_paths([path]) end end end end diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index b83646e..b755d28 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -247,6 +247,8 @@ def validate_value(value, validator) end end + attr_reader :queue, :watcher # used in specs + def register require "addressable/uri" require "digest/md5" @@ -274,8 +276,6 @@ def register :check_archive_validity => @check_archive_validity, } - @completed_file_handlers = [] - @path.each do |path| if Pathname.new(path).relative? raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}") @@ -319,12 +319,6 @@ def register @watcher_class = FileWatch::ObservingTail else @watcher_class = FileWatch::ObservingRead - if @file_completed_action.include?('log') - @completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path) - end - if @file_completed_action.include?('delete') - @completed_file_handlers << DeleteCompletedFileHandler.new - end end @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) @completely_stopped = Concurrent::AtomicBoolean.new @@ -344,7 +338,19 @@ def start_processing # if the pipeline restarts this input, # make sure previous files are closed stop + @watcher = @watcher_class.new(@filewatch_config) + + @completed_file_handlers = [] + if read_mode? + if @file_completed_action.include?('log') + @completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path) + end + if @file_completed_action.include?('delete') + @completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch) + end + end + @path.each { |path| @watcher.watch_this(path) } end diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index 1859d57..d6186f2 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-file' - s.version = '4.1.17' + s.version = '4.1.18' s.licenses = ['Apache-2.0'] s.summary = "Streams events from files" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/filewatch/watched_files_collection_spec.rb b/spec/filewatch/watched_files_collection_spec.rb index 484aa89..50c2b59 100644 --- a/spec/filewatch/watched_files_collection_spec.rb +++ b/spec/filewatch/watched_files_collection_spec.rb @@ -85,9 +85,9 @@ module FileWatch collection.add(wf3) expect(collection.keys).to eq([filepath1, filepath2, filepath3]) - collection.delete([filepath2,filepath3]) + collection.remove_paths([filepath2,filepath3]) expect(collection.keys).to eq([filepath1]) - + expect(collection.values.size).to eq 1 end end diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 2af41d2..f187312 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -247,4 +247,78 @@ end end end + + let(:temp_directory) { Stud::Temporary.directory } + let(:interval) { 0.1 } + let(:options) do + { + 'mode' => "read", + 'path' => "#{temp_directory}/*", + 'stat_interval' => interval, + 'discover_interval' => interval, + 'sincedb_path' => "#{temp_directory}/.sincedb", + 'sincedb_write_interval' => interval + } + end + + let(:queue) { Queue.new } + let(:plugin) { LogStash::Inputs::File.new(options) } + + describe 'delete on complete' do + + let(:options) do + super.merge({ 'file_completed_action' => "delete", 'exit_after_read' => false }) + end + + let(:sample_file) { File.join(temp_directory, "sample.log") } + + before do + plugin.register + @run_thread = Thread.new(plugin) do |plugin| + Thread.current.abort_on_exception = true + plugin.run queue + end + + File.open(sample_file, 'w') { |fd| fd.write("sample-content\n") } + + wait_for_start_processing(@run_thread) + end + + after { plugin.stop } + + it 'processes a file' do + wait_for_file_removal(sample_file) # watched discovery + + expect( plugin.queue.size ).to eql 1 + event = plugin.queue.pop + expect( event.get('message') ).to eql 'sample-content' + end + + it 'removes watched file from collection' do + wait_for_file_removal(sample_file) # watched discovery + + watched_files = plugin.watcher.watch.watched_files_collection + expect( watched_files ).to be_empty + end + + private + + def wait_for_start_processing(run_thread, timeout: 1.0) + begin + Timeout.timeout(timeout) do + sleep(0.01) while run_thread.status != 'sleep' + end + rescue Timeout::Error + raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue + else + raise "plugin did not start processing" unless plugin.queue + end + end + + def wait_for_file_removal(path, timeout: 3 * interval) + wait(timeout).for { File.exist?(path) }.to be_falsey + raise "plugin did not start processing" unless plugin.queue + end + + end end From 2e9708a967458891d696dfbbf80835425c4de427 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Wed, 29 Apr 2020 15:33:45 +0200 Subject: [PATCH 2/4] Refactor: use volatile queue reference --- lib/logstash/inputs/file.rb | 14 +++++++++++--- spec/inputs/file_read_spec.rb | 1 - 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index b755d28..0f11807 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -6,6 +6,7 @@ require "pathname" require "socket" # for Socket.gethostname require "fileutils" +require "concurrent/atomic/atomic_reference" require_relative "file/patch" require_relative "file_listener" @@ -247,7 +248,8 @@ def validate_value(value, validator) end end - attr_reader :queue, :watcher # used in specs + # @private used in specs + attr_reader :watcher def register require "addressable/uri" @@ -322,6 +324,7 @@ def register end @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) @completely_stopped = Concurrent::AtomicBoolean.new + @queue = Concurrent::AtomicReference.new end # def register def completely_stopped? @@ -356,7 +359,7 @@ def start_processing def run(queue) start_processing - @queue = queue + @queue.set queue @watcher.subscribe(self) # halts here until quit is called # last action of the subscribe call is to write the sincedb exit_flush @@ -367,7 +370,7 @@ def post_process_this(event) event.set("[@metadata][host]", @host) event.set("host", @host) unless event.include?("host") decorate(event) - @queue << event + @queue.get << event end def handle_deletable_path(path) @@ -388,6 +391,11 @@ def stop end end + # @private used in specs + def queue + @queue.get + end + private def build_sincedb_base_from_settings(settings) diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index f187312..2c80cc2 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -317,7 +317,6 @@ def wait_for_start_processing(run_thread, timeout: 1.0) def wait_for_file_removal(path, timeout: 3 * interval) wait(timeout).for { File.exist?(path) }.to be_falsey - raise "plugin did not start processing" unless plugin.queue end end From d9dc49a992c4a9f66b4bf3940a1013b630deaf75 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Wed, 29 Apr 2020 15:38:30 +0200 Subject: [PATCH 3/4] Test: check for plugin.queue early (and sleep on CI) --- spec/helpers/spec_helper.rb | 1 + spec/inputs/file_read_spec.rb | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/spec/helpers/spec_helper.rb b/spec/helpers/spec_helper.rb index 6f54305..b2f0ddd 100644 --- a/spec/helpers/spec_helper.rb +++ b/spec/helpers/spec_helper.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" +require "rspec/wait" require "rspec_sequencing" module FileInput diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 2c80cc2..14fc933 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -296,7 +296,8 @@ it 'removes watched file from collection' do wait_for_file_removal(sample_file) # watched discovery - + sleep(0.25) # give CI some space to execute the removal + # TODO shouldn't be necessary once WatchedFileCollection does proper locking watched_files = plugin.watcher.watch.watched_files_collection expect( watched_files ).to be_empty end @@ -307,6 +308,7 @@ def wait_for_start_processing(run_thread, timeout: 1.0) begin Timeout.timeout(timeout) do sleep(0.01) while run_thread.status != 'sleep' + sleep(timeout) unless plugin.queue end rescue Timeout::Error raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue From 6f0e1cfeb0c189873b56cc2e4d9ccfa88fd36baa Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Wed, 29 Apr 2020 19:37:26 +0200 Subject: [PATCH 4/4] Chore: note on watched file entry removal when read --- lib/filewatch/read_mode/handlers/read_file.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/filewatch/read_mode/handlers/read_file.rb b/lib/filewatch/read_mode/handlers/read_file.rb index c85f6a3..d14afcc 100644 --- a/lib/filewatch/read_mode/handlers/read_file.rb +++ b/lib/filewatch/read_mode/handlers/read_file.rb @@ -22,6 +22,9 @@ def handle_specifically(watched_file) sincedb_collection.reading_completed(key) sincedb_collection.clear_watched_file(key) watched_file.listener.deleted + # NOTE: on top of un-watching we should also remove from the watched files collection + # if the file is getting deleted (on completion), that part currently resides in + # DeleteCompletedFileHandler - triggered above using `watched_file.listener.deleted` watched_file.unwatch end end