Skip to content

Commit ab9a990

Browse files
committed
fix broken gzip file produced sometimes
1 parent 0c34403 commit ab9a990

File tree

2 files changed

+82
-23
lines changed

2 files changed

+82
-23
lines changed

lib/logstash/outputs/file.rb

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -121,22 +121,46 @@ def multi_receive_encoded(events_and_encoded)
121121
# append to the file
122122
chunks.each {|chunk| fd.write(chunk) }
123123
end
124-
fd.flush unless @flusher && @flusher.alive?
124+
on_flush(fd, path) unless @flusher && @flusher.alive?
125125
end
126126

127127
close_stale_files
128128
end
129129
end
130130

131+
def on_flush(fd, path)
132+
fd.flush
133+
if @files[path][:isTempFile]
134+
copy_to_gzip(fd, path)
135+
end
136+
end
137+
138+
def copy_to_gzip(fd, path)
139+
zipfd = get_file(path)
140+
zipfd = Zlib::GzipWriter.new(zipfd)
141+
fd.seek(0, IO::SEEK_SET)
142+
data = fd.read
143+
fd.truncate(0)
144+
fd.seek(0, IO::SEEK_SET)
145+
if @write_behavior == "overwrite"
146+
zipfd.truncate(0)
147+
zipfd.seek(0, IO::SEEK_SET)
148+
end
149+
zipfd.write(data)
150+
zipfd.flush
151+
zipfd.to_io.flush
152+
zipfd.close
153+
end
154+
131155
def close
132156
@flusher.stop unless @flusher.nil?
133157
@io_mutex.synchronize do
134158
@logger.debug("Close: closing files")
135159

136-
@files.each do |path, fd|
160+
@files.each do |path, fileObj|
137161
begin
138-
fd.close
139-
@logger.debug("Closed file #{path}", :fd => fd)
162+
fileObj[:fd].close
163+
@logger.debug("Closed file #{fileObj[:fd]}", :fd => fileObj[:fd])
140164
rescue Exception => e
141165
@logger.error("Exception while flushing and closing files.", :exception => e)
142166
end
@@ -199,9 +223,12 @@ def flush_pending_files
199223
@io_mutex.synchronize do
200224
@logger.debug("Starting flush cycle")
201225

202-
@files.each do |path, fd|
203-
@logger.debug("Flushing file", :path => path, :fd => fd)
204-
fd.flush
226+
@files.each do |path, fileObj|
227+
@logger.debug("Flushing file", :path => path, :fd => fileObj[:fd])
228+
fileObj[:fd].flush
229+
if fileObj[:isTempFile]
230+
copy_to_gzip(fileObj[:fd], path)
231+
end
205232
end
206233
end
207234
rescue => e
@@ -215,15 +242,18 @@ def close_stale_files
215242
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
216243

217244
@logger.debug("Starting stale files cleanup cycle", :files => @files)
218-
inactive_files = @files.select { |path, fd| not fd.active }
245+
inactive_files = @files.select { |path, fileObj| not fileObj[:fd].active }
219246
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
220-
inactive_files.each do |path, fd|
247+
inactive_files.each do |path, fileObj|
221248
@logger.info("Closing file %s" % path)
222-
fd.close
249+
fileObj[:fd].close
250+
if fileObj[:isTempFile] && File.exist?(fileObj[:fd].path)
251+
File.delete(fileObj[:fd].path)
252+
end
223253
@files.delete(path)
224254
end
225255
# mark all files as inactive, a call to write will mark them as active again
226-
@files.each { |path, fd| fd.active = false }
256+
@files.each { |path, fileObj| fileObj[:fd].active = false }
227257
@last_stale_cleanup_cycle = now
228258
end
229259

@@ -236,21 +266,41 @@ def deleted?(path)
236266
end
237267

238268
def open(path)
269+
originalPath = path
239270
if !deleted?(path) && cached?(path)
240-
return @files[path]
271+
return @files[path][:fd]
241272
end
242273

243274
if deleted?(path)
244275
if @create_if_deleted
245276
@logger.debug("Required path was deleted, creating the file again", :path => path)
246277
@files.delete(path)
247278
else
248-
return @files[path] if cached?(path)
279+
return @files[path][:fd] if cached?(path)
280+
end
281+
end
282+
283+
#Fix for broken gzip issue.
284+
if gzip
285+
tmpfile = java.io.File.createTempFile("outfile-", "-temp");
286+
path = tmpfile.path
287+
#create file at original path also, so that temp file is not created again
288+
make_dir(originalPath)
289+
gzFile = get_file(originalPath)
290+
#if gzFile is fifo type, file writer object is returned that needs to closed.
291+
if gzFile.class == Java::JavaIo::FileWriter
292+
gzFile.close
249293
end
250294
end
251295

252296
@logger.info("Opening file", :path => path)
297+
make_dir(path)
298+
fd = get_file(path)
299+
@files[originalPath] = {:fd => IOWriter.new(fd), :isTempFile => gzip}
300+
return @files[originalPath][:fd]
301+
end
253302

303+
def make_dir(path)
254304
dir = File.dirname(path)
255305
if !Dir.exist?(dir)
256306
@logger.info("Creating directory", :directory => dir)
@@ -260,7 +310,9 @@ def open(path)
260310
FileUtils.mkdir_p(dir)
261311
end
262312
end
313+
end
263314

315+
def get_file(path)
264316
# work around a bug opening fifos (bug JRUBY-6280)
265317
stat = File.stat(path) rescue nil
266318
if stat && stat.ftype == "fifo"
@@ -272,10 +324,7 @@ def open(path)
272324
fd = File.new(path, "a+")
273325
end
274326
end
275-
if gzip
276-
fd = Zlib::GzipWriter.new(fd)
277-
end
278-
@files[path] = IOWriter.new(fd)
327+
return fd
279328
end
280329

281330
##

spec/outputs/file_spec.rb

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,22 @@
7070
agent do
7171
line_num = 0
7272
# Now check all events for order and correctness.
73-
events = Zlib::GzipReader.open(tmp_file.path).map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
74-
sorted = events.sort_by {|e| e.get("sequence")}
75-
sorted.each do |event|
76-
insist {event.get("message")} == "hello world"
77-
insist {event.get("sequence")} == line_num
78-
line_num += 1
73+
File.open(tmp_file.path) do |file|
74+
zio = file
75+
loop do
76+
io = Zlib::GzipReader.new(zio)
77+
events = io.map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
78+
sorted = events.sort_by {|e| e.get("sequence")}
79+
sorted.each do |event|
80+
insist {event.get("message")} == "hello world"
81+
insist {event.get("sequence")} == line_num
82+
line_num += 1
83+
end
84+
unused = io.unused
85+
io.finish
86+
break if unused.nil?
87+
zio.pos -= unused.length
88+
end
7989
end
8090
insist {line_num} == event_count
8191
end # agent

0 commit comments

Comments
 (0)