Check for and avoid processing corrupted gzip files, close #261 #265

4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 4.1.17
- Added configuration setting `check_archive_validity` settings to enable gzipped files verification,
issue [#261](https://github.com/logstash-plugins/logstash-input-file/issues/261)

Contributor:

Two comments about the changelog:

  • It looks like the changelog contains the same info for both 4.1.16 and 4.1.17.
  • I had simultaneously claimed 4.1.17 for changes in [DOC] Doc improvements #266. Let's coordinate on version bump and publishing.

Contributor Author:

The doubling of the description was my fault during conflict resolution, now fixed. For the rest, we could merge your `4.1.17` and this one so that we publish just once, WDYT?

## 4.1.16
- Added configuration setting `exit_after_read` to read to EOF and terminate
the input [#240](https://github.com/logstash-plugins/logstash-input-file/pull/240)
15 changes: 15 additions & 0 deletions docs/index.asciidoc
@@ -164,6 +164,7 @@ see <<plugins-{type}s-{plugin}-string_duration,string_duration>> for the details
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-check_archive_validity>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-close_older>> |<<number,number>> or <<plugins-{type}s-{plugin}-string_duration,string_duration>>|No
| <<plugins-{type}s-{plugin}-delimiter>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-discover_interval>> |<<number,number>>|No
@@ -191,6 +192,20 @@ input plugins.

&nbsp;

[id="plugins-{type}s-{plugin}-check_archive_validity"]
===== `check_archive_validity`

* Value type is <<boolean,boolean>>
* The default is `false`.

When set to `true`, this setting verifies that a compressed file is valid before
processing it. There are two passes through the file--one pass to
verify that the file is valid, and another pass to process the file.

Validating a compressed file requires more processing time, but can prevent a
corrupt archive from causing looping.
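
For example, a minimal `read` mode configuration that enables the check might look like this (the path shown is purely illustrative):

[source,ruby]
----
input {
  file {
    path => "/var/log/archives/*.log.gz"
    mode => "read"
    check_archive_validity => true
  }
}
----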


Contributor:

Good explanation. I took the information you provided, and tried to reword it a bit. What do you think about this:

The read option kicks off a full read of an archive, and could potentially
waste time trying to process an invalid file.
When set to true, this option verifies that a compressed file is valid before
reading it.

If this option is not explicitly set to true, a corrupt file could cause
cyclic processing of the broken file.

Contributor Author:

If I'm not wrong, from this rewording I understand that reading an archive is costly, so this option enables verification before reading it, to avoid wasting time on a corrupted archive.

In the original form I tried to describe that processing a corrupted archive leads to looping on that archive, and that enabling this flag avoids it. Enabling the flag means reading the entire file upfront for verification and then reading it again for processing. For a valid archive, this could be considered a waste of time.

Contributor:

What about this:

When set to true, this setting verifies that a compressed file is valid before
processing it. There are two passes through the file--one pass to
verify that the file is valid, and another pass to process the file.

Validating a compressed file requires more processing time, but can prevent a
corrupt archive from causing looping.

Contributor Author:

Thanks @karenzone, this sounds much better.

[id="plugins-{type}s-{plugin}-close_older"]
===== `close_older`

83 changes: 55 additions & 28 deletions lib/filewatch/read_mode/handlers/read_zip_file.rb
@@ -18,34 +18,40 @@ def handle_specifically(watched_file)
  # fast forward through the lines until we reach unseen content?
  # meaning that we can quit in the middle of a zip file
  key = watched_file.sincedb_key
  # verify the archive up front (when enabled) so a corrupt file is skipped instead of looping
  if @settings.check_archive_validity && corrupted?(watched_file)
    watched_file.unwatch
  else
    begin
      file_stream = FileInputStream.new(watched_file.path)
      gzip_stream = GZIPInputStream.new(file_stream)
      decoder = InputStreamReader.new(gzip_stream, "UTF-8")
      buffered = BufferedReader.new(decoder)
      while (line = buffered.readLine(false))
        watched_file.listener.accept(line)
        # can't quit, if we did then we would incorrectly write a 'completed' sincedb entry
        # what do we do about quit when we have just begun reading the zipped file (e.g. pipeline reloading)
        # should we track lines read in the sincedb and
        # fast forward through the lines until we reach unseen content?
        # meaning that we can quit in the middle of a zip file
      end
      watched_file.listener.eof
    rescue ZipException => e
      logger.error("Cannot decompress the gzip file at path: #{watched_file.path}", :exception => e.class,
                   :message => e.message, :backtrace => e.backtrace)
      watched_file.listener.error
    else
      sincedb_collection.store_last_read(key, watched_file.last_stat_size)
      sincedb_collection.request_disk_flush
      watched_file.listener.deleted
      watched_file.unwatch
    ensure
      # rescue each close individually so all close attempts are tried
      close_and_ignore_ioexception(buffered) unless buffered.nil?
      close_and_ignore_ioexception(decoder) unless decoder.nil?
      close_and_ignore_ioexception(gzip_stream) unless gzip_stream.nil?
      close_and_ignore_ioexception(file_stream) unless file_stream.nil?
    end
  end
  sincedb_collection.clear_watched_file(key)
end
@@ -56,7 +62,28 @@ def close_and_ignore_ioexception(closeable)
  begin
    closeable.close
  rescue Exception => e # IOException can be thrown by any of the Java classes that implement the Closeable interface.
    logger.warn("Ignoring an IOException when closing an instance of #{closeable.class.name}",
                :exception => e.class, :message => e.message, :backtrace => e.backtrace)
  end
end

def corrupted?(watched_file)
  begin
    start = Time.now
    file_stream = FileInputStream.new(watched_file.path)
    gzip_stream = GZIPInputStream.new(file_stream)
    buffer = Java::byte[8192].new
    # drain the stream; a corrupt archive raises ZipException somewhere along the way
    until gzip_stream.read(buffer) == -1
    end
    return false
  rescue ZipException => e
    duration = Time.now - start
    logger.warn("Detected corrupted archive #{watched_file.path}, file won't be processed", :message => e.message,
                :duration => duration.round(3))
    return true
  ensure
    close_and_ignore_ioexception(gzip_stream) unless gzip_stream.nil?
    close_and_ignore_ioexception(file_stream) unless file_stream.nil?
  end
end
end
2 changes: 2 additions & 0 deletions lib/filewatch/settings.rb
@@ -9,6 +9,7 @@ class Settings
attr_reader :sincedb_path, :sincedb_write_interval, :sincedb_expiry_duration
attr_reader :file_sort_by, :file_sort_direction
attr_reader :exit_after_read
attr_reader :check_archive_validity

def self.from_options(opts)
new.add_options(opts)
@@ -52,6 +53,7 @@ def add_options(opts)
@file_sort_by = @opts[:file_sort_by]
@file_sort_direction = @opts[:file_sort_direction]
@exit_after_read = @opts[:exit_after_read]
@check_archive_validity = @opts[:check_archive_validity]
self
end

6 changes: 6 additions & 0 deletions lib/logstash/inputs/file.rb
@@ -227,6 +227,11 @@ class File < LogStash::Inputs::Base
# Sincedb still works, if you run LS once again after doing some changes - only new values will be read
config :exit_after_read, :validate => :boolean, :default => false

# Before reading a compressed file, check its validity.
# This requires a full extra read of the archive, so it can potentially cost time.
# If this is not set to true and the file is corrupted, processing could loop cyclically on the broken file.
config :check_archive_validity, :validate => :boolean, :default => false

public

class << self
@@ -266,6 +271,7 @@ def register
:file_sort_by => @file_sort_by,
:file_sort_direction => @file_sort_direction,
:exit_after_read => @exit_after_read,
:check_archive_validity => @check_archive_validity,
}

@completed_file_handlers = []
2 changes: 1 addition & 1 deletion logstash-input-file.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
s.version = '4.1.17'
s.licenses = ['Apache-2.0']
s.summary = "Streams events from files"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
7 changes: 7 additions & 0 deletions spec/helpers/spec_helper.rb
@@ -16,6 +16,13 @@ def self.make_fixture_current(path, time = Time.now)
::File.utime(time, time, path)
end

def self.corrupt_gzip(file_path)
  # overwrite bytes just past the 10-byte gzip header so the archive no longer inflates,
  # opening with "r+" so the rest of the file's content is kept intact
  File.open(file_path, "r+") do |f|
    f.seek(12)
    f.puts 'corrupting_string'
  end
end

class TracerBase
def initialize
@tracer = Concurrent::Array.new
31 changes: 31 additions & 0 deletions spec/inputs/file_read_spec.rb
@@ -214,6 +214,37 @@
expect(events[2].get("message")).to start_with("2010-03-12 23:51")
expect(events[3].get("message")).to start_with("2010-03-12 23:51")
end

it "the corrupted file is untouched" do
directory = Stud::Temporary.directory
file_path = fixture_dir.join('compressed.log.gz')
corrupted_file_path = ::File.join(directory, 'corrupted.gz')
FileUtils.cp(file_path, corrupted_file_path)

FileInput.corrupt_gzip(corrupted_file_path)

log_completed_path = ::File.join(directory, "C_completed.txt")
FileUtils.touch(log_completed_path) # pre-create the completed log so we can assert it stays empty

conf = <<-CONFIG
input {
file {
type => "blah"
path => "#{corrupted_file_path}"
mode => "read"
file_completed_action => "log_and_delete"
file_completed_log_path => "#{log_completed_path}"
check_archive_validity => true
}
}
CONFIG

events = input(conf) do |pipeline, queue|
wait(1)
expect(IO.read(log_completed_path)).to be_empty
end
end
end
end
end
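
To run just this new example locally, something like `bundle exec rspec spec/inputs/file_read_spec.rb` should work, assuming a JRuby toolchain with the plugin's development dependencies installed via `bundle install`.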