Skip to content

Commit 1cdfb49

Browse files
committed
fix broken gzip file produced sometimes
1 parent 0c34403 commit 1cdfb49

File tree

2 files changed

+83
-19
lines changed

2 files changed

+83
-19
lines changed

lib/logstash/outputs/file.rb

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -121,22 +121,49 @@ def multi_receive_encoded(events_and_encoded)
121121
# append to the file
122122
chunks.each {|chunk| fd.write(chunk) }
123123
end
124-
fd.flush unless @flusher && @flusher.alive?
124+
on_flush(fd, path) unless @flusher && @flusher.alive?
125125
end
126126

127127
close_stale_files
128128
end
129129
end
130130

131+
def on_flush(fd, path)
132+
fd.flush
133+
if @files[path][:isTempFile]
134+
copy_to_gzip(fd, path)
135+
end
136+
end
137+
138+
def copy_to_gzip(fd, path)
139+
zipfd = get_file(path)
140+
zipfd = Zlib::GzipWriter.new(zipfd)
141+
fd.seek(0, IO::SEEK_SET)
142+
data = fd.read
143+
fd.truncate(0)
144+
fd.seek(0, IO::SEEK_SET)
145+
if @write_behavior == "overwrite"
146+
zipfd.truncate(0)
147+
zipfd.seek(0, IO::SEEK_SET)
148+
end
149+
zipfd.write(data)
150+
zipfd.flush
151+
zipfd.to_io.flush
152+
zipfd.close
153+
end
154+
131155
def close
132156
@flusher.stop unless @flusher.nil?
133157
@io_mutex.synchronize do
134158
@logger.debug("Close: closing files")
135159

136-
@files.each do |path, fd|
160+
@files.each do |path, fileObj|
137161
begin
138-
fd.close
139-
@logger.debug("Closed file #{path}", :fd => fd)
162+
if fileObj[:isTempFile]
163+
copy_to_gzip(fileObj[:fd], path)
164+
end
165+
fileObj[:fd].close
166+
@logger.debug("Closed file #{fileObj[:fd]}", :fd => fileObj[:fd])
140167
rescue Exception => e
141168
@logger.error("Exception while flushing and closing files.", :exception => e)
142169
end
@@ -199,9 +226,12 @@ def flush_pending_files
199226
@io_mutex.synchronize do
200227
@logger.debug("Starting flush cycle")
201228

202-
@files.each do |path, fd|
203-
@logger.debug("Flushing file", :path => path, :fd => fd)
204-
fd.flush
229+
@files.each do |path, fileObj|
230+
@logger.debug("Flushing file", :path => path, :fd => fileObj[:fd])
231+
fileObj[:fd].flush
232+
if fileObj[:isTempFile]
233+
copy_to_gzip(fileObj[:fd], path)
234+
end
205235
end
206236
end
207237
rescue => e
@@ -215,15 +245,18 @@ def close_stale_files
215245
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
216246

217247
@logger.debug("Starting stale files cleanup cycle", :files => @files)
218-
inactive_files = @files.select { |path, fd| not fd.active }
248+
inactive_files = @files.select { |path, fileObj| not fileObj[:fd].active }
219249
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
220-
inactive_files.each do |path, fd|
250+
inactive_files.each do |path, fileObj|
221251
@logger.info("Closing file %s" % path)
222-
fd.close
252+
fileObj[:fd].close
253+
if fileObj[:isTempFile] && File.exist?(fileObj[:fd].path)
254+
File.delete(fileObj[:fd].path)
255+
end
223256
@files.delete(path)
224257
end
225258
# mark all files as inactive, a call to write will mark them as active again
226-
@files.each { |path, fd| fd.active = false }
259+
@files.each { |path, fileObj| fileObj[:fd].active = false }
227260
@last_stale_cleanup_cycle = now
228261
end
229262

@@ -236,21 +269,41 @@ def deleted?(path)
236269
end
237270

238271
def open(path)
272+
originalPath = path
239273
if !deleted?(path) && cached?(path)
240-
return @files[path]
274+
return @files[path][:fd]
241275
end
242276

243277
if deleted?(path)
244278
if @create_if_deleted
245279
@logger.debug("Required path was deleted, creating the file again", :path => path)
246280
@files.delete(path)
247281
else
248-
return @files[path] if cached?(path)
282+
return @files[path][:fd] if cached?(path)
283+
end
284+
end
285+
286+
#Fix for broken gzip issue.
287+
if gzip
288+
tmpfile = java.io.File.createTempFile("outfile-", "-temp");
289+
path = tmpfile.path
290+
#create file at original path also, so that temp file is not created again
291+
make_dir(originalPath)
292+
gzFile = get_file(originalPath)
293+
#if gzFile is fifo type, file writer object is returned that needs to closed.
294+
if gzFile.class == Java::JavaIo::FileWriter
295+
gzFile.close
249296
end
250297
end
251298

252299
@logger.info("Opening file", :path => path)
300+
make_dir(path)
301+
fd = get_file(path)
302+
@files[originalPath] = {:fd => IOWriter.new(fd), :isTempFile => gzip}
303+
return @files[originalPath][:fd]
304+
end
253305

306+
def make_dir(path)
254307
dir = File.dirname(path)
255308
if !Dir.exist?(dir)
256309
@logger.info("Creating directory", :directory => dir)
@@ -260,7 +313,9 @@ def open(path)
260313
FileUtils.mkdir_p(dir)
261314
end
262315
end
316+
end
263317

318+
def get_file(path)
264319
# work around a bug opening fifos (bug JRUBY-6280)
265320
stat = File.stat(path) rescue nil
266321
if stat && stat.ftype == "fifo"
@@ -272,10 +327,7 @@ def open(path)
272327
fd = File.new(path, "a+")
273328
end
274329
end
275-
if gzip
276-
fd = Zlib::GzipWriter.new(fd)
277-
end
278-
@files[path] = IOWriter.new(fd)
330+
return fd
279331
end
280332

281333
##

spec/outputs/file_spec.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,21 @@
6969

7070
agent do
7171
line_num = 0
72+
global_events = []
7273
# Now check all events for order and correctness.
73-
events = Zlib::GzipReader.open(tmp_file.path).map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
74-
sorted = events.sort_by {|e| e.get("sequence")}
74+
File.open(tmp_file.path) do |file|
75+
zio = file
76+
loop do
77+
io = Zlib::GzipReader.new(zio)
78+
events = io.map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
79+
global_events = global_events + events
80+
unused = io.unused
81+
io.finish
82+
break if unused.nil?
83+
zio.pos -= unused.length
84+
end
85+
end
86+
sorted = global_events.sort_by {|e| e.get("sequence")}
7587
sorted.each do |event|
7688
insist {event.get("message")} == "hello world"
7789
insist {event.get("sequence")} == line_num

0 commit comments

Comments
 (0)