Skip to content

Commit dd5dfa5

Browse files
author
Guy Boertje
committed
handle rotations better.
especially in the case where the rotated files are discoverable - in this case we were not moving the state and the sincedb record correctly. Next - abstract the *nix and Windows stat calls and structure into two classes with the same API.
1 parent 88115d8 commit dd5dfa5

21 files changed

+496
-257
lines changed

Diff for: lib/filewatch/bootstrap.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ module FileWatch
1111
# this is used in the read loop e.g.
1212
# @opts[:file_chunk_count].times do
1313
# where file_chunk_count defaults to this constant
14-
FIXNUM_MAX = (2**(0.size * 8 - 2) - 1)
14+
MAX_ITERATIONS = (2**(0.size * 8 - 2) - 2) / 32768
1515

1616
require_relative "helper"
1717

Diff for: lib/filewatch/discoverer.rb

+19-18
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def can_exclude?(watched_file, new_discovery)
3737
@exclude.each do |pattern|
3838
if watched_file.pathname.fnmatch?(pattern)
3939
if new_discovery
40-
logger.debug("Discoverer can_exclude?: #{watched_file.path}: skipping " +
40+
logger.trace("Discoverer can_exclude?: #{watched_file.path}: skipping " +
4141
"because it matches exclude #{pattern}")
4242
end
4343
watched_file.unwatch
@@ -49,41 +49,42 @@ def can_exclude?(watched_file, new_discovery)
4949

5050
def discover_files(path)
5151
fileset = Dir.glob(path).select{|f| File.file?(f) && !File.symlink?(f)}
52-
logger.debug("Discoverer found files, count: #{fileset.size}")
52+
logger.trace("Discoverer found files, count: #{fileset.size}")
53+
rotated_paths = []
5354
fileset.each do |file|
54-
logger.debug("Discoverer found file, path: #{file}")
55+
logger.trace("Discoverer found file, path: #{file}")
5556
pathname = Pathname.new(file)
5657
new_discovery = false
5758
watched_file = @watched_files_collection.watched_file_by_path(file)
5859
if watched_file.nil?
5960
new_discovery = true
6061
watched_file = WatchedFile.new(pathname, pathname.stat, @settings)
61-
logger.debug("Discoverer new discovery: #{watched_file.path}, inode: #{watched_file.sincedb_key.inode}, discover_files: #{path} (exclude is #{@exclude.inspect})")
62+
logger.trace("Discoverer new discovery: #{watched_file.path}, inode: #{watched_file.sincedb_key.inode}")
6263
end
6364
# if it already unwatched or its excluded then we can skip
6465
next if watched_file.unwatched? || can_exclude?(watched_file, new_discovery)
6566

6667
if new_discovery
67-
if watched_file.file_ignorable?
68-
logger.debug("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago")
69-
# on discovery ignorable watched_files are put into the ignored state and that
70-
# updates the size from the internal stat
71-
# so the existing contents are not read.
72-
# because, normally, a newly discovered file will
73-
# have a watched_file size of zero
74-
# they are still added to the collection so we know they are there for the next periodic discovery
75-
watched_file.ignore
76-
end
77-
78-
# now add the discovered file to the watched_files collection and adjust the sincedb collections
79-
@watched_files_collection.add(watched_file)
8068
# initially when the sincedb collection is filled with records from the persistence file
8169
# each value is not associated with a watched file
8270
# a sincedb_value can be:
8371
# unassociated
8472
# associated with this watched_file
8573
# associated with a different watched_file
86-
@sincedb_collection.associate(watched_file)
74+
if @sincedb_collection.associate(watched_file)
75+
if watched_file.file_ignorable?
76+
logger.trace("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago")
77+
# on discovery ignorable watched_files are put into the ignored state and that
78+
# updates the size from the internal stat
79+
# so the existing contents are not read.
80+
# because, normally, a newly discovered file will
81+
# have a watched_file size of zero
82+
# they are still added to the collection so we know they are there for the next periodic discovery
83+
watched_file.ignore
84+
end
85+
# now add the discovered file to the watched_files collection and adjust the sincedb collections
86+
@watched_files_collection.add(watched_file)
87+
end
8788
end
8889
# at this point the watched file is created, is in the db but not yet opened or being processed
8990
end

Diff for: lib/filewatch/observing_base.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ def initialize(opts={})
4444
:exclude => [],
4545
:start_new_files_at => :end,
4646
:delimiter => "\n",
47-
:file_chunk_count => FIXNUM_MAX,
47+
:file_chunk_count => MAX_ITERATIONS,
48+
:file_chunk_size => FILE_READ_SIZE,
4849
:file_sort_by => "last_modified",
4950
:file_sort_direction => "asc",
5051
}.merge(opts)

