Skip to content

Commit aec0be9

Browse files
authored
Fix: release watched files on completion (in read-mode) (#271)
not doing so leads to a steady increase in watched collection's size over time (esp. in use-cases where user is pulling in new files). the left-over file is never to be processed again - it's being deleted anyway using the completion handler.
1 parent a0384c0 commit aec0be9

File tree

10 files changed

+124
-19
lines changed

10 files changed

+124
-19
lines changed

Diff for: CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.1.18
2+
- Fix: release watched files on completion (in read-mode) [#271](https://github.com/logstash-plugins/logstash-input-file/pull/271)
3+
14
## 4.1.17
25
- Added configuration setting `check_archive_validity` settings to enable
36
gzipped files verification, issue

Diff for: lib/filewatch/read_mode/handlers/read_file.rb

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ def handle_specifically(watched_file)
2222
sincedb_collection.reading_completed(key)
2323
sincedb_collection.clear_watched_file(key)
2424
watched_file.listener.deleted
25+
# NOTE: on top of un-watching we should also remove from the watched files collection
26+
# if the file is getting deleted (on completion), that part currently resides in
27+
# DeleteCompletedFileHandler - triggered above using `watched_file.listener.deleted`
2528
watched_file.unwatch
2629
end
2730
end

Diff for: lib/filewatch/watch.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def iterate_on_state
6767
watched_files = @watched_files_collection.values
6868
@processor.process_all_states(watched_files)
6969
ensure
70-
@watched_files_collection.delete(@processor.deletable_filepaths)
70+
@watched_files_collection.remove_paths(@processor.deletable_filepaths)
7171
@processor.deletable_filepaths.clear
7272
end
7373
end # def each

Diff for: lib/filewatch/watched_files_collection.rb

+9-5
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,17 @@ def add(watched_file)
1515
@sort_method.call
1616
end
1717

18-
def delete(paths)
19-
Array(paths).each do |f|
20-
index = @pointers.delete(f)
21-
@files.delete_at(index)
22-
refresh_pointers
18+
def remove_paths(paths)
19+
removed_files = Array(paths).map do |path|
20+
index = @pointers.delete(path)
21+
if index
22+
watched_file = @files.delete_at(index)
23+
refresh_pointers
24+
watched_file
25+
end
2326
end
2427
@sort_method.call
28+
removed_files
2529
end
2630

2731
def close_all

Diff for: lib/logstash/inputs/delete_completed_file_handler.rb

+5
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22

33
module LogStash module Inputs
44
class DeleteCompletedFileHandler
5+
def initialize(watch)
6+
@watch = watch
7+
end
8+
59
def handle(path)
610
Pathname.new(path).unlink rescue nil
11+
@watch.watched_files_collection.remove_paths([path])
712
end
813
end
914
end end

Diff for: lib/logstash/inputs/file.rb

+24-10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require "pathname"
77
require "socket" # for Socket.gethostname
88
require "fileutils"
9+
require "concurrent/atomic/atomic_reference"
910

1011
require_relative "file/patch"
1112
require_relative "file_listener"
@@ -247,6 +248,9 @@ def validate_value(value, validator)
247248
end
248249
end
249250

251+
# @private used in specs
252+
attr_reader :watcher
253+
250254
def register
251255
require "addressable/uri"
252256
require "digest/md5"
@@ -274,8 +278,6 @@ def register
274278
:check_archive_validity => @check_archive_validity,
275279
}
276280

277-
@completed_file_handlers = []
278-
279281
@path.each do |path|
280282
if Pathname.new(path).relative?
281283
raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
@@ -319,15 +321,10 @@ def register
319321
@watcher_class = FileWatch::ObservingTail
320322
else
321323
@watcher_class = FileWatch::ObservingRead
322-
if @file_completed_action.include?('log')
323-
@completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
324-
end
325-
if @file_completed_action.include?('delete')
326-
@completed_file_handlers << DeleteCompletedFileHandler.new
327-
end
328324
end
329325
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
330326
@completely_stopped = Concurrent::AtomicBoolean.new
327+
@queue = Concurrent::AtomicReference.new
331328
end # def register
332329

333330
def completely_stopped?
@@ -344,13 +341,25 @@ def start_processing
344341
# if the pipeline restarts this input,
345342
# make sure previous files are closed
346343
stop
344+
347345
@watcher = @watcher_class.new(@filewatch_config)
346+
347+
@completed_file_handlers = []
348+
if read_mode?
349+
if @file_completed_action.include?('log')
350+
@completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
351+
end
352+
if @file_completed_action.include?('delete')
353+
@completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch)
354+
end
355+
end
356+
348357
@path.each { |path| @watcher.watch_this(path) }
349358
end
350359

