Fix: release watched files on completion (in read-mode) #271

Merged
merged 4 commits into from
Apr 29, 2020
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 4.1.18
- Fix: release watched files on completion (in read-mode) [#271](https://github.com/logstash-plugins/logstash-input-file/pull/271)

## 4.1.17
- Added configuration setting `check_archive_validity` settings to enable
gzipped files verification, issue
3 changes: 3 additions & 0 deletions lib/filewatch/read_mode/handlers/read_file.rb
@@ -22,6 +22,9 @@ def handle_specifically(watched_file)
sincedb_collection.reading_completed(key)
sincedb_collection.clear_watched_file(key)
watched_file.listener.deleted
# NOTE: on top of un-watching we should also remove from the watched files collection
# if the file is getting deleted (on completion), that part currently resides in
# DeleteCompletedFileHandler - triggered above using `watched_file.listener.deleted`
watched_file.unwatch
end
end
2 changes: 1 addition & 1 deletion lib/filewatch/watch.rb
@@ -67,7 +67,7 @@ def iterate_on_state
watched_files = @watched_files_collection.values
@processor.process_all_states(watched_files)
ensure
- @watched_files_collection.delete(@processor.deletable_filepaths)
+ @watched_files_collection.remove_paths(@processor.deletable_filepaths)
@processor.deletable_filepaths.clear
end
end # def each
14 changes: 9 additions & 5 deletions lib/filewatch/watched_files_collection.rb
@@ -15,13 +15,17 @@ def add(watched_file)
@sort_method.call
end

- def delete(paths)
- Array(paths).each do |f|
- index = @pointers.delete(f)
- @files.delete_at(index)
- refresh_pointers
+ def remove_paths(paths)
+ removed_files = Array(paths).map do |path|
+ index = @pointers.delete(path)
+ if index
+ watched_file = @files.delete_at(index)
+ refresh_pointers
+ watched_file
+ end
end
@sort_method.call
+ removed_files

Contributor

It's not clear to me why remove_paths returns the list of removed files. remove_paths is called inside DeleteCompletedFileHandler.handle, which in turn returns that list of removed files. That method is invoked by File.handle_deletable_path in an each loop, and the method itself doesn't return anything.

Contributor Author

@kares kares Apr 29, 2020

Thanks Andrea, I might have gotten a bit ahead of myself here ...
It's common for a (map) remove operation to return the removed values.
The watched files collection is roughly a map ([String] path -> [WatchedFile] file), so I wanted to return something meaningful (planning to spec the return value with a collection rewrite to native).
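
To illustrate the point about map-style removal, here is a minimal sketch that models the collection as a plain Hash. `PathMap` and the symbols standing in for watched files are hypothetical and are not part of the plugin; the sketch only mirrors the return-value behavior of the `remove_paths` in this diff, where a path that is not present yields `nil`.

```ruby
# Hypothetical stand-in for the watched files collection: a Hash keyed by path.
class PathMap
  def initialize
    @files = {}
  end

  def add(path, file)
    @files[path] = file
  end

  # Removes each path and returns the removed values in order.
  # Like Hash#delete, a path that was never added yields nil.
  def remove_paths(paths)
    Array(paths).map { |path| @files.delete(path) }
  end

  def keys
    @files.keys
  end
end

collection = PathMap.new
collection.add("/tmp/a.log", :file_a)
collection.add("/tmp/b.log", :file_b)

# One known path and one unknown path.
removed = collection.remove_paths(["/tmp/b.log", "/tmp/missing.log"])
```

Callers that only care about the side effect, such as an `each` loop, can ignore the return value, while a spec can assert on it.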

end

def close_all
5 changes: 5 additions & 0 deletions lib/logstash/inputs/delete_completed_file_handler.rb
@@ -2,8 +2,13 @@

module LogStash module Inputs
class DeleteCompletedFileHandler
def initialize(watch)
@watch = watch
end

def handle(path)
Pathname.new(path).unlink rescue nil
@watch.watched_files_collection.remove_paths([path])
end
end
end end
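
As a rough illustration of the handler change above, the following stdlib-only sketch unlinks a file and then drops its path from a collection. `FakeCollection` and `DeleteHandlerSketch` are hypothetical stand-ins rather than the plugin's classes, and the sketch rescues `SystemCallError` explicitly instead of using the inline `rescue nil` from the diff.

```ruby
require "pathname"
require "tmpdir"

# Hypothetical stand-in for the watched files collection (paths only).
class FakeCollection
  attr_reader :paths

  def initialize(paths)
    @paths = paths
  end

  def remove_paths(paths)
    Array(paths).map { |path| @paths.delete(path) }
  end
end

# Hypothetical handler: deletes the file, then forgets the path.
class DeleteHandlerSketch
  def initialize(collection)
    @collection = collection
  end

  def handle(path)
    begin
      Pathname.new(path).unlink
    rescue SystemCallError
      # The file is already gone; still remove it from the collection.
    end
    @collection.remove_paths([path])
  end
end

file_gone, remaining = Dir.mktmpdir do |dir|
  path = File.join(dir, "sample.log")
  File.write(path, "done\n")
  collection = FakeCollection.new([path])
  DeleteHandlerSketch.new(collection).handle(path)
  [!File.exist?(path), collection.paths.dup]
end
```

Passing the collection into the handler is what lets the delete action release the entry, which is the behavior the spec at the end of this PR checks.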
34 changes: 24 additions & 10 deletions lib/logstash/inputs/file.rb
@@ -6,6 +6,7 @@
require "pathname"
require "socket" # for Socket.gethostname
require "fileutils"
require "concurrent/atomic/atomic_reference"

require_relative "file/patch"
require_relative "file_listener"
@@ -247,6 +248,9 @@ def validate_value(value, validator)
end
end

# @private used in specs
attr_reader :watcher

def register
require "addressable/uri"
require "digest/md5"
@@ -274,8 +278,6 @@ def register
:check_archive_validity => @check_archive_validity,
}

- @completed_file_handlers = []

@path.each do |path|
if Pathname.new(path).relative?
raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
@@ -319,15 +321,10 @@ def register
@watcher_class = FileWatch::ObservingTail
else
@watcher_class = FileWatch::ObservingRead
- if @file_completed_action.include?('log')
- @completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
- end
- if @file_completed_action.include?('delete')
- @completed_file_handlers << DeleteCompletedFileHandler.new
- end
end
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
@completely_stopped = Concurrent::AtomicBoolean.new
@queue = Concurrent::AtomicReference.new
end # def register

def completely_stopped?
@@ -344,13 +341,25 @@ def start_processing
# if the pipeline restarts this input,
# make sure previous files are closed
stop

@watcher = @watcher_class.new(@filewatch_config)

@completed_file_handlers = []
if read_mode?
if @file_completed_action.include?('log')
@completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
end
if @file_completed_action.include?('delete')
@completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch)
end
end

@path.each { |path| @watcher.watch_this(path) }
end

def run(queue)
start_processing
- @queue = queue
+ @queue.set queue
@watcher.subscribe(self) # halts here until quit is called
# last action of the subscribe call is to write the sincedb
exit_flush
@@ -361,7 +370,7 @@ def post_process_this(event)
event.set("[@metadata][host]", @host)
event.set("host", @host) unless event.include?("host")
decorate(event)
- @queue << event
+ @queue.get << event
end

def handle_deletable_path(path)
@@ -382,6 +391,11 @@ def stop
end
end

# @private used in specs
def queue
@queue.get
end

private

def build_sincedb_base_from_settings(settings)
2 changes: 1 addition & 1 deletion logstash-input-file.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
- s.version = '4.1.17'
+ s.version = '4.1.18'
s.licenses = ['Apache-2.0']
s.summary = "Streams events from files"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
4 changes: 2 additions & 2 deletions spec/filewatch/watched_files_collection_spec.rb
@@ -85,9 +85,9 @@ module FileWatch
collection.add(wf3)
expect(collection.keys).to eq([filepath1, filepath2, filepath3])

- collection.delete([filepath2,filepath3])
+ collection.remove_paths([filepath2,filepath3])
expect(collection.keys).to eq([filepath1])

expect(collection.values.size).to eq 1
end
end

1 change: 1 addition & 0 deletions spec/helpers/spec_helper.rb
@@ -1,6 +1,7 @@
# encoding: utf-8

require "logstash/devutils/rspec/spec_helper"
require "rspec/wait"
require "rspec_sequencing"

module FileInput
75 changes: 75 additions & 0 deletions spec/inputs/file_read_spec.rb
@@ -247,4 +247,79 @@
end
end
end

let(:temp_directory) { Stud::Temporary.directory }
let(:interval) { 0.1 }
let(:options) do
{
'mode' => "read",
'path' => "#{temp_directory}/*",
'stat_interval' => interval,
'discover_interval' => interval,
'sincedb_path' => "#{temp_directory}/.sincedb",
'sincedb_write_interval' => interval
}
end

let(:queue) { Queue.new }
let(:plugin) { LogStash::Inputs::File.new(options) }

describe 'delete on complete' do

let(:options) do
super.merge({ 'file_completed_action' => "delete", 'exit_after_read' => false })
end

let(:sample_file) { File.join(temp_directory, "sample.log") }

before do
plugin.register
@run_thread = Thread.new(plugin) do |plugin|
Thread.current.abort_on_exception = true
plugin.run queue
end

File.open(sample_file, 'w') { |fd| fd.write("sample-content\n") }

wait_for_start_processing(@run_thread)
end

after { plugin.stop }

it 'processes a file' do
wait_for_file_removal(sample_file) # watched discovery

expect( plugin.queue.size ).to eql 1
event = plugin.queue.pop
expect( event.get('message') ).to eql 'sample-content'
end

it 'removes watched file from collection' do
wait_for_file_removal(sample_file) # watched discovery
sleep(0.25) # give CI some space to execute the removal
# TODO shouldn't be necessary once WatchedFileCollection does proper locking
watched_files = plugin.watcher.watch.watched_files_collection
expect( watched_files ).to be_empty
end

private

def wait_for_start_processing(run_thread, timeout: 1.0)
begin
Timeout.timeout(timeout) do
sleep(0.01) while run_thread.status != 'sleep'
sleep(timeout) unless plugin.queue
end
rescue Timeout::Error
raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue
else
raise "plugin did not start processing" unless plugin.queue
end
end

def wait_for_file_removal(path, timeout: 3 * interval)
wait(timeout).for { File.exist?(path) }.to be_falsey
end

end
end
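
The spec's `wait_for_file_removal` polls until the sample file disappears, using the `wait(...).for` matcher that comes with the newly required "rspec/wait". The same polling idea can be sketched with the standard library alone. `wait_until` below is a hypothetical helper, not something the plugin or rspec-wait provides, and the background thread merely simulates the plugin deleting a completed file.

```ruby
require "tmpdir"

# Hypothetical helper: polls the block until it returns truthy or the
# timeout (in seconds) elapses. Returns true on success, false on timeout.
def wait_until(timeout:, step: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    return true if yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep(step)
  end
end

file_removed = Dir.mktmpdir do |dir|
  path = File.join(dir, "sample.log")
  File.write(path, "sample-content\n")

  # Simulates the delete-on-completion action running on another thread.
  remover = Thread.new do
    sleep(0.05)
    File.delete(path)
  end

  result = wait_until(timeout: 1.0) { !File.exist?(path) }
  remover.join
  result
end
```

Polling against a deadline avoids a fixed `sleep` such as the 0.25 second pause the spec still carries as a TODO, at the cost of needing a sensible timeout for slow CI machines.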