Skip to content

Refactor: improve debug logging (log caught exceptions) #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/filewatch/read_mode/handlers/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def quit?
end

def handle(watched_file)
logger.trace("handling: #{watched_file.path}")
logger.trace? && logger.trace("handling:", :path => watched_file.path)
unless watched_file.has_listener?
watched_file.set_listener(@observer)
end
Expand Down
47 changes: 22 additions & 25 deletions lib/filewatch/sincedb_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ def open
@time_sdb_opened = Time.now.to_f
begin
path.open do |file|
logger.trace("open: reading from #{path}")
logger.debug("open: reading from #{path}")
@serializer.deserialize(file) do |key, value|
logger.trace("open: importing ... '#{key}' => '#{value}'")
logger.trace? && logger.trace("open: importing #{key.inspect} => #{value.inspect}")
set_key_value(key, value)
end
end
logger.trace("open: count of keys read: #{@sincedb.keys.size}")
rescue => e
#No existing sincedb to load
logger.trace("open: error:", :path => path, :exception => e.class, :message => e.message)
logger.debug("open: error opening #{path}", :exception => e.class, :message => e.message)
end
end

Expand All @@ -68,35 +68,32 @@ def associate(watched_file)
# and due to the window handling of many files
# this file may not be opened in this session.
# a new value will be added when the file is opened
logger.trace("associate: unmatched")
logger.trace("associate: unmatched", :filename => watched_file.filename)
return true
end
logger.trace? && logger.trace("associate: found sincedb record", :filename => watched_file.filename,
:sincedb_key => watched_file.sincedb_key, :sincedb_value => sincedb_value)
if sincedb_value.watched_file.nil?
# not associated
if sincedb_value.watched_file.nil? # not associated
if sincedb_value.path_in_sincedb.nil?
handle_association(sincedb_value, watched_file)
logger.trace("associate: inode matched but no path in sincedb")
logger.trace? && logger.trace("associate: inode matched but no path in sincedb", :filename => watched_file.filename)
return true
end
if sincedb_value.path_in_sincedb == watched_file.path
# the path on disk is the same as discovered path
# and the inode is the same.
# the path on disk is the same as discovered path and the inode is the same.
handle_association(sincedb_value, watched_file)
logger.trace("associate: inode and path matched")
logger.trace? && logger.trace("associate: inode and path matched", :filename => watched_file.filename)
return true
end
# the path on disk is different from discovered unassociated path
# but they have the same key (inode)
# the path on disk is different from discovered unassociated path but they have the same key (inode)
# treat as a new file, a new value will be added when the file is opened
sincedb_value.clear_watched_file
delete(watched_file.sincedb_key)
logger.trace("associate: matched but allocated to another")
logger.trace? && logger.trace("associate: matched but allocated to another", :filename => watched_file.filename)
return true
end
if sincedb_value.watched_file.equal?(watched_file) # pointer equals
logger.trace("associate: already associated")
logger.trace? && logger.trace("associate: already associated", :filename => watched_file.filename)
return true
end
# sincedb_value.watched_file is not this discovered watched_file but they have the same key (inode)
Expand All @@ -107,7 +104,7 @@ def associate(watched_file)
# after the original is deleted
# are not yet in the delete phase, let this play out
existing_watched_file = sincedb_value.watched_file
logger.trace? && logger.trace("----------------- >> associate: the found sincedb_value has a watched_file - this is a rename",
logger.trace? && logger.trace("associate: found sincedb_value has a watched_file - this is a rename",
:this_watched_file => watched_file.details, :existing_watched_file => existing_watched_file.details)
watched_file.rotation_in_progress
true
Expand Down Expand Up @@ -197,43 +194,43 @@ def handle_association(sincedb_value, watched_file)
watched_file.initial_completed
if watched_file.all_read?
watched_file.ignore
logger.trace? && logger.trace("handle_association fully read, ignoring.....", :watched_file => watched_file.details, :sincedb_value => sincedb_value)
logger.trace? && logger.trace("handle_association fully read, ignoring", :watched_file => watched_file.details, :sincedb_value => sincedb_value)
end
end

def set_key_value(key, value)
if @time_sdb_opened < value.last_changed_at_expires(@settings.sincedb_expiry_duration)
logger.trace("open: setting #{key.inspect} to #{value.inspect}")
set(key, value)
else
logger.trace("open: record has expired, skipping: #{key.inspect} #{value.inspect}")
logger.debug("set_key_value: record has expired, skipping: #{key.inspect} => #{value.inspect}")
end
end

def sincedb_write(time = Time.now)
logger.trace("sincedb_write: #{path} (time = #{time})")
logger.trace? && logger.trace("sincedb_write: #{path} (time = #{time})")
begin
@write_method.call(time)
@serializer.expired_keys.each do |key|
expired_keys = @write_method.call(time)
expired_keys.each do |key|
@sincedb[key].unset_watched_file
delete(key)
logger.trace? && logger.trace("sincedb_write: cleaned", :key => key)
end
@sincedb_last_write = time.to_i
@write_requested = false
rescue Errno::EACCES
# no file handles free perhaps
# maybe it will work next time
logger.trace("sincedb_write: #{path} error: #{$!}")
rescue Errno::EACCES => e
# no file handles free perhaps - maybe it will work next time
logger.debug("sincedb_write: #{path} error:", :exception => e.class, :message => e.message)
end
end

# Serialize the whole in-memory sincedb to @full_path in one atomic
# operation via FileHelper.write_atomically (presumably a temp-file write
# followed by a rename -- confirm against FileHelper).
# NOTE(review): assumes write_atomically returns the block's result, so the
# method yields whatever @serializer.serialize returns -- verify.
# @return expired keys
def atomic_write(time)
  FileHelper.write_atomically(@full_path) do |io|
    @serializer.serialize(@sincedb, io, time.to_f)
  end
end

# @return expired keys
def non_atomic_write(time)
IO.open(IO.sysopen(@full_path, "w+")) do |io|
@serializer.serialize(@sincedb, io, time.to_f)
Expand Down
16 changes: 5 additions & 11 deletions lib/filewatch/sincedb_record_serializer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,25 @@
module FileWatch
class SincedbRecordSerializer

attr_reader :expired_keys

# Convert a day count into seconds.
# @param days [Numeric, String] number of days; coerced with +to_f+
# @return [Float] the equivalent duration in seconds
def self.days_to_seconds(days)
  days.to_f * (24 * 3600)
end

def initialize(sincedb_value_expiry)
@sincedb_value_expiry = sincedb_value_expiry
@expired_keys = []
end

# Reset the record-expiry window from a day count (delegates the
# days-to-seconds conversion to the class-level helper).
# @param days [Numeric] days before a sincedb record is considered expired
def update_sincedb_value_expiry_from_days(days)
  @sincedb_value_expiry = SincedbRecordSerializer.days_to_seconds(days)
end

# @return Array expired keys (ones that were not written to the file)
def serialize(db, io, as_of = Time.now.to_f)
@expired_keys.clear
expired_keys = []
db.each do |key, value|
if as_of > value.last_changed_at_expires(@sincedb_value_expiry)
@expired_keys << key
expired_keys << key
next
end
io.write(serialize_record(key, value))
end
expired_keys
end

def deserialize(io)
Expand All @@ -36,8 +31,7 @@ def deserialize(io)
end

def serialize_record(k, v)
# effectively InodeStruct#to_s SincedbValue#to_s
"#{k} #{v}\n"
"#{k} #{v}\n" # effectively InodeStruct#to_s SincedbValue#to_s
end

def deserialize_record(record)
Expand Down
52 changes: 30 additions & 22 deletions lib/filewatch/tail_mode/handlers/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def quit?
end

def handle(watched_file)
logger.trace("handling: #{watched_file.filename}")
logger.trace? && logger.trace("handling:", :path => watched_file.path)
unless watched_file.has_listener?
watched_file.set_listener(@observer)
end
Expand All @@ -37,7 +37,7 @@ def update_existing_specifically(watched_file, sincedb_value)

def controlled_read(watched_file, loop_control)
changed = false
logger.trace("reading...", "iterations" => loop_control.count, "amount" => loop_control.size, "filename" => watched_file.filename)
logger.trace(__method__.to_s, :iterations => loop_control.count, :amount => loop_control.size, :filename => watched_file.filename)
# from a real config (has 102 file inputs)
# -- This cfg creates a file input for every log file to create a dedicated file pointer and read all file simultaneously
# -- If we put all log files in one file input glob we will have indexing delay, because Logstash waits until the first file becomes EOF
Expand All @@ -48,7 +48,7 @@ def controlled_read(watched_file, loop_control)
loop_control.count.times do
break if quit?
begin
logger.debug("read_to_eof: get chunk")
logger.debug("#{__method__} get chunk")
result = watched_file.read_extract_lines(loop_control.size) # expect BufferExtractResult
logger.trace(result.warning, result.additional) unless result.warning.empty?
changed = true
Expand All @@ -57,40 +57,42 @@ def controlled_read(watched_file, loop_control)
# sincedb position is now independent from the watched_file bytes_read
sincedb_collection.increment(watched_file.sincedb_key, line.bytesize + @settings.delimiter_byte_size)
end
rescue EOFError
rescue EOFError => e
# it only makes sense to signal EOF in "read" mode not "tail"
logger.debug(__method__.to_s, exception_details(watched_file.path, e, false))
loop_control.flag_read_error
break
rescue Errno::EWOULDBLOCK, Errno::EINTR
rescue Errno::EWOULDBLOCK, Errno::EINTR => e
logger.debug(__method__.to_s, exception_details(watched_file.path, e, false))
watched_file.listener.error
loop_control.flag_read_error
break
rescue => e
logger.error("read_to_eof: general error reading #{watched_file.path}", "error" => e.inspect, "backtrace" => e.backtrace.take(4))
logger.error("#{__method__} general error reading", exception_details(watched_file.path, e))
watched_file.listener.error
loop_control.flag_read_error
break
end
end
logger.debug("read_to_eof: exit due to quit") if quit?
logger.debug("#{__method__} stopped loop due quit") if quit?
sincedb_collection.request_disk_flush if changed
end

def open_file(watched_file)
return true if watched_file.file_open?
logger.trace("opening #{watched_file.filename}")
logger.trace? && logger.trace("open_file", :filename => watched_file.filename)
begin
watched_file.open
rescue
rescue => e
# don't emit this message too often. if a file that we can't
# read is changing a lot, we'll try to open it more often, and spam the logs.
now = Time.now.to_i
logger.trace("open_file OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'")
logger.trace? && logger.trace("open_file OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'")
if watched_file.last_open_warning_at.nil? || now - watched_file.last_open_warning_at > OPEN_WARN_INTERVAL
logger.warn("failed to open #{watched_file.path}: #{$!.inspect}, #{$!.backtrace.take(3)}")
logger.warn("failed to open file", exception_details(watched_file.path, e))
watched_file.last_open_warning_at = now
else
logger.trace("suppressed warning for `failed to open` #{watched_file.path}: #{$!.inspect}")
logger.debug("open_file suppressed warning `failed to open file`", exception_details(watched_file.path, e, false))
end
watched_file.watch # set it back to watch so we can try it again
else
Expand All @@ -108,17 +110,13 @@ def add_or_update_sincedb_collection(watched_file)
update_existing_sincedb_collection_value(watched_file, sincedb_value)
watched_file.initial_completed
else
msg = "add_or_update_sincedb_collection: found sincedb record"
logger.trace(msg,
"sincedb key" => watched_file.sincedb_key,
"sincedb value" => sincedb_value
)
logger.trace("add_or_update_sincedb_collection: found sincedb record",
:sincedb_key => watched_file.sincedb_key, :sincedb_value => sincedb_value)
# detected a rotation, Discoverer can't handle this because this watched file is not a new discovery.
# we must handle it here, by transferring state and have the sincedb value track this watched file
# rotate_as_file and rotate_from will switch the sincedb key to the inode that the path is now pointing to
# and pickup the sincedb_value from before.
msg = "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file"
logger.trace(msg)
logger.debug("add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file")
existing_watched_file = sincedb_value.watched_file
if existing_watched_file.nil?
sincedb_value.set_watched_file(watched_file)
Expand All @@ -127,21 +125,22 @@ def add_or_update_sincedb_collection(watched_file)
watched_file.update_bytes_read(sincedb_value.position)
else
sincedb_value.set_watched_file(watched_file)
logger.trace("add_or_update_sincedb_collection: switching from...", "watched_file details" => watched_file.details)
logger.trace("add_or_update_sincedb_collection: switching from:", :watched_file => watched_file.details)
watched_file.rotate_from(existing_watched_file)
end
end
sincedb_value
end

def update_existing_sincedb_collection_value(watched_file, sincedb_value)
logger.trace("update_existing_sincedb_collection_value: #{watched_file.filename}, last value #{sincedb_value.position}, cur size #{watched_file.last_stat_size}")
logger.trace("update_existing_sincedb_collection_value", :position => sincedb_value.position,
:filename => watched_file.filename, :last_stat_size => watched_file.last_stat_size)
update_existing_specifically(watched_file, sincedb_value)
end

def add_new_value_sincedb_collection(watched_file)
sincedb_value = get_new_value_specifically(watched_file)
logger.trace("add_new_value_sincedb_collection", "position" => sincedb_value.position, "watched_file details" => watched_file.details)
logger.trace("add_new_value_sincedb_collection", :position => sincedb_value.position, :watched_file => watched_file.details)
sincedb_collection.set(watched_file.sincedb_key, sincedb_value)
sincedb_value
end
Expand All @@ -153,5 +152,14 @@ def get_new_value_specifically(watched_file)
watched_file.update_bytes_read(position)
value
end

private

# Build a structured-logging hash describing a caught exception.
# @param path [String] the file path the error relates to
# @param e [Exception] the caught exception
# @param trace [Boolean] when true (and debug logging is enabled)
#   the exception backtrace is included
# @return [Hash] :path, :exception (the class), :message, and
#   optionally :backtrace
def exception_details(path, e, trace = true)
  {
    :path => path,
    :exception => e.class,
    :message => e.message
  }.tap do |details|
    details[:backtrace] = e.backtrace if trace && logger.debug?
  end
end

end
end end end
2 changes: 1 addition & 1 deletion lib/filewatch/tail_mode/handlers/delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def handle_specifically(watched_file)
# TODO consider trying to find the renamed file - it will have the same inode.
# Needs a rotate scheme rename hint from user e.g. "<name>-YYYY-MM-DD-N.<ext>" or "<name>.<ext>.N"
# send the found content to the same listener (stream identity)
logger.trace("delete", :path => watched_file.path, :watched_file => watched_file.details)
logger.trace(__method__.to_s, :path => watched_file.path, :watched_file => watched_file.details)
if watched_file.bytes_unread > 0
logger.warn(DATA_LOSS_WARNING, :path => watched_file.path, :unread_bytes => watched_file.bytes_unread)
end
Expand Down
5 changes: 2 additions & 3 deletions lib/filewatch/tail_mode/handlers/shrink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ def handle_specifically(watched_file)
end

def update_existing_specifically(watched_file, sincedb_value)
# we have a match but size is smaller
# set all to zero
# we have a match but size is smaller - set all to zero
watched_file.reset_bytes_unread
sincedb_value.update_position(0)
logger.trace("update_existing_specifically: was truncated seeking to beginning", "watched file" => watched_file.details, "sincedb value" => sincedb_value)
logger.trace("update_existing_specifically: was truncated seeking to beginning", :watched_file => watched_file.details, :sincedb_value => sincedb_value)
end
end
end end end
8 changes: 4 additions & 4 deletions lib/filewatch/tail_mode/handlers/unignore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ def get_new_value_specifically(watched_file)
# for file initially ignored their bytes_read was set to stat.size
# use this value not the `start_new_files_at` for the position
# logger.trace("get_new_value_specifically", "watched_file" => watched_file.inspect)
SincedbValue.new(watched_file.bytes_read).tap do |val|
val.set_watched_file(watched_file)
logger.trace("-------------------- >>>>> get_new_value_specifically: unignore", "watched file" => watched_file.details, "sincedb value" => val)
SincedbValue.new(watched_file.bytes_read).tap do |sincedb_value|
sincedb_value.set_watched_file(watched_file)
logger.trace("get_new_value_specifically: unignore", :watched_file => watched_file.details, :sincedb_value => sincedb_value)
end
end

Expand All @@ -26,7 +26,7 @@ def update_existing_specifically(watched_file, sincedb_value)
# we will handle grow or shrink
# for now we seek to where we were before the file got ignored (grow)
# or to the start (shrink)
logger.trace("-------------------- >>>>> update_existing_specifically: unignore", "watched file" => watched_file.details, "sincedb value" => sincedb_value)
logger.trace("update_existing_specifically: unignore", :watched_file => watched_file.details, :sincedb_value => sincedb_value)
position = 0
if watched_file.shrunk?
watched_file.update_bytes_read(0)
Expand Down
1 change: 1 addition & 0 deletions lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ def post_process_this(event)
# Run every registered completed-file handler against +path+.
# Does nothing in tail mode or when no handlers are configured.
def handle_deletable_path(path)
  return if tail_mode? || @completed_file_handlers.empty?
  @logger.debug(__method__.to_s, :path => path)
  @completed_file_handlers.each do |handler|
    handler.handle(path)
  end
end

Expand Down
Loading