diff --git a/CHANGELOG.md b/CHANGELOG.md index b08bb04..e33ec63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.4.5 + - Handle EOF when checking archive validity [#321](https://github.com/logstash-plugins/logstash-input-file/pull/321) + ## 4.4.4 - Fixes gzip file handling in read mode when run on JDK12+, including JDK17 that is bundled with Logstash 8.4+ [#312](https://github.com/logstash-plugins/logstash-input-file/pull/312) diff --git a/lib/filewatch/read_mode/handlers/read_zip_file.rb b/lib/filewatch/read_mode/handlers/read_zip_file.rb index 10b3486..e7eb21f 100644 --- a/lib/filewatch/read_mode/handlers/read_zip_file.rb +++ b/lib/filewatch/read_mode/handlers/read_zip_file.rb @@ -71,14 +71,14 @@ def close_and_ignore_ioexception(closeable) def corrupted?(watched_file) begin + start = Time.new file_stream = FileInputStream.new(watched_file.path) gzip_stream = GZIPInputStream.new(file_stream) buffer = Java::byte[8192].new - start = Time.new until gzip_stream.read(buffer) == -1 end return false - rescue ZipException => e + rescue ZipException, Java::JavaIo::EOFException => e duration = Time.now - start logger.warn("Detected corrupted archive #{watched_file.path} file won't be processed", :message => e.message, :duration => duration.round(3)) diff --git a/logstash-input-file.gemspec b/logstash-input-file.gemspec index f6b66ea..4f10bcd 100644 --- a/logstash-input-file.gemspec +++ b/logstash-input-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-file' - s.version = '4.4.4' + s.version = '4.4.5' s.licenses = ['Apache-2.0'] s.summary = "Streams events from files" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/helpers/spec_helper.rb b/spec/helpers/spec_helper.rb index 404c93f..095b327 100644 --- a/spec/helpers/spec_helper.rb +++ b/spec/helpers/spec_helper.rb @@ -24,6 +24,12 @@ def self.corrupt_gzip(file_path) f.close() end + def self.truncate_gzip(file_path) + f = File.open(file_path, "ab") + f.truncate(100) + f.close() + end + class TracerBase def initialize @tracer = Concurrent::Array.new diff --git a/spec/inputs/file_read_spec.rb b/spec/inputs/file_read_spec.rb index 02bf83d..3cd54de 100644 --- a/spec/inputs/file_read_spec.rb +++ b/spec/inputs/file_read_spec.rb @@ -245,6 +245,37 @@ expect(IO.read(log_completed_path)).to be_empty end end + + it "the truncated file is untouched" do + directory = Stud::Temporary.directory + file_path = fixture_dir.join('compressed.log.gz') + truncated_file_path = ::File.join(directory, 'truncated.gz') + FileUtils.cp(file_path, truncated_file_path) + + FileInput.truncate_gzip(truncated_file_path) + + log_completed_path = ::File.join(directory, "C_completed.txt") + f = File.new(log_completed_path, "w") + f.close() + + conf = <<-CONFIG + input { + file { + type => "blah" + path => "#{truncated_file_path}" + mode => "read" + file_completed_action => "log_and_delete" + file_completed_log_path => "#{log_completed_path}" + check_archive_validity => true + } + } + CONFIG + + events = input(conf) do |pipeline, queue| + wait(1) + expect(IO.read(log_completed_path)).to be_empty + end + end end end