Skip to content

Commit 4bf6de5

Browse files
committed
Fix: release watched files on complete (in read-mode)
not doing so leads to a steady increase in watched collection's size over time (esp. in use-cases where user is pulling in new files). the left-over file is never to be processed again - it's being deleted anyway using the completion handler.
1 parent a0384c0 commit 4bf6de5

File tree

6 files changed

+107
-16
lines changed

6 files changed

+107
-16
lines changed

Diff for: lib/filewatch/watch.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def iterate_on_state
6767
watched_files = @watched_files_collection.values
6868
@processor.process_all_states(watched_files)
6969
ensure
70-
@watched_files_collection.delete(@processor.deletable_filepaths)
70+
@watched_files_collection.remove_paths(@processor.deletable_filepaths)
7171
@processor.deletable_filepaths.clear
7272
end
7373
end # def each

Diff for: lib/filewatch/watched_files_collection.rb

+8-5
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@ def add(watched_file)
1515
@sort_method.call
1616
end
1717

18-
def delete(paths)
19-
Array(paths).each do |f|
20-
index = @pointers.delete(f)
21-
@files.delete_at(index)
22-
refresh_pointers
18+
def remove_paths(paths)
19+
removed_files = Array(paths).map do |path|
20+
if index = @pointers.delete(path)
21+
watched_file = @files.delete_at(index)
22+
refresh_pointers
23+
watched_file
24+
end
2325
end
2426
@sort_method.call
27+
removed_files
2528
end
2629

2730
def close_all

Diff for: lib/logstash/inputs/delete_completed_file_handler.rb

+5
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22

33
module LogStash module Inputs
44
class DeleteCompletedFileHandler
5+
def initialize(watch)
6+
@watch = watch
7+
end
8+
59
def handle(path)
610
Pathname.new(path).unlink rescue nil
11+
@watch.watched_files_collection.remove_paths([path])
712
end
813
end
914
end end

Diff for: lib/logstash/inputs/file.rb

+14-8
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ def validate_value(value, validator)
247247
end
248248
end
249249

250+
attr_reader :queue, :watcher # used in specs
251+
250252
def register
251253
require "addressable/uri"
252254
require "digest/md5"
@@ -274,8 +276,6 @@ def register
274276
:check_archive_validity => @check_archive_validity,
275277
}
276278

277-
@completed_file_handlers = []
278-
279279
@path.each do |path|
280280
if Pathname.new(path).relative?
281281
raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
@@ -319,12 +319,6 @@ def register
319319
@watcher_class = FileWatch::ObservingTail
320320
else
321321
@watcher_class = FileWatch::ObservingRead
322-
if @file_completed_action.include?('log')
323-
@completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
324-
end
325-
if @file_completed_action.include?('delete')
326-
@completed_file_handlers << DeleteCompletedFileHandler.new
327-
end
328322
end
329323
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
330324
@completely_stopped = Concurrent::AtomicBoolean.new
@@ -344,7 +338,19 @@ def start_processing
344338
# if the pipeline restarts this input,
345339
# make sure previous files are closed
346340
stop
341+
347342
@watcher = @watcher_class.new(@filewatch_config)
343+
344+
@completed_file_handlers = []
345+
if read_mode?
346+
if @file_completed_action.include?('log')
347+
@completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
348+
end
349+
if @file_completed_action.include?('delete')
350+
@completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch)
351+
end
352+
end
353+
348354
@path.each { |path| @watcher.watch_this(path) }
349355
end
350356

Diff for: spec/filewatch/watched_files_collection_spec.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ module FileWatch
8585
collection.add(wf3)
8686
expect(collection.keys).to eq([filepath1, filepath2, filepath3])
8787

88-
collection.delete([filepath2,filepath3])
88+
collection.remove_paths([filepath2,filepath3])
8989
expect(collection.keys).to eq([filepath1])
90-
90+
expect(collection.values.size).to eq 1
9191
end
9292
end
9393

Diff for: spec/inputs/file_read_spec.rb

+77
Original file line numberDiff line numberDiff line change
@@ -247,4 +247,81 @@
247247
end
248248
end
249249
end
250+
251+
let(:temp_directory) do
252+
Stud::Temporary.directory
253+
end
254+
255+
let(:interval) { 0.1 }
256+
257+
let(:options) do
258+
{
259+
'mode' => "read",
260+
'path' => "#{temp_directory}/*",
261+
'stat_interval' => interval,
262+
'discover_interval' => interval,
263+
'sincedb_path' => "#{temp_directory}/.sincedb",
264+
'sincedb_write_interval' => interval
265+
}
266+
end
267+
268+
let(:queue) do
269+
Queue.new
270+
end
271+
272+
let(:plugin) do
273+
LogStash::Inputs::File.new(options)
274+
end
275+
276+
subject do
277+
plugin.register
278+
Thread.start do
279+
Thread.current.abort_on_exception = true
280+
plugin.run queue
281+
end
282+
plugin
283+
end
284+
285+
after { subject.stop }
286+
287+
context 'delete on complete' do
288+
289+
let(:sample_file) do
290+
File.join(temp_directory, "sample.log")
291+
end
292+
293+
before(:each) do
294+
subject # register and start processing
295+
File.open(sample_file, 'w') { |fd| fd.write("sample-content\n") }
296+
end
297+
298+
let(:options) do
299+
super.merge({ 'file_completed_action' => "delete", 'exit_after_read' => false })
300+
end
301+
302+
it 'processes a file' do
303+
wait_for_watcher_discovery
304+
305+
expect( subject.queue.size ).to eql 1
306+
event = subject.queue.pop
307+
expect( event.get('message') ).to eql 'sample-content'
308+
end
309+
310+
it 'removes watched file from collection' do
311+
wait_for_watcher_discovery
312+
313+
watched_files = plugin.watcher.watch.watched_files_collection
314+
expect( watched_files ).to be_empty
315+
end
316+
317+
private
318+
319+
def wait_for_watcher_discovery(interval: self.interval)
320+
subject
321+
sleep(3 * interval) # let the watched discover files
322+
raise "plugin did not start processing" unless subject.queue
323+
end
324+
325+
end
326+
250327
end

0 commit comments

Comments
 (0)