@@ -18,7 +18,7 @@ def quit?
18
18
end
19
19
20
20
def handle ( watched_file )
21
- logger . trace ( "handling: #{ watched_file . filename } " )
21
+ logger . trace? && logger . trace ( "handling:" , :path => watched_file . path )
22
22
unless watched_file . has_listener?
23
23
watched_file . set_listener ( @observer )
24
24
end
@@ -37,7 +37,7 @@ def update_existing_specifically(watched_file, sincedb_value)
37
37
38
38
def controlled_read ( watched_file , loop_control )
39
39
changed = false
40
- logger . trace ( "reading..." , " iterations" => loop_control . count , " amount" => loop_control . size , " filename" => watched_file . filename )
40
+ logger . trace? && logger . trace ( __method__ . to_s , : iterations => loop_control . count , : amount => loop_control . size , : filename => watched_file . filename )
41
41
# from a real config (has 102 file inputs)
42
42
# -- This cfg creates a file input for every log file to create a dedicated file pointer and read all file simultaneously
43
43
# -- If we put all log files in one file input glob we will have indexing delay, because Logstash waits until the first file becomes EOF
@@ -48,7 +48,7 @@ def controlled_read(watched_file, loop_control)
48
48
loop_control . count . times do
49
49
break if quit?
50
50
begin
51
- logger . debug ( "read_to_eof: get chunk")
51
+ logger . debug? && logger . debug ( " #{ __method__ } get chunk")
52
52
result = watched_file . read_extract_lines ( loop_control . size ) # expect BufferExtractResult
53
53
logger . trace ( result . warning , result . additional ) unless result . warning . empty?
54
54
changed = true
@@ -57,40 +57,42 @@ def controlled_read(watched_file, loop_control)
57
57
# sincedb position is now independent from the watched_file bytes_read
58
58
sincedb_collection . increment ( watched_file . sincedb_key , line . bytesize + @settings . delimiter_byte_size )
59
59
end
60
- rescue EOFError
60
+ rescue EOFError => e
61
61
# it only makes sense to signal EOF in "read" mode not "tail"
62
+ logger . debug ( __method__ . to_s , exception_details ( watched_file . path , e , false ) )
62
63
loop_control . flag_read_error
63
64
break
64
- rescue Errno ::EWOULDBLOCK , Errno ::EINTR
65
+ rescue Errno ::EWOULDBLOCK , Errno ::EINTR => e
66
+ logger . debug ( __method__ . to_s , exception_details ( watched_file . path , e , false ) )
65
67
watched_file . listener . error
66
68
loop_control . flag_read_error
67
69
break
68
70
rescue => e
69
- logger . error ( "read_to_eof: general error reading #{ watched_file . path } " , "error" => e . inspect , "backtrace" => e . backtrace . take ( 4 ) )
71
+ logger . error ( "#{ __method__ } general error reading" , exception_details ( watched_file . path , e ) )
70
72
watched_file . listener . error
71
73
loop_control . flag_read_error
72
74
break
73
75
end
74
76
end
75
- logger . debug ( "read_to_eof: exit due to quit" ) if quit?
77
+ logger . debug ( "#{ __method__ } stopped loop due quit" ) if quit?
76
78
sincedb_collection . request_disk_flush if changed
77
79
end
78
80
79
81
def open_file ( watched_file )
80
82
return true if watched_file . file_open?
81
- logger . trace ( "opening #{ watched_file . filename } " )
83
+ logger . trace? && logger . trace ( "open_file" , :filename => watched_file . filename )
82
84
begin
83
85
watched_file . open
84
- rescue
86
+ rescue => e
85
87
# don't emit this message too often. if a file that we can't
86
88
# read is changing a lot, we'll try to open it more often, and spam the logs.
87
89
now = Time . now . to_i
88
- logger . trace ( "open_file OPEN_WARN_INTERVAL is '#{ OPEN_WARN_INTERVAL } '" )
90
+ logger . trace? && logger . trace ( "open_file OPEN_WARN_INTERVAL is '#{ OPEN_WARN_INTERVAL } '" )
89
91
if watched_file . last_open_warning_at . nil? || now - watched_file . last_open_warning_at > OPEN_WARN_INTERVAL
90
- logger . warn ( "failed to open #{ watched_file . path } : #{ $! . inspect } , #{ $! . backtrace . take ( 3 ) } " )
92
+ logger . warn ( "failed to open file" , exception_details ( watched_file . path , e ) )
91
93
watched_file . last_open_warning_at = now
92
94
else
93
- logger . trace ( " suppressed warning for `failed to open` #{ watched_file . path } : #{ $! . inspect } " )
95
+ logger . debug ( "open_file suppressed warning `failed to open file`" , exception_details ( watched_file . path , e , false ) )
94
96
end
95
97
watched_file . watch # set it back to watch so we can try it again
96
98
else
@@ -108,40 +110,38 @@ def add_or_update_sincedb_collection(watched_file)
108
110
update_existing_sincedb_collection_value ( watched_file , sincedb_value )
109
111
watched_file . initial_completed
110
112
else
111
- msg = "add_or_update_sincedb_collection: found sincedb record"
112
- logger . trace ( msg ,
113
- "sincedb key" => watched_file . sincedb_key ,
114
- "sincedb value" => sincedb_value
115
- )
113
+ logger . trace? && logger . trace ( "add_or_update_sincedb_collection: found sincedb record" ,
114
+ :sincedb_key => watched_file . sincedb_key , :sincedb_value => sincedb_value )
116
115
# detected a rotation, Discoverer can't handle this because this watched file is not a new discovery.
117
116
# we must handle it here, by transferring state and have the sincedb value track this watched file
118
117
# rotate_as_file and rotate_from will switch the sincedb key to the inode that the path is now pointing to
119
118
# and pickup the sincedb_value from before.
120
- msg = "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file"
121
- logger . trace ( msg )
119
+ logger . debug ( "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file" )
122
120
existing_watched_file = sincedb_value . watched_file
123
121
if existing_watched_file . nil?
124
122
sincedb_value . set_watched_file ( watched_file )
125
- logger . trace ( "add_or_update_sincedb_collection: switching as new file" )
123
+ logger . trace? && logger . trace ( "add_or_update_sincedb_collection: switching as new file" )
126
124
watched_file . rotate_as_file
127
125
watched_file . update_bytes_read ( sincedb_value . position )
128
126
else
129
127
sincedb_value . set_watched_file ( watched_file )
130
- logger . trace ( "add_or_update_sincedb_collection: switching from... " , " watched_file details" => watched_file . details )
128
+ logger . trace? && logger . trace ( "add_or_update_sincedb_collection: switching from: " , : watched_file => watched_file . details )
131
129
watched_file . rotate_from ( existing_watched_file )
132
130
end
133
131
end
134
132
sincedb_value
135
133
end
136
134
137
135
def update_existing_sincedb_collection_value ( watched_file , sincedb_value )
138
- logger . trace ( "update_existing_sincedb_collection_value: #{ watched_file . filename } , last value #{ sincedb_value . position } , cur size #{ watched_file . last_stat_size } " )
136
+ logger . trace? && logger . trace ( "update_existing_sincedb_collection_value" , :position => sincedb_value . position ,
137
+ :filename => watched_file . filename , :last_stat_size => watched_file . last_stat_size )
139
138
update_existing_specifically ( watched_file , sincedb_value )
140
139
end
141
140
142
141
def add_new_value_sincedb_collection ( watched_file )
143
142
sincedb_value = get_new_value_specifically ( watched_file )
144
- logger . trace ( "add_new_value_sincedb_collection" , "position" => sincedb_value . position , "watched_file details" => watched_file . details )
143
+ logger . trace? && logger . trace ( "add_new_value_sincedb_collection" , :position => sincedb_value . position ,
144
+ :watched_file => watched_file . details )
145
145
sincedb_collection . set ( watched_file . sincedb_key , sincedb_value )
146
146
sincedb_value
147
147
end
@@ -153,5 +153,14 @@ def get_new_value_specifically(watched_file)
153
153
watched_file . update_bytes_read ( position )
154
154
value
155
155
end
156
+
157
+ private
158
+
159
+ def exception_details ( path , e , trace = true )
160
+ details = { :path => path , :exception => e . class , :message => e . message }
161
+ details [ :backtrace ] = e . backtrace if trace && logger . debug?
162
+ details
163
+ end
164
+
156
165
end
157
166
end end end
0 commit comments