Fix: sincedb_clean_after not being respected #276

Merged: 5 commits, Sep 25, 2020
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
+## 4.2.2
+- Fix: sincedb_clean_after not being respected [#276](https://github.com/logstash-plugins/logstash-input-file/pull/276)

## 4.2.1
- Fix: skip sincedb eviction if read mode completion deletes file during flush [#273](https://github.com/logstash-plugins/logstash-input-file/pull/273)

10 changes: 0 additions & 10 deletions lib/filewatch/observing_base.rb
@@ -83,15 +83,5 @@ def quit
# sincedb_write("shutting down")
end

-    # close_file(path) is to be used by external code
-    # when it knows that it is completely done with a file.
-    # Other files or folders may still be being watched.
-    # Caution, once unwatched, a file can't be watched again
-    # unless a new instance of this class begins watching again.
-    # The sysadmin should rename, move or delete the file.
-    def close_file(path)
-      @watch.unwatch(path)
-      sincedb_write
-    end
end
end
26 changes: 13 additions & 13 deletions lib/filewatch/sincedb_collection.rb
@@ -180,17 +180,17 @@ def watched_file_unset?(key)
get(key).watched_file.nil?
end

-    private

def flush_at_interval
-      now = Time.now.to_i
-      delta = now - @sincedb_last_write
+      now = Time.now
+      delta = now.to_i - @sincedb_last_write
if delta >= @settings.sincedb_write_interval
logger.debug("writing sincedb (delta since last write = #{delta})")
sincedb_write(now)
end
end

+    private

def handle_association(sincedb_value, watched_file)
watched_file.update_bytes_read(sincedb_value.position)
sincedb_value.set_watched_file(watched_file)
@@ -210,33 +210,33 @@ def set_key_value(key, value)
end
end

-    def sincedb_write(time = Time.now.to_i)
-      logger.trace("sincedb_write: to: #{path}")
+    def sincedb_write(time = Time.now)
+      logger.trace("sincedb_write: #{path} (time = #{time})")
begin
-        @write_method.call
+        @write_method.call(time)
@serializer.expired_keys.each do |key|
@sincedb[key].unset_watched_file
delete(key)
logger.trace? && logger.trace("sincedb_write: cleaned", :key => key)
end
-        @sincedb_last_write = time
+        @sincedb_last_write = time.to_i
@write_requested = false
rescue Errno::EACCES
# no file handles free perhaps
# maybe it will work next time
-        logger.trace("sincedb_write: error: #{path}: #{$!}")
+        logger.trace("sincedb_write: #{path} error: #{$!}")
end
end

-    def atomic_write
+    def atomic_write(time)
FileHelper.write_atomically(@full_path) do |io|
-        @serializer.serialize(@sincedb, io)
+        @serializer.serialize(@sincedb, io, time.to_f)
end
end

-    def non_atomic_write
+    def non_atomic_write(time)
IO.open(IO.sysopen(@full_path, "w+")) do |io|
-        @serializer.serialize(@sincedb, io)
+        @serializer.serialize(@sincedb, io, time.to_f)
end
end
end
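The diff above threads the flush timestamp through sincedb_write and into the serializer, so expired entries can be evicted even when no new content has been read. A minimal, self-contained sketch of the interval-guarded flush and the clean_after eviction it enables; the track/keys helpers and the plain-Hash store are illustrative stand-ins, not the plugin's real API:

```ruby
# Sketch of SincedbCollection's interval-guarded flush (stand-in class).
class SincedbSketch
  attr_reader :last_write

  def initialize(write_interval:, clean_after:)
    @sincedb = {}                    # key => { position:, last_seen: }
    @write_interval = write_interval # seconds between actual writes
    @clean_after = clean_after       # seconds of inactivity before eviction
    @last_write = Time.now.to_i
  end

  def track(key, position, seen_at = Time.now)
    @sincedb[key] = { position: position, last_seen: seen_at.to_f }
  end

  def keys
    @sincedb.keys
  end

  # Safe to call on every loop iteration: it only writes once the
  # write interval has elapsed since the last write.
  def flush_at_interval
    now = Time.now
    delta = now.to_i - @last_write
    sincedb_write(now) if delta >= @write_interval
  end

  private

  # Passing `time` through (as the PR does) lets eviction use the
  # same clock reading as the write, instead of re-reading Time.now.
  def sincedb_write(time = Time.now)
    @sincedb.delete_if { |_key, value| time.to_f - value[:last_seen] > @clean_after }
    @last_write = time.to_i
  end
end
```

With write_interval set to 0, a single flush_at_interval call both writes and evicts any entry idle longer than clean_after.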
3 changes: 3 additions & 0 deletions lib/filewatch/watch.rb
@@ -51,7 +51,10 @@ def subscribe(observer, sincedb_collection)
glob = 0
end
break if quit?
+        # NOTE: maybe the plugin should validate stat_interval <= sincedb_write_interval <= sincedb_clean_after
        sleep(@settings.stat_interval)
+        # we need to check potential expired keys (sincedb_clean_after) periodically
+        sincedb_collection.flush_at_interval
Contributor (@yaauie):

🤔 We already have a SincedbCollection#write_if_requested in this until quit? loop, which conditionally sends SincedbCollection#flush_at_interval iff a write has been requested. Should we simply change this to send SincedbCollection#flush_at_interval instead, or is there a valid reason to sometimes flush twice in a loop?

Also, this isn't new to your PR, but it looks like we are passing an instance of SincedbCollection around a fair bit, with this flush_at_interval and other state-mutating methods being invoked by potentially multiple threads simultaneously 😩. We should probably file a separate issue to audit the thread-safety.

Contributor Author (@kares, Sep 11, 2020):

As described in the bug report and in the description, the problem occurs when no writes are requested (no newly discovered files and no new content in watched files): write_if_requested is simply a no-op, and thus sincedb_clean_after never gets triggered.

The plugin should guarantee a cleanup of expired sincedb entries regardless of whether there is actual new content; currently, cleanup is only guaranteed to happen on shutdown.

I have decided to add an explicit flush_at_interval alongside the existing write_if_requested, since they accomplish different things we want to happen in the loop: a) write changes, if any; b) guarantee periodic sincedb flushes. The actual write won't happen twice in the loop, since flushing only happens once sincedb_write_interval has passed.

The whole thing is ripe for deeper refactoring, but unfortunately I do not have the cycles to get into that.

Contributor Author (@kares):

@yaauie given the above, is your preference still the following?

"I think that the change to FileWatch::Watch#subscribe can be simplified a bit by jumping straight to invoking SincedbCollection#flush_at_interval exactly once per loop instead of invoking it once-or-twice."

I do not have strong opinions either way given the current state of the code.

Contributor (@yaauie):

You're right that the actual flush is safeguarded to not flush more often than the interval, so it doesn't matter much if we attempt it once or twice in the loop. I'll update my review to an LGTM.
end
sincedb_collection.write_if_requested # does nothing if no requests to write were lodged.
@watched_files_collection.close_all
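Per the discussion in the review thread, the loop attempts a flush on every iteration and relies on flush_at_interval's internal guard to rate-limit the actual writes. A runnable sketch of that loop shape, with stand-in collaborators in place of the plugin's real Watch and SincedbCollection classes (the iteration counter replaces the real quit? condition):

```ruby
# Simplified shape of FileWatch::Watch#subscribe's polling loop.
class WatchLoopSketch
  def initialize(sincedb_collection, stat_interval: 0.01, iterations: 3)
    @sincedb_collection = sincedb_collection
    @stat_interval = stat_interval
    @iterations = iterations # stand-in for the real quit? condition
  end

  def subscribe
    count = 0
    until count >= @iterations # `until quit?` in the real loop
      count += 1
      # file discovery / reading new content would happen here
      sleep(@stat_interval)
      # flush every iteration; flush_at_interval rate-limits itself, so
      # this guarantees sincedb_clean_after fires even when nothing new
      # was read
      @sincedb_collection.flush_at_interval
    end
    @sincedb_collection.write_if_requested # no-op unless a write was lodged
  end
end
```

Because flush_at_interval is cheap when the write interval has not elapsed, calling it alongside write_if_requested does not cause duplicate writes within one loop pass.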
2 changes: 1 addition & 1 deletion logstash-input-file.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
-  s.version = '4.2.1'
+  s.version = '4.2.2'
s.licenses = ['Apache-2.0']
s.summary = "Streams events from files"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
70 changes: 57 additions & 13 deletions spec/inputs/file_read_spec.rb
@@ -301,25 +301,69 @@
watched_files = plugin.watcher.watch.watched_files_collection
expect( watched_files ).to be_empty
end
end

-    private
-
-    def wait_for_start_processing(run_thread, timeout: 1.0)
-      begin
-        Timeout.timeout(timeout) do
-          sleep(0.01) while run_thread.status != 'sleep'
-          sleep(timeout) unless plugin.queue
-        end
-      rescue Timeout::Error
-        raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue
-      else
-        raise "plugin did not start processing" unless plugin.queue
-      end
-    end
-
-    def wait_for_file_removal(path, timeout: 3 * interval)
-      wait(timeout).for { File.exist?(path) }.to be_falsey
-    end
+    describe 'sincedb cleanup' do
+
+      let(:options) do
+        super.merge(
+          'sincedb_path' => sincedb_path,
+          'sincedb_clean_after' => '1.0 seconds',
+          'sincedb_write_interval' => 0.25,
+          'stat_interval' => 0.1,
+        )
+      end
+
+      let(:sincedb_path) { "#{temp_directory}/.sincedb" }
+
+      let(:sample_file) { File.join(temp_directory, "sample.txt") }
+
+      before do
+        plugin.register
+        @run_thread = Thread.new(plugin) do |plugin|
+          Thread.current.abort_on_exception = true
+          plugin.run queue
+        end
+
+        File.open(sample_file, 'w') { |fd| fd.write("line1\nline2\n") }
+
+        wait_for_start_processing(@run_thread)
+      end
+
+      after { plugin.stop }
+
+      it 'cleans up sincedb entry' do
+        wait_for_file_removal(sample_file) # watched discovery
+
+        sincedb_content = File.read(sincedb_path).strip
+        expect( sincedb_content ).to_not be_empty
+
+        Stud.try(3.times) do
+          sleep(1.5) # > sincedb_clean_after
+
+          sincedb_content = File.read(sincedb_path).strip
+          expect( sincedb_content ).to be_empty
+        end
+      end
+
+    end
+
+    private
+
+    def wait_for_start_processing(run_thread, timeout: 1.0)
+      begin
+        Timeout.timeout(timeout) do
+          sleep(0.01) while run_thread.status != 'sleep'
+          sleep(timeout) unless plugin.queue
+        end
+      rescue Timeout::Error
+        raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue
+      else
+        raise "plugin did not start processing" unless plugin.queue
+      end
+    end
+
+    def wait_for_file_removal(path, timeout: 3 * interval)
+      wait(timeout).for { File.exist?(path) }.to be_falsey
+    end
   end
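The spec's chosen timings respect the ordering hinted at by the NOTE in watch.rb (stat_interval <= sincedb_write_interval <= sincedb_clean_after): a stat interval of 0.1s, a write interval of 0.25s, and a clean_after of 1s. The plugin does not actually enforce this ordering; a hypothetical check along the lines of that NOTE (not part of the PR) might look like:

```ruby
# Hypothetical validation suggested by the NOTE in watch.rb; not
# implemented by the plugin, shown only to make the ordering explicit.
def validate_intervals!(stat_interval:, sincedb_write_interval:, sincedb_clean_after:)
  ordered = stat_interval <= sincedb_write_interval &&
            sincedb_write_interval <= sincedb_clean_after
  return if ordered
  raise ArgumentError, "expected stat_interval <= sincedb_write_interval <= " \
                       "sincedb_clean_after, got #{stat_interval} / " \
                       "#{sincedb_write_interval} / #{sincedb_clean_after}"
end
```

If clean_after were smaller than the write interval, entries could expire well before any flush had a chance to evict them, which is why the spec keeps clean_after the largest of the three.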