Skip to content

Commit e4b5ced

Browse files
committed
Added settings 'check_archive_validity' to optionally enable integrity check on archives, close #261
Fixes #265
1 parent 5cd1171 commit e4b5ced

File tree

8 files changed

+121
-29
lines changed

8 files changed

+121
-29
lines changed

Diff for: CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 4.1.17
2+
- Added configuration setting `check_archive_validity` settings to enable gzipped files verification,
3+
issue [#261](https://github.com/logstash-plugins/logstash-input-file/issues/261)
4+
15
## 4.1.16
26
- Added configuration setting exit_after_read to read to EOF and terminate
37
the input [#240](https://github.com/logstash-plugins/logstash-input-file/pull/240)

Diff for: docs/index.asciidoc

+15
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ see <<plugins-{type}s-{plugin}-string_duration,string_duration>> for the details
164164
[cols="<,<,<",options="header",]
165165
|=======================================================================
166166
|Setting |Input type|Required
167+
| <<plugins-{type}s-{plugin}-check_archive_validity>> |<<boolean,boolean>>|No
167168
| <<plugins-{type}s-{plugin}-close_older>> |<<number,number>> or <<plugins-{type}s-{plugin}-string_duration,string_duration>>|No
168169
| <<plugins-{type}s-{plugin}-delimiter>> |<<string,string>>|No
169170
| <<plugins-{type}s-{plugin}-discover_interval>> |<<number,number>>|No
@@ -191,6 +192,20 @@ input plugins.
191192

192193
&nbsp;
193194

195+
[id="plugins-{type}s-{plugin}-check_archive_validity"]
196+
===== `check_archive_validity`
197+
198+
* Value type is <<boolean,boolean>>
199+
* The default is `false`.
200+
201+
When set to `true`, this setting verifies that a compressed file is valid before
202+
processing it. There are two passes through the file--one pass to
203+
verify that the file is valid, and another pass to process the file.
204+
205+
Validating a compressed file requires more processing time, but can prevent a
206+
corrupt archive from causing looping.
207+
208+
194209
[id="plugins-{type}s-{plugin}-close_older"]
195210
===== `close_older`
196211

Diff for: lib/filewatch/read_mode/handlers/read_zip_file.rb

+55-28
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,40 @@ def handle_specifically(watched_file)
1818
# fast forward through the lines until we reach unseen content?
1919
# meaning that we can quit in the middle of a zip file
2020
key = watched_file.sincedb_key
21-
begin
22-
file_stream = FileInputStream.new(watched_file.path)
23-
gzip_stream = GZIPInputStream.new(file_stream)
24-
decoder = InputStreamReader.new(gzip_stream, "UTF-8")
25-
buffered = BufferedReader.new(decoder)
26-
while (line = buffered.readLine(false))
27-
watched_file.listener.accept(line)
28-
# can't quit, if we did then we would incorrectly write a 'completed' sincedb entry
29-
# what do we do about quit when we have just begun reading the zipped file (e.g. pipeline reloading)
30-
# should we track lines read in the sincedb and
31-
# fast forward through the lines until we reach unseen content?
32-
# meaning that we can quit in the middle of a zip file
33-
end
34-
watched_file.listener.eof
35-
rescue ZipException => e
36-
logger.error("Cannot decompress the gzip file at path: #{watched_file.path}")
37-
watched_file.listener.error
38-
else
39-
sincedb_collection.store_last_read(key, watched_file.last_stat_size)
40-
sincedb_collection.request_disk_flush
41-
watched_file.listener.deleted
21+
22+
if @settings.check_archive_validity && corrupted?(watched_file)
4223
watched_file.unwatch
43-
ensure
44-
# rescue each close individually so all close attempts are tried
45-
close_and_ignore_ioexception(buffered) unless buffered.nil?
46-
close_and_ignore_ioexception(decoder) unless decoder.nil?
47-
close_and_ignore_ioexception(gzip_stream) unless gzip_stream.nil?
48-
close_and_ignore_ioexception(file_stream) unless file_stream.nil?
24+
else
25+
begin
26+
file_stream = FileInputStream.new(watched_file.path)
27+
gzip_stream = GZIPInputStream.new(file_stream)
28+
decoder = InputStreamReader.new(gzip_stream, "UTF-8")
29+
buffered = BufferedReader.new(decoder)
30+
while (line = buffered.readLine(false))
31+
watched_file.listener.accept(line)
32+
# can't quit, if we did then we would incorrectly write a 'completed' sincedb entry
33+
# what do we do about quit when we have just begun reading the zipped file (e.g. pipeline reloading)
34+
# should we track lines read in the sincedb and
35+
# fast forward through the lines until we reach unseen content?
36+
# meaning that we can quit in the middle of a zip file
37+
end
38+
watched_file.listener.eof
39+
rescue ZipException => e
40+
logger.error("Cannot decompress the gzip file at path: #{watched_file.path}", :exception => e.class,
41+
:message => e.message, :backtrace => e.backtrace)
42+
watched_file.listener.error
43+
else
44+
sincedb_collection.store_last_read(key, watched_file.last_stat_size)
45+
sincedb_collection.request_disk_flush
46+
watched_file.listener.deleted
47+
watched_file.unwatch
48+
ensure
49+
# rescue each close individually so all close attempts are tried
50+
close_and_ignore_ioexception(buffered) unless buffered.nil?
51+
close_and_ignore_ioexception(decoder) unless decoder.nil?
52+
close_and_ignore_ioexception(gzip_stream) unless gzip_stream.nil?
53+
close_and_ignore_ioexception(file_stream) unless file_stream.nil?
54+
end
4955
end
5056
sincedb_collection.clear_watched_file(key)
5157
end
@@ -56,7 +62,28 @@ def close_and_ignore_ioexception(closeable)
5662
begin
5763
closeable.close
5864
rescue Exception => e # IOException can be thrown by any of the Java classes that implement the Closable interface.
59-
logger.warn("Ignoring an IOException when closing an instance of #{closeable.class.name}", "exception" => e)
65+
logger.warn("Ignoring an IOException when closing an instance of #{closeable.class.name}",
66+
:exception => e.class, :message => e.message, :backtrace => e.backtrace)
67+
end
68+
end
69+
70+
def corrupted?(watched_file)
71+
begin
72+
file_stream = FileInputStream.new(watched_file.path)
73+
gzip_stream = GZIPInputStream.new(file_stream)
74+
buffer = Java::byte[8192].new
75+
start = Time.new
76+
until gzip_stream.read(buffer) == -1
77+
end
78+
return false
79+
rescue ZipException => e
80+
duration = Time.now - start
81+
logger.warn("Detected corrupted archive #{watched_file.path} file won't be processed", :message => e.message,
82+
:duration => duration.round(3))
83+
return true
84+
ensure
85+
close_and_ignore_ioexception(gzip_stream) unless gzip_stream.nil?
86+
close_and_ignore_ioexception(file_stream) unless file_stream.nil?
6087
end
6188
end
6289
end

Diff for: lib/filewatch/settings.rb

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class Settings
99
attr_reader :sincedb_path, :sincedb_write_interval, :sincedb_expiry_duration
1010
attr_reader :file_sort_by, :file_sort_direction
1111
attr_reader :exit_after_read
12+
attr_reader :check_archive_validity
1213

1314
def self.from_options(opts)
1415
new.add_options(opts)
@@ -52,6 +53,7 @@ def add_options(opts)
5253
@file_sort_by = @opts[:file_sort_by]
5354
@file_sort_direction = @opts[:file_sort_direction]
5455
@exit_after_read = @opts[:exit_after_read]
56+
@check_archive_validity = @opts[:check_archive_validity]
5557
self
5658
end
5759

Diff for: lib/logstash/inputs/file.rb

+6
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ class File < LogStash::Inputs::Base
227227
# Sincedb still works, if you run LS once again after doing some changes - only new values will be read
228228
config :exit_after_read, :validate => :boolean, :default => false
229229

230+
# Before start read a compressed file, checks for its validity.
231+
# This request a full read of the archive, so potentially could cost time.
232+
# If not specified to true, and the file is corrupted, could end in cyclic processing of the broken file.
233+
config :check_archive_validity, :validate => :boolean, :default => false
234+
230235
public
231236

232237
class << self
@@ -266,6 +271,7 @@ def register
266271
:file_sort_by => @file_sort_by,
267272
:file_sort_direction => @file_sort_direction,
268273
:exit_after_read => @exit_after_read,
274+
:check_archive_validity => @check_archive_validity,
269275
}
270276

271277
@completed_file_handlers = []

Diff for: logstash-input-file.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-file'
4-
s.version = '4.1.16'
4+
s.version = '4.1.17'
55
s.licenses = ['Apache-2.0']
66
s.summary = "Streams events from files"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

Diff for: spec/helpers/spec_helper.rb

+7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ def self.make_fixture_current(path, time = Time.now)
1616
::File.utime(time, time, path)
1717
end
1818

19+
def self.corrupt_gzip(file_path)
20+
f = File.open(file_path, "w")
21+
f.seek(12)
22+
f.puts 'corrupting_string'
23+
f.close()
24+
end
25+
1926
class TracerBase
2027
def initialize
2128
@tracer = Concurrent::Array.new

Diff for: spec/inputs/file_read_spec.rb

+31
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,37 @@
214214
expect(events[2].get("message")).to start_with("2010-03-12 23:51")
215215
expect(events[3].get("message")).to start_with("2010-03-12 23:51")
216216
end
217+
218+
it "the corrupted file is untouched" do
219+
directory = Stud::Temporary.directory
220+
file_path = fixture_dir.join('compressed.log.gz')
221+
corrupted_file_path = ::File.join(directory, 'corrupted.gz')
222+
FileUtils.cp(file_path, corrupted_file_path)
223+
224+
FileInput.corrupt_gzip(corrupted_file_path)
225+
226+
log_completed_path = ::File.join(directory, "C_completed.txt")
227+
f = File.new(log_completed_path, "w")
228+
f.close()
229+
230+
conf = <<-CONFIG
231+
input {
232+
file {
233+
type => "blah"
234+
path => "#{corrupted_file_path}"
235+
mode => "read"
236+
file_completed_action => "log_and_delete"
237+
file_completed_log_path => "#{log_completed_path}"
238+
check_archive_validity => true
239+
}
240+
}
241+
CONFIG
242+
243+
events = input(conf) do |pipeline, queue|
244+
wait(1)
245+
expect(IO.read(log_completed_path)).to be_empty
246+
end
247+
end
217248
end
218249
end
219250
end

0 commit comments

Comments
 (0)