Skip to content

Commit 9a0ca2b

Browse files
committed
Fix: release watched files on complete (in read-mode)
not doing so leads to a steady increase in watched collection's size over time (esp. in use-cases where user is pulling in new files). the left-over file is never to be processed again - it's being deleted anyway using the completion handler.
1 parent a0384c0 commit 9a0ca2b

File tree

6 files changed

+90
-16
lines changed

6 files changed

+90
-16
lines changed

Diff for: lib/filewatch/watch.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def iterate_on_state
6767
watched_files = @watched_files_collection.values
6868
@processor.process_all_states(watched_files)
6969
ensure
70-
@watched_files_collection.delete(@processor.deletable_filepaths)
70+
@watched_files_collection.remove_paths(@processor.deletable_filepaths)
7171
@processor.deletable_filepaths.clear
7272
end
7373
end # def each

Diff for: lib/filewatch/watched_files_collection.rb

+8-5
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@ def add(watched_file)
1515
@sort_method.call
1616
end
1717

18-
def delete(paths)
19-
Array(paths).each do |f|
20-
index = @pointers.delete(f)
21-
@files.delete_at(index)
22-
refresh_pointers
18+
def remove_paths(paths)
19+
removed_files = Array(paths).map do |path|
20+
if index = @pointers.delete(path)
21+
watched_file = @files.delete_at(index)
22+
refresh_pointers
23+
watched_file
24+
end
2325
end
2426
@sort_method.call
27+
removed_files
2528
end
2629

2730
def close_all

Diff for: lib/logstash/inputs/delete_completed_file_handler.rb

+5
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22

33
module LogStash module Inputs
44
class DeleteCompletedFileHandler
5+
def initialize(watch)
6+
@watch = watch
7+
end
8+
59
def handle(path)
610
Pathname.new(path).unlink rescue nil
11+
@watch.watched_files_collection.remove_paths([path])
712
end
813
end
914
end end

Diff for: lib/logstash/inputs/file.rb

+14-8
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ def validate_value(value, validator)
247247
end
248248
end
249249

250+
attr_reader :queue, :watcher # used in specs
251+
250252
def register
251253
require "addressable/uri"
252254
require "digest/md5"
@@ -274,8 +276,6 @@ def register
274276
:check_archive_validity => @check_archive_validity,
275277
}
276278

277-
@completed_file_handlers = []
278-
279279
@path.each do |path|
280280
if Pathname.new(path).relative?
281281
raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
@@ -319,12 +319,6 @@ def register
319319
@watcher_class = FileWatch::ObservingTail
320320
else
321321
@watcher_class = FileWatch::ObservingRead
322-
if @file_completed_action.include?('log')
323-
@completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
324-
end
325-
if @file_completed_action.include?('delete')
326-
@completed_file_handlers << DeleteCompletedFileHandler.new
327-
end
328322
end
329323
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
330324
@completely_stopped = Concurrent::AtomicBoolean.new
@@ -344,7 +338,19 @@ def start_processing
344338
# if the pipeline restarts this input,
345339
# make sure previous files are closed
346340
stop
341+
347342
@watcher = @watcher_class.new(@filewatch_config)
343+
344+
@completed_file_handlers = []
345+
if read_mode?
346+
if @file_completed_action.include?('log')
347+
@completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
348+
end
349+
if @file_completed_action.include?('delete')
350+
@completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch)
351+
end
352+
end
353+
348354
@path.each { |path| @watcher.watch_this(path) }
349355
end
350356

Diff for: spec/filewatch/watched_files_collection_spec.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ module FileWatch
8585
collection.add(wf3)
8686
expect(collection.keys).to eq([filepath1, filepath2, filepath3])
8787

88-
collection.delete([filepath2,filepath3])
88+
collection.remove_paths([filepath2,filepath3])
8989
expect(collection.keys).to eq([filepath1])
90-
90+
expect(collection.values.size).to eq 1
9191
end
9292
end
9393

Diff for: spec/inputs/file_read_spec.rb

+60
Original file line numberDiff line numberDiff line change
@@ -247,4 +247,64 @@
247247
end
248248
end
249249
end
250+
251+
let(:temp_directory) { Stud::Temporary.directory }
252+
let(:interval) { 0.1 }
253+
let(:options) do
254+
{
255+
'mode' => "read",
256+
'path' => "#{temp_directory}/*",
257+
'stat_interval' => interval,
258+
'discover_interval' => interval,
259+
'sincedb_path' => "#{temp_directory}/.sincedb",
260+
'sincedb_write_interval' => interval
261+
}
262+
end
263+
264+
let(:queue) { Queue.new }
265+
let(:plugin) { LogStash::Inputs::File.new(options) }
266+
267+
describe 'delete on complete' do
268+
269+
let(:options) do
270+
super.merge({ 'file_completed_action' => "delete", 'exit_after_read' => false })
271+
end
272+
273+
let(:sample_file) { File.join(temp_directory, "sample.log") }
274+
275+
before do
276+
plugin.register
277+
Thread.new(plugin) do |plugin|
278+
Thread.current.abort_on_exception = true
279+
plugin.run queue
280+
end
281+
282+
File.open(sample_file, 'w') { |fd| fd.write("sample-content\n") }
283+
end
284+
285+
after { plugin.stop }
286+
287+
it 'processes a file' do
288+
wait_for_watcher_discovery
289+
290+
expect( plugin.queue.size ).to eql 1
291+
event = plugin.queue.pop
292+
expect( event.get('message') ).to eql 'sample-content'
293+
end
294+
295+
it 'removes watched file from collection' do
296+
wait_for_watcher_discovery
297+
298+
watched_files = plugin.watcher.watch.watched_files_collection
299+
expect( watched_files ).to be_empty
300+
end
301+
302+
private
303+
304+
def wait_for_watcher_discovery(interval: self.interval)
305+
sleep(3 * interval) # let the watched discover files
306+
raise "plugin did not start processing" unless plugin.queue
307+
end
308+
309+
end
250310
end

0 commit comments

Comments
 (0)