351360
def run(queue)
352361
start_processing
353-
@queue = queue
362+
@queue.set queue
354363
@watcher.subscribe(self) # halts here until quit is called
355364
# last action of the subscribe call is to write the sincedb
356365
exit_flush
@@ -361,7 +370,7 @@ def post_process_this(event)
361370
event.set("[@metadata][host]", @host)
362371
event.set("host", @host) unless event.include?("host")
363372
decorate(event)
364-
@queue << event
373+
@queue.get << event
365374
end
366375

367376
def handle_deletable_path(path)
@@ -382,6 +391,11 @@ def stop
382391
end
383392
end
384393

394+
# @private used in specs
395+
def queue
396+
@queue.get
397+
end
398+
385399
private
386400

387401
def build_sincedb_base_from_settings(settings)

Diff for: logstash-input-file.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-file'
4-
s.version = '4.1.17'
4+
s.version = '4.1.18'
55
s.licenses = ['Apache-2.0']
66
s.summary = "Streams events from files"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

Diff for: spec/filewatch/watched_files_collection_spec.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ module FileWatch
8585
collection.add(wf3)
8686
expect(collection.keys).to eq([filepath1, filepath2, filepath3])
8787

88-
collection.delete([filepath2,filepath3])
88+
collection.remove_paths([filepath2,filepath3])
8989
expect(collection.keys).to eq([filepath1])
90-
90+
expect(collection.values.size).to eq 1
9191
end
9292
end
9393

Diff for: spec/helpers/spec_helper.rb

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# encoding: utf-8
22

33
require "logstash/devutils/rspec/spec_helper"
4+
require "rspec/wait"
45
require "rspec_sequencing"
56

67
module FileInput

Diff for: spec/inputs/file_read_spec.rb

+75
Original file line numberDiff line numberDiff line change
@@ -247,4 +247,79 @@
247247
end
248248
end
249249
end
250+
251+
let(:temp_directory) { Stud::Temporary.directory }
252+
let(:interval) { 0.1 }
253+
let(:options) do
254+
{
255+
'mode' => "read",
256+
'path' => "#{temp_directory}/*",
257+
'stat_interval' => interval,
258+
'discover_interval' => interval,
259+
'sincedb_path' => "#{temp_directory}/.sincedb",
260+
'sincedb_write_interval' => interval
261+
}
262+
end
263+
264+
let(:queue) { Queue.new }
265+
let(:plugin) { LogStash::Inputs::File.new(options) }
266+
267+
describe 'delete on complete' do
268+
269+
let(:options) do
270+
super.merge({ 'file_completed_action' => "delete", 'exit_after_read' => false })
271+
end
272+
273+
let(:sample_file) { File.join(temp_directory, "sample.log") }
274+
275+
before do
276+
plugin.register
277+
@run_thread = Thread.new(plugin) do |plugin|
278+
Thread.current.abort_on_exception = true
279+
plugin.run queue
280+
end
281+
282+
File.open(sample_file, 'w') { |fd| fd.write("sample-content\n") }
283+
284+
wait_for_start_processing(@run_thread)
285+
end
286+
287+
after { plugin.stop }
288+
289+
it 'processes a file' do
290+
wait_for_file_removal(sample_file) # watched discovery
291+
292+
expect( plugin.queue.size ).to eql 1
293+
event = plugin.queue.pop
294+
expect( event.get('message') ).to eql 'sample-content'
295+
end
296+
297+
it 'removes watched file from collection' do
298+
wait_for_file_removal(sample_file) # watched discovery
299+
sleep(0.25) # give CI some space to execute the removal
300+
# TODO shouldn't be necessary once WatchedFileCollection does proper locking
301+
watched_files = plugin.watcher.watch.watched_files_collection
302+
expect( watched_files ).to be_empty
303+
end
304+
305+
private
306+
307+
def wait_for_start_processing(run_thread, timeout: 1.0)
308+
begin
309+
Timeout.timeout(timeout) do
310+
sleep(0.01) while run_thread.status != 'sleep'
311+
sleep(timeout) unless plugin.queue
312+
end
313+
rescue Timeout::Error
314+
raise "plugin did not start processing (timeout: #{timeout})" unless plugin.queue
315+
else
316+
raise "plugin did not start processing" unless plugin.queue
317+
end
318+
end
319+
320+
def wait_for_file_removal(path, timeout: 3 * interval)
321+
wait(timeout).for { File.exist?(path) }.to be_falsey
322+
end
323+
324+
end
250325
end

0 commit comments

Comments
 (0)