Skip to content

Refactor: improve debug logging (log caught exceptions) #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits on
Dec 1, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 4.2.3
- Refactor: improve debug logging (log caught exceptions) [#280](https://github.com/logstash-plugins/logstash-input-file/pull/280)

## 4.2.2
- Fix: sincedb_clean_after not being respected [#276](https://github.com/logstash-plugins/logstash-input-file/pull/276)

15 changes: 7 additions & 8 deletions lib/filewatch/read_mode/handlers/base.rb
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ def quit?
end

def handle(watched_file)
logger.trace("handling: #{watched_file.path}")
logger.trace? && logger.trace("handling:", :path => watched_file.path)
unless watched_file.has_listener?
watched_file.set_listener(@observer)
end
@@ -41,14 +41,14 @@ def open_file(watched_file)
# don't emit this message too often. if a file that we can't
# read is changing a lot, we'll try to open it more often, and spam the logs.
now = Time.now.to_i
logger.trace("opening OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'")
logger.trace? && logger.trace("opening OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'")
if watched_file.last_open_warning_at.nil? || now - watched_file.last_open_warning_at > OPEN_WARN_INTERVAL
backtrace = e.backtrace
backtrace = backtrace.take(3) if backtrace && !logger.debug?
logger.warn("failed to open", :path => watched_file.path, :exception => e.class, :message => e.message, :backtrace => backtrace)
watched_file.last_open_warning_at = now
else
logger.trace("suppressed warning (failed to open)", :path => watched_file.path, :exception => e.class, :message => e.message)
logger.trace? && logger.trace("suppressed warning (failed to open)", :path => watched_file.path, :exception => e.class, :message => e.message)
end
watched_file.watch # set it back to watch so we can try it again
end
@@ -67,8 +67,7 @@ def add_or_update_sincedb_collection(watched_file)
elsif sincedb_value.watched_file == watched_file
update_existing_sincedb_collection_value(watched_file, sincedb_value)
else
msg = "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file"
logger.trace(msg)
logger.trace? && logger.trace("add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file")
existing_watched_file = sincedb_value.watched_file
if existing_watched_file.nil?
sincedb_value.set_watched_file(watched_file)
@@ -77,7 +76,7 @@ def add_or_update_sincedb_collection(watched_file)
watched_file.update_bytes_read(sincedb_value.position)
else
sincedb_value.set_watched_file(watched_file)
logger.trace("add_or_update_sincedb_collection: switching from", :watched_file => watched_file.details)
logger.trace? && logger.trace("add_or_update_sincedb_collection: switching from", :watched_file => watched_file.details)
watched_file.rotate_from(existing_watched_file)
end

@@ -86,15 +85,15 @@ def add_or_update_sincedb_collection(watched_file)
end

def update_existing_sincedb_collection_value(watched_file, sincedb_value)
logger.trace("update_existing_sincedb_collection_value: #{watched_file.path}, last value #{sincedb_value.position}, cur size #{watched_file.last_stat_size}")
logger.trace? && logger.trace("update_existing_sincedb_collection_value: #{watched_file.path}, last value #{sincedb_value.position}, cur size #{watched_file.last_stat_size}")
# sincedb_value is the source of truth
watched_file.update_bytes_read(sincedb_value.position)
end

def add_new_value_sincedb_collection(watched_file)
sincedb_value = SincedbValue.new(0)
sincedb_value.set_watched_file(watched_file)
logger.trace("add_new_value_sincedb_collection:", :path => watched_file.path, :position => sincedb_value.position)
logger.trace? && logger.trace("add_new_value_sincedb_collection:", :path => watched_file.path, :position => sincedb_value.position)
sincedb_collection.set(watched_file.sincedb_key, sincedb_value)
end
end
47 changes: 22 additions & 25 deletions lib/filewatch/sincedb_collection.rb
Original file line number Diff line number Diff line change
@@ -47,16 +47,16 @@ def open
@time_sdb_opened = Time.now.to_f
begin
path.open do |file|
logger.trace("open: reading from #{path}")
logger.debug("open: reading from #{path}")
@serializer.deserialize(file) do |key, value|
logger.trace("open: importing ... '#{key}' => '#{value}'")
logger.trace? && logger.trace("open: importing #{key.inspect} => #{value.inspect}")
set_key_value(key, value)
end
end
logger.trace("open: count of keys read: #{@sincedb.keys.size}")
rescue => e
#No existing sincedb to load
logger.trace("open: error:", :path => path, :exception => e.class, :message => e.message)
logger.debug("open: error opening #{path}", :exception => e.class, :message => e.message)
end
end

@@ -68,35 +68,32 @@ def associate(watched_file)
# and due to the window handling of many files
# this file may not be opened in this session.
# a new value will be added when the file is opened
logger.trace("associate: unmatched")
logger.trace("associate: unmatched", :filename => watched_file.filename)
return true
end
logger.trace? && logger.trace("associate: found sincedb record", :filename => watched_file.filename,
:sincedb_key => watched_file.sincedb_key, :sincedb_value => sincedb_value)
if sincedb_value.watched_file.nil?
# not associated
if sincedb_value.watched_file.nil? # not associated
if sincedb_value.path_in_sincedb.nil?
handle_association(sincedb_value, watched_file)
logger.trace("associate: inode matched but no path in sincedb")
logger.trace? && logger.trace("associate: inode matched but no path in sincedb", :filename => watched_file.filename)
return true
end
if sincedb_value.path_in_sincedb == watched_file.path
# the path on disk is the same as discovered path
# and the inode is the same.
# the path on disk is the same as discovered path and the inode is the same.
handle_association(sincedb_value, watched_file)
logger.trace("associate: inode and path matched")
logger.trace? && logger.trace("associate: inode and path matched", :filename => watched_file.filename)
return true
end
# the path on disk is different from discovered unassociated path
# but they have the same key (inode)
# the path on disk is different from discovered unassociated path but they have the same key (inode)
# treat as a new file, a new value will be added when the file is opened
sincedb_value.clear_watched_file
delete(watched_file.sincedb_key)
logger.trace("associate: matched but allocated to another")
logger.trace? && logger.trace("associate: matched but allocated to another", :filename => watched_file.filename)
return true
end
if sincedb_value.watched_file.equal?(watched_file) # pointer equals
logger.trace("associate: already associated")
logger.trace? && logger.trace("associate: already associated", :filename => watched_file.filename)
return true
end
# sincedb_value.watched_file is not this discovered watched_file but they have the same key (inode)
@@ -107,7 +104,7 @@ def associate(watched_file)
# after the original is deleted
# are not yet in the delete phase, let this play out
existing_watched_file = sincedb_value.watched_file
logger.trace? && logger.trace("----------------- >> associate: the found sincedb_value has a watched_file - this is a rename",
logger.trace? && logger.trace("associate: found sincedb_value has a watched_file - this is a rename",
:this_watched_file => watched_file.details, :existing_watched_file => existing_watched_file.details)
watched_file.rotation_in_progress
true
@@ -197,43 +194,43 @@ def handle_association(sincedb_value, watched_file)
watched_file.initial_completed
if watched_file.all_read?
watched_file.ignore
logger.trace? && logger.trace("handle_association fully read, ignoring.....", :watched_file => watched_file.details, :sincedb_value => sincedb_value)
logger.trace? && logger.trace("handle_association fully read, ignoring", :watched_file => watched_file.details, :sincedb_value => sincedb_value)
end
end

def set_key_value(key, value)
if @time_sdb_opened < value.last_changed_at_expires(@settings.sincedb_expiry_duration)
logger.trace("open: setting #{key.inspect} to #{value.inspect}")
set(key, value)
else
logger.trace("open: record has expired, skipping: #{key.inspect} #{value.inspect}")
logger.debug("set_key_value: record has expired, skipping: #{key.inspect} => #{value.inspect}")
end
end

def sincedb_write(time = Time.now)
logger.trace("sincedb_write: #{path} (time = #{time})")
logger.trace? && logger.trace("sincedb_write: #{path} (time = #{time})")
begin
@write_method.call(time)
@serializer.expired_keys.each do |key|
expired_keys = @write_method.call(time)
expired_keys.each do |key|
@sincedb[key].unset_watched_file
delete(key)
logger.trace? && logger.trace("sincedb_write: cleaned", :key => key)
end
@sincedb_last_write = time.to_i
@write_requested = false
rescue Errno::EACCES
# no file handles free perhaps
# maybe it will work next time
logger.trace("sincedb_write: #{path} error: #{$!}")
rescue Errno::EACCES => e
# no file handles free perhaps - maybe it will work next time
logger.debug("sincedb_write: #{path} error:", :exception => e.class, :message => e.message)
end
end

# @return expired keys
def atomic_write(time)
FileHelper.write_atomically(@full_path) do |io|
@serializer.serialize(@sincedb, io, time.to_f)
end
end

# @return expired keys
def non_atomic_write(time)
IO.open(IO.sysopen(@full_path, "w+")) do |io|
@serializer.serialize(@sincedb, io, time.to_f)
16 changes: 5 additions & 11 deletions lib/filewatch/sincedb_record_serializer.rb
Original file line number Diff line number Diff line change
@@ -3,30 +3,25 @@
module FileWatch
class SincedbRecordSerializer

attr_reader :expired_keys

def self.days_to_seconds(days)
(24 * 3600) * days.to_f
end

def initialize(sincedb_value_expiry)
@sincedb_value_expiry = sincedb_value_expiry
@expired_keys = []
end

def update_sincedb_value_expiry_from_days(days)
@sincedb_value_expiry = SincedbRecordSerializer.days_to_seconds(days)
end

# @return Array expired keys (ones that were not written to the file)
def serialize(db, io, as_of = Time.now.to_f)
@expired_keys.clear
expired_keys = []
db.each do |key, value|
if as_of > value.last_changed_at_expires(@sincedb_value_expiry)
@expired_keys << key
expired_keys << key
next
end
io.write(serialize_record(key, value))
end
expired_keys
end

def deserialize(io)
@@ -36,8 +31,7 @@ def deserialize(io)
end

def serialize_record(k, v)
# effectively InodeStruct#to_s SincedbValue#to_s
"#{k} #{v}\n"
"#{k} #{v}\n" # effectively InodeStruct#to_s SincedbValue#to_s
end

def deserialize_record(record)
55 changes: 32 additions & 23 deletions lib/filewatch/tail_mode/handlers/base.rb
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ def quit?
end

def handle(watched_file)
logger.trace("handling: #{watched_file.filename}")
logger.trace? && logger.trace("handling:", :path => watched_file.path)
unless watched_file.has_listener?
watched_file.set_listener(@observer)
end
@@ -37,7 +37,7 @@ def update_existing_specifically(watched_file, sincedb_value)

def controlled_read(watched_file, loop_control)
changed = false
logger.trace("reading...", "iterations" => loop_control.count, "amount" => loop_control.size, "filename" => watched_file.filename)
logger.trace? && logger.trace(__method__.to_s, :iterations => loop_control.count, :amount => loop_control.size, :filename => watched_file.filename)
# from a real config (has 102 file inputs)
# -- This cfg creates a file input for every log file to create a dedicated file pointer and read all file simultaneously
# -- If we put all log files in one file input glob we will have indexing delay, because Logstash waits until the first file becomes EOF
@@ -48,7 +48,7 @@ def controlled_read(watched_file, loop_control)
loop_control.count.times do
break if quit?
begin
logger.debug("read_to_eof: get chunk")
logger.debug? && logger.debug("#{__method__} get chunk")
result = watched_file.read_extract_lines(loop_control.size) # expect BufferExtractResult
logger.trace(result.warning, result.additional) unless result.warning.empty?
changed = true
@@ -57,40 +57,42 @@ def controlled_read(watched_file, loop_control)
# sincedb position is now independent from the watched_file bytes_read
sincedb_collection.increment(watched_file.sincedb_key, line.bytesize + @settings.delimiter_byte_size)
end
rescue EOFError
rescue EOFError => e
# it only makes sense to signal EOF in "read" mode not "tail"
logger.debug(__method__.to_s, exception_details(watched_file.path, e, false))
loop_control.flag_read_error
break
rescue Errno::EWOULDBLOCK, Errno::EINTR
rescue Errno::EWOULDBLOCK, Errno::EINTR => e
logger.debug(__method__.to_s, exception_details(watched_file.path, e, false))
watched_file.listener.error
loop_control.flag_read_error
break
rescue => e
logger.error("read_to_eof: general error reading #{watched_file.path}", "error" => e.inspect, "backtrace" => e.backtrace.take(4))
logger.error("#{__method__} general error reading", exception_details(watched_file.path, e))
watched_file.listener.error
loop_control.flag_read_error
break
end
end
logger.debug("read_to_eof: exit due to quit") if quit?
logger.debug("#{__method__} stopped loop due quit") if quit?
sincedb_collection.request_disk_flush if changed
end

def open_file(watched_file)
return true if watched_file.file_open?
logger.trace("opening #{watched_file.filename}")
logger.trace? && logger.trace("open_file", :filename => watched_file.filename)
begin
watched_file.open
rescue
rescue => e
# don't emit this message too often. if a file that we can't
# read is changing a lot, we'll try to open it more often, and spam the logs.
now = Time.now.to_i
logger.trace("open_file OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'")
logger.trace? && logger.trace("open_file OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'")
if watched_file.last_open_warning_at.nil? || now - watched_file.last_open_warning_at > OPEN_WARN_INTERVAL
logger.warn("failed to open #{watched_file.path}: #{$!.inspect}, #{$!.backtrace.take(3)}")
logger.warn("failed to open file", exception_details(watched_file.path, e))
watched_file.last_open_warning_at = now
else
logger.trace("suppressed warning for `failed to open` #{watched_file.path}: #{$!.inspect}")
logger.debug("open_file suppressed warning `failed to open file`", exception_details(watched_file.path, e, false))
end
watched_file.watch # set it back to watch so we can try it again
else
@@ -108,40 +110,38 @@ def add_or_update_sincedb_collection(watched_file)
update_existing_sincedb_collection_value(watched_file, sincedb_value)
watched_file.initial_completed
else
msg = "add_or_update_sincedb_collection: found sincedb record"
logger.trace(msg,
"sincedb key" => watched_file.sincedb_key,
"sincedb value" => sincedb_value
)
logger.trace? && logger.trace("add_or_update_sincedb_collection: found sincedb record",
:sincedb_key => watched_file.sincedb_key, :sincedb_value => sincedb_value)
# detected a rotation, Discoverer can't handle this because this watched file is not a new discovery.
# we must handle it here, by transferring state and have the sincedb value track this watched file
# rotate_as_file and rotate_from will switch the sincedb key to the inode that the path is now pointing to
# and pickup the sincedb_value from before.
msg = "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file"
logger.trace(msg)
logger.debug("add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file")
existing_watched_file = sincedb_value.watched_file
if existing_watched_file.nil?
sincedb_value.set_watched_file(watched_file)
logger.trace("add_or_update_sincedb_collection: switching as new file")
logger.trace? && logger.trace("add_or_update_sincedb_collection: switching as new file")
watched_file.rotate_as_file
watched_file.update_bytes_read(sincedb_value.position)
else
sincedb_value.set_watched_file(watched_file)
logger.trace("add_or_update_sincedb_collection: switching from...", "watched_file details" => watched_file.details)
logger.trace? && logger.trace("add_or_update_sincedb_collection: switching from:", :watched_file => watched_file.details)
watched_file.rotate_from(existing_watched_file)
end
end
sincedb_value
end

def update_existing_sincedb_collection_value(watched_file, sincedb_value)
logger.trace("update_existing_sincedb_collection_value: #{watched_file.filename}, last value #{sincedb_value.position}, cur size #{watched_file.last_stat_size}")
logger.trace? && logger.trace("update_existing_sincedb_collection_value", :position => sincedb_value.position,
:filename => watched_file.filename, :last_stat_size => watched_file.last_stat_size)
update_existing_specifically(watched_file, sincedb_value)
end

def add_new_value_sincedb_collection(watched_file)
sincedb_value = get_new_value_specifically(watched_file)
logger.trace("add_new_value_sincedb_collection", "position" => sincedb_value.position, "watched_file details" => watched_file.details)
logger.trace? && logger.trace("add_new_value_sincedb_collection", :position => sincedb_value.position,
:watched_file => watched_file.details)
sincedb_collection.set(watched_file.sincedb_key, sincedb_value)
sincedb_value
end
@@ -153,5 +153,14 @@ def get_new_value_specifically(watched_file)
watched_file.update_bytes_read(position)
value
end

private

def exception_details(path, e, trace = true)
details = { :path => path, :exception => e.class, :message => e.message }
details[:backtrace] = e.backtrace if trace && logger.debug?
details
end

end
end end end
Loading