Skip to content

Commit 20db56d

Browse files
authored
Fix: sincedb_clean_after not being respected (#276)
In certain cases, in read mode sincedb is not updated. There was no periodic check for sincedb updates that would cause the sincedb_clean_after entries to cleanup. The cleanup relied on new files being discovered or new content being added to existing files -> causing sincedb updates. The fix here is to periodically flush sincedb (from the watch loop). Besides, to make the process more deterministic, there's a minor change to make sure the same "updated" timestamp is used to mark the last changed time. resolves #250 expected to also resolve #260
1 parent be18de7 commit 20db56d

File tree

6 files changed

+77
-37
lines changed

6 files changed

+77
-37
lines changed

Diff for: CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.2.2
2+
- Fix: sincedb_clean_after not being respected [#276](https://github.com/logstash-plugins/logstash-input-file/pull/276)
3+
14
## 4.2.1
25
- Fix: skip sincedb eviction if read mode completion deletes file during flush [#273](https://github.com/logstash-plugins/logstash-input-file/pull/273)
36

Diff for: lib/filewatch/observing_base.rb

-10
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,5 @@ def quit
8383
# sincedb_write("shutting down")
8484
end
8585

86-
# close_file(path) is to be used by external code
87-
# when it knows that it is completely done with a file.
88-
# Other files or folders may still be being watched.
89-
# Caution, once unwatched, a file can't be watched again
90-
# unless a new instance of this class begins watching again.
91-
# The sysadmin should rename, move or delete the file.
92-
def close_file(path)
93-
@watch.unwatch(path)
94-
sincedb_write
95-
end
9686
end
9787
end

Diff for: lib/filewatch/sincedb_collection.rb

+13-13
Original file line numberDiff line numberDiff line change
@@ -180,17 +180,17 @@ def watched_file_unset?(key)
180180
get(key).watched_file.nil?
181181
end
182182

183-
private
184-
185183
def flush_at_interval
186-
now = Time.now.to_i
187-
delta = now - @sincedb_last_write
184+
now = Time.now
185+
delta = now.to_i - @sincedb_last_write
188186
if delta >= @settings.sincedb_write_interval
189187
logger.debug("writing sincedb (delta since last write = #{delta})")
190188
sincedb_write(now)
191189
end
192190
end
193191

192+
private
193+
194194
def handle_association(sincedb_value, watched_file)
195195
watched_file.update_bytes_read(sincedb_value.position)
196196
sincedb_value.set_watched_file(watched_file)
@@ -210,33 +210,33 @@ def set_key_value(key, value)
210210
end
211211
end
212212

213-
def sincedb_write(time = Time.now.to_i)
214-
logger.trace("sincedb_write: to: #{path}")
213+
def sincedb_write(time = Time.now)
214+
logger.trace("sincedb_write: #{path} (time = #{time})")
215215
begin
216-
@write_method.call
216+
@write_method.call(time)
217217
@serializer.expired_keys.each do |key|
218218
@sincedb[key].unset_watched_file
219219
delete(key)
220220
logger.trace? && logger.trace("sincedb_write: cleaned", :key => key)
221221
end
222-
@sincedb_last_write = time
222+
@sincedb_last_write = time.to_i
223223
@write_requested = false
224224
rescue Errno::EACCES
225225
# no file handles free perhaps
226226
# maybe it will work next time
227-
logger.trace("sincedb_write: error: #{path}: #{$!}")
227+
logger.trace("sincedb_write: #{path} error: #{$!}")
228228
end
229229
end
230230

231-
def atomic_write
231+
def atomic_write(time)
232232
FileHelper.write_atomically(@full_path) do |io|
233-
@serializer.serialize(@sincedb, io)
233+
@serializer.serialize(@sincedb, io, time.to_f)
234234
end
235235
end
236236

237-
def non_atomic_write
237+
def non_atomic_write(time)
238238
IO.open(IO.sysopen(@full_path, "w+")) do |io|
239-
@serializer.serialize(@sincedb, io)
239+
@serializer.serialize(@sincedb, io, time.to_f)
240240
end
241241
end
242242
end

Diff for: lib/filewatch/watch.rb

+3
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ def subscribe(observer, sincedb_collection)
5151
glob = 0
5252
end
5353
break if quit?
54+
# NOTE: maybe the plugin should validate stat_interval <= sincedb_write_interval <= sincedb_clean_after
5455
sleep(@settings.stat_interval)
56+
# we need to check potential expired keys (sincedb_clean_after) periodically
57+
sincedb_collection.flush_at_interval
5558
end
5659
sincedb_collection.write_if_requested # does nothing if no requests to write were lodged.
5760
@watched_files_collection.close_all

Diff for: logstash-input-file.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-file'
4-
s.version = '4.2.1'
4+
s.version = '4.2.2'
55
s.licenses = ['Apache-2.0']
66
s.summary = "Streams events from files"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

Diff for: spec/inputs/file_read_spec.rb

+57-13
Original file line numberDiff line numberDiff line change
@@ -301,25 +301,69 @@
301301
watched_files = plugin.watcher.watch.watched_files_collection
302302
expect( watched_files ).to be_empty
303303
end
304+
end
304305

305-
private
306+
describe 'sincedb cleanup' do
306307

307-
def wait_for_start_processing(run_thread, timeout: 1.0)
308-
begin
309-
Timeout.timeout(timeout) do
310-
sleep(0.01) while run_thread.status != 'sleep'
311-
sleep(timeout) unless plugin.queue
312-
end
313-
rescue Timeout::Error
314-
raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue
315-
else
316-
raise "plugin did not start processing" unless plugin.queue
308+
let(:options) do
309+
super.merge(
310+
'sincedb_path' => sincedb_path,
311+
'sincedb_clean_after' => '1.0 seconds',
312+
'sincedb_write_interval' => 0.25,
313+
'stat_interval' => 0.1,
314+
)
315+
end
316+
317+
let(:sincedb_path) { "#{temp_directory}/.sincedb" }
318+
319+
let(:sample_file) { File.join(temp_directory, "sample.txt") }
320+
321+
before do
322+
plugin.register
323+
@run_thread = Thread.new(plugin) do |plugin|
324+
Thread.current.abort_on_exception = true
325+
plugin.run queue
317326
end
327+
328+
File.open(sample_file, 'w') { |fd| fd.write("line1\nline2\n") }
329+
330+
wait_for_start_processing(@run_thread)
318331
end
319332

320-
def wait_for_file_removal(path, timeout: 3 * interval)
321-
wait(timeout).for { File.exist?(path) }.to be_falsey
333+
after { plugin.stop }
334+
335+
it 'cleans up sincedb entry' do
336+
wait_for_file_removal(sample_file) # watched discovery
337+
338+
sincedb_content = File.read(sincedb_path).strip
339+
expect( sincedb_content ).to_not be_empty
340+
341+
Stud.try(3.times) do
342+
sleep(1.5) # > sincedb_clean_after
343+
344+
sincedb_content = File.read(sincedb_path).strip
345+
expect( sincedb_content ).to be_empty
346+
end
322347
end
323348

324349
end
350+
351+
private
352+
353+
def wait_for_start_processing(run_thread, timeout: 1.0)
354+
begin
355+
Timeout.timeout(timeout) do
356+
sleep(0.01) while run_thread.status != 'sleep'
357+
sleep(timeout) unless plugin.queue
358+
end
359+
rescue Timeout::Error
360+
raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue
361+
else
362+
raise "plugin did not start processing" unless plugin.queue
363+
end
364+
end
365+
366+
def wait_for_file_removal(path, timeout: 3 * interval)
367+
wait(timeout).for { File.exist?(path) }.to be_falsey
368+
end
325369
end

0 commit comments

Comments
 (0)