Diff for: lib/filewatch/read_mode/handlers/base.rb

+19-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def initialize(sincedb_collection, observer, settings)
1414
end
1515

1616
def handle(watched_file)
17-
logger.debug("handling: #{watched_file.path}")
17+
logger.trace("handling: #{watched_file.path}")
1818
unless watched_file.has_listener?
1919
watched_file.set_listener(@observer)
2020
end
@@ -29,7 +29,7 @@ def handle_specifically(watched_file)
2929

3030
def open_file(watched_file)
3131
return true if watched_file.file_open?
32-
logger.debug("opening #{watched_file.path}")
32+
logger.trace("opening #{watched_file.path}")
3333
begin
3434
watched_file.open
3535
rescue
@@ -41,7 +41,7 @@ def open_file(watched_file)
4141
logger.warn("failed to open #{watched_file.path}: #{$!.inspect}, #{$!.backtrace.take(3)}")
4242
watched_file.last_open_warning_at = now
4343
else
44-
logger.debug("suppressed warning for `failed to open` #{watched_file.path}: #{$!.inspect}")
44+
logger.trace("suppressed warning for `failed to open` #{watched_file.path}: #{$!.inspect}")
4545
end
4646
watched_file.watch # set it back to watch so we can try it again
4747
end
@@ -60,21 +60,34 @@ def add_or_update_sincedb_collection(watched_file)
6060
elsif sincedb_value.watched_file == watched_file
6161
update_existing_sincedb_collection_value(watched_file, sincedb_value)
6262
else
63-
logger.warn? && logger.warn("mismatch on sincedb_value.watched_file, this should have been handled by Discoverer")
63+
msg = "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file"
64+
logger.trace(msg)
65+
existing_watched_file = sincedb_value.watched_file
66+
if existing_watched_file.nil?
67+
sincedb_value.set_watched_file(watched_file)
68+
logger.trace("add_or_update_sincedb_collection: switching as new file")
69+
watched_file.rotate_as_file
70+
watched_file.update_bytes_read(sincedb_value.position)
71+
else
72+
sincedb_value.set_watched_file(watched_file)
73+
logger.trace("add_or_update_sincedb_collection: switching from...", "watched_file details" => watched_file.details)
74+
watched_file.rotate_from(existing_watched_file)
75+
end
76+
6477
end
6578
watched_file.initial_completed
6679
end
6780

6881
def update_existing_sincedb_collection_value(watched_file, sincedb_value)
69-
logger.debug("update_existing_sincedb_collection_value: #{watched_file.path}, last value #{sincedb_value.position}, cur size #{watched_file.last_stat_size}")
82+
logger.trace("update_existing_sincedb_collection_value: #{watched_file.path}, last value #{sincedb_value.position}, cur size #{watched_file.last_stat_size}")
7083
# sincedb_value is the source of truth
7184
watched_file.update_bytes_read(sincedb_value.position)
7285
end
7386

7487
def add_new_value_sincedb_collection(watched_file)
7588
sincedb_value = SincedbValue.new(0)
7689
sincedb_value.set_watched_file(watched_file)
77-
logger.debug("add_new_value_sincedb_collection: #{watched_file.path}", "position" => sincedb_value.position)
90+
logger.trace("add_new_value_sincedb_collection: #{watched_file.path}", "position" => sincedb_value.position)
7891
sincedb_collection.set(watched_file.sincedb_key, sincedb_value)
7992
end
8093
end

