diff --git a/CHANGELOG.md b/CHANGELOG.md index e33ec63..255c5fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.4.6 + - Change read mode to immediately stop consuming buffered lines when shutdown is requested [#322](https://github.com/logstash-plugins/logstash-input-file/pull/322) + ## 4.4.5 - Handle EOF when checking archive validity [#321](https://github.com/logstash-plugins/logstash-input-file/pull/321) diff --git a/lib/filewatch/read_mode/handlers/read_file.rb b/lib/filewatch/read_mode/handlers/read_file.rb index 2b6cd2b..824ac2c 100644 --- a/lib/filewatch/read_mode/handlers/read_file.rb +++ b/lib/filewatch/read_mode/handlers/read_file.rb @@ -54,6 +54,7 @@ def controlled_read(watched_file, loop_control) # sincedb position is independent from the watched_file bytes_read delta = line.bytesize + @settings.delimiter_byte_size sincedb_collection.increment(watched_file.sincedb_key, delta) + break if quit? end rescue EOFError => e log_error("controlled_read: eof error reading file", watched_file, e) diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index 4f10bcd..032112f 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-file' - s.version = '4.4.5' + s.version = '4.4.6' s.licenses = ['Apache-2.0'] s.summary = "Streams events from files" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 3cd54de..ef820b9 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -181,25 +181,27 @@ end context "for a compressed file" do + let(:tmp_directory) { Stud::Temporary.directory } + let(:all_files_path) { fixture_dir.join("compressed.*.*") } + let(:gz_file_path) { fixture_dir.join('compressed.log.gz') } + let(:gzip_file_path) { fixture_dir.join('compressed.log.gzip') } + let(:sincedb_path) { ::File.join(tmp_directory, "sincedb.db") } + let(:log_completed_path) { ::File.join(tmp_directory, "completed.log") } + it "the file is read" do - file_path = fixture_dir.join('compressed.log.gz') - file_path2 = fixture_dir.join('compressed.log.gzip') - FileInput.make_fixture_current(file_path.to_path) - FileInput.make_fixture_current(file_path2.to_path) - tmpfile_path = fixture_dir.join("compressed.*.*") - directory = Stud::Temporary.directory - sincedb_path = ::File.join(directory, "readmode_C_sincedb.txt") - log_completed_path = ::File.join(directory, "C_completed.txt") + FileInput.make_fixture_current(gz_file_path.to_path) + FileInput.make_fixture_current(gzip_file_path.to_path) conf = <<-CONFIG input { file { type => "blah" - path => "#{tmpfile_path}" + path => "#{all_files_path}" sincedb_path => "#{sincedb_path}" mode => "read" file_completed_action => "log" file_completed_log_path => "#{log_completed_path}" + exit_after_read => true } } CONFIG @@ -216,17 +218,11 @@ end it "the corrupted file is untouched" do - directory = Stud::Temporary.directory - file_path = fixture_dir.join('compressed.log.gz') - corrupted_file_path = ::File.join(directory, 'corrupted.gz') - FileUtils.cp(file_path, corrupted_file_path) + corrupted_file_path = ::File.join(tmp_directory, 'corrupted.gz') + FileUtils.cp(gz_file_path, corrupted_file_path) FileInput.corrupt_gzip(corrupted_file_path) - log_completed_path = ::File.join(directory, "C_completed.txt") - f = File.new(log_completed_path, "w") - f.close() - conf = <<-CONFIG input { file { @@ -236,28 +232,23 @@ file_completed_action => "log_and_delete" file_completed_log_path => "#{log_completed_path}" check_archive_validity => true + exit_after_read => true } } CONFIG - events = input(conf) do |pipeline, queue| + input(conf) do |pipeline, queue| wait(1) expect(IO.read(log_completed_path)).to be_empty end end it "the truncated file is untouched" do - directory = Stud::Temporary.directory - file_path = fixture_dir.join('compressed.log.gz') - truncated_file_path = ::File.join(directory, 'truncated.gz') - FileUtils.cp(file_path, truncated_file_path) + truncated_file_path = ::File.join(tmp_directory, 'truncated.gz') + FileUtils.cp(gz_file_path, truncated_file_path) FileInput.truncate_gzip(truncated_file_path) - log_completed_path = ::File.join(directory, "C_completed.txt") - f = File.new(log_completed_path, "w") - f.close() - conf = <<-CONFIG input { file { @@ -267,11 +258,12 @@ file_completed_action => "log_and_delete" file_completed_log_path => "#{log_completed_path}" check_archive_validity => true + exit_after_read => true } } CONFIG - events = input(conf) do |pipeline, queue| + input(conf) do |pipeline, queue| wait(1) expect(IO.read(log_completed_path)).to be_empty end