Diff for: lib/filewatch/read_mode/handlers/read_file.rb

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ def handle_specifically(watched_file)
99
watched_file.read_loop_count.times do
1010
begin
1111
result = watched_file.read_extract_lines # expect BufferExtractResult
12-
logger.info(result.warning, result.additional) unless result.warning.empty?
12+
logger.trace(result.warning, result.additional) unless result.warning.empty?
1313
changed = true
1414
result.lines.each do |line|
1515
watched_file.listener.accept(line)
@@ -22,8 +22,9 @@ def handle_specifically(watched_file)
2222
watched_file.listener.accept(line) unless line.empty?
2323
watched_file.listener.eof
2424
watched_file.file_close
25-
# unset_watched_file will set sincedb_value.position to be watched_file.bytes_read
26-
sincedb_collection.unset_watched_file(watched_file)
25+
key = watched_file.sincedb_key
26+
sincedb_collection.reading_completed(key)
27+
sincedb_collection.clear_watched_file(key)
2728
watched_file.listener.deleted
2829
watched_file.unwatch
2930
break

Diff for: lib/filewatch/read_mode/handlers/read_zip_file.rb

+5-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ def handle_specifically(watched_file)
1717
# should we track lines read in the sincedb and
1818
# fast forward through the lines until we reach unseen content?
1919
# meaning that we can quit in the middle of a zip file
20+
key = watched_file.sincedb_key
21+
sincedb_collection.reading_completed(key)
22+
2023
begin
2124
file_stream = FileInputStream.new(watched_file.path)
2225
gzip_stream = GZIPInputStream.new(file_stream)
@@ -30,7 +33,7 @@ def handle_specifically(watched_file)
3033
logger.error("Cannot decompress the gzip file at path: #{watched_file.path}")
3134
watched_file.listener.error
3235
else
33-
sincedb_collection.store_last_read(watched_file.sincedb_key, watched_file.last_stat_size)
36+
sincedb_collection.store_last_read(key, watched_file.last_stat_size)
3437
sincedb_collection.request_disk_flush
3538
watched_file.listener.deleted
3639
watched_file.unwatch
@@ -41,7 +44,7 @@ def handle_specifically(watched_file)
4144
close_and_ignore_ioexception(gzip_stream) unless gzip_stream.nil?
4245
close_and_ignore_ioexception(file_stream) unless file_stream.nil?
4346
end
44-
sincedb_collection.unset_watched_file(watched_file)
47+
sincedb_collection.clear_watched_file(key)
4548
end
4649

4750
private

Diff for: lib/filewatch/read_mode/processor.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def process_ignored(watched_files)
4646
end
4747

4848
def process_watched(watched_files)
49-
logger.debug("Watched processing")
49+
logger.trace("Watched processing")
5050
# Handles watched_files in the watched state.
5151
# for a slice of them:
5252
# move to the active state
@@ -79,7 +79,7 @@ def process_watched(watched_files)
7979
end
8080

8181
def process_active(watched_files)
82-
logger.debug("Active processing")
82+
logger.trace("Active processing")
8383
# Handles watched_files in the active state.
8484
watched_files.select {|wf| wf.active? }.each do |watched_file|
8585
path = watched_file.path
@@ -107,7 +107,7 @@ def common_deleted_reaction(watched_file, action)
107107
# file has gone away or we can't read it anymore.
108108
watched_file.unwatch
109109
deletable_filepaths << watched_file.path
110-
logger.debug("#{action} - stat failed: #{watched_file.path}, removing from collection")
110+
logger.trace("#{action} - stat failed: #{watched_file.path}, removing from collection")
111111
end
112112

113113
def common_error_reaction(path, error, action)

Diff for: lib/filewatch/settings.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def initialize
2222
:delimiter => "\n",
2323
:file_chunk_size => FILE_READ_SIZE,
2424
:max_active => 4095,
25-
:file_chunk_count => FIXNUM_MAX,
25+
:file_chunk_count => MAX_ITERATIONS,
2626
:sincedb_clean_after => 14,
2727
:exclude => [],
2828
:stat_interval => 1,

0 commit comments

Comments
 (0)