Skip to content

Commit ef9b8d5

Browse files
andsel and jsvd authored
Fix ReadFile handler to consider the value stored in sincedb on plugin restart (#307)
Fixes read mode to restart reading from the reference stored in sincedb when the file wasn't completely consumed. Updates the file pointer of a read-mode file to the maximum of the bytes already read and the sincedb reference for the same file. This solves a problem where a restarted pipeline would start again from the beginning and reprocess already processed lines; it can now recover from the last known reference. Co-authored-by: João Duarte <[email protected]>
1 parent e3924d6 commit ef9b8d5

File tree

7 files changed

+65
-5
lines changed

7 files changed

+65
-5
lines changed

Diff for: CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.4.3
2+
- Fixes read mode to restart the read from reference stored in sincedb in case the file wasn't completely consumed. [#307](https://github.com/logstash-plugins/logstash-input-file/pull/307)
3+
14
## 4.4.2
25
- Doc: Fix attribute by removing extra character [#310](https://github.com/logstash-plugins/logstash-input-file/pull/310)
36

Diff for: lib/filewatch/read_mode/handlers/read_file.rb

+10
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,19 @@
22

33
module FileWatch module ReadMode module Handlers
44
class ReadFile < Base
5+
6+
# seek file to whichever is furthest: either current bytes read or sincedb position
7+
private
8+
def seek_to_furthest_position(watched_file)
9+
previous_pos = sincedb_collection.find(watched_file).position
10+
watched_file.file_seek([watched_file.bytes_read, previous_pos].max)
11+
end
12+
13+
public
514
def handle_specifically(watched_file)
615
if open_file(watched_file)
716
add_or_update_sincedb_collection(watched_file) unless sincedb_collection.member?(watched_file.sincedb_key)
17+
seek_to_furthest_position(watched_file)
818
loop do
919
break if quit?
1020
loop_control = watched_file.loop_control_adjusted_for_stat_size

Diff for: logstash-input-file.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-file'
4-
s.version = '4.4.2'
4+
s.version = '4.4.3'
55
s.licenses = ['Apache-2.0']
66
s.summary = "Streams events from files"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

Diff for: spec/filewatch/read_mode_handlers_read_file_spec.rb

+40
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,45 @@ module FileWatch
3636
processor.read_file(watched_file)
3737
end
3838
end
39+
40+
context "when restart from existing sincedb" do
41+
let(:settings) do
42+
Settings.from_options(
43+
:sincedb_write_interval => 0,
44+
:sincedb_path => File::NULL,
45+
:file_chunk_size => 10
46+
)
47+
end
48+
49+
let(:processor) { double("fake processor") }
50+
let(:observer) { TestObserver.new }
51+
let(:watch) { double("watch") }
52+
53+
before(:each) {
54+
allow(watch).to receive(:quit?).and_return(false)#.and_return(false).and_return(true)
55+
allow(processor).to receive(:watch).and_return(watch)
56+
}
57+
58+
it "read from where it left" do
59+
listener = observer.listener_for(Pathname.new(pathname).to_path)
60+
sut = ReadMode::Handlers::ReadFile.new(processor, sdb_collection, observer, settings)
61+
62+
# simulate a previous partial read of the file
63+
sincedb_value = SincedbValue.new(0)
64+
sincedb_value.set_watched_file(watched_file)
65+
sdb_collection.set(watched_file.sincedb_key, sincedb_value)
66+
67+
68+
# simulate a consumption of first line, (size + newline) bytes
69+
sdb_collection.increment(watched_file.sincedb_key, File.readlines(pathname)[0].size + 2)
70+
71+
# exercise
72+
sut.handle(watched_file)
73+
74+
# verify
75+
expect(listener.lines.size).to eq(1)
76+
expect(listener.lines[0]).to start_with("2010-03-12 23:51:21 SEA4 192.0.2.222 play 3914 OK")
77+
end
78+
end
3979
end
4080
end

Diff for: spec/filewatch/spec_helper.rb

+2
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ def sysread(amount)
8080
multiplier = amount / string.length
8181
string * multiplier
8282
end
83+
def sysseek(offset, whence)
84+
end
8385
end
8486

8587
FIXTURE_DIR = File.join('spec', 'fixtures')

Diff for: spec/helpers/spec_helper.rb

+7-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,13 @@ def initialize
3131

3232
def trace_for(symbol)
3333
params = @tracer.map {|k,v| k == symbol ? v : nil}.compact
34-
params.empty? ? false : params
34+
if params.empty?
35+
false
36+
else
37+
# merge all params with same key
38+
# there could be multiple instances of same call, e.g. [[:accept, true], [:auto_flush, true], [:close, true], [:auto_flush, true]]
39+
params.reduce {|b1, b2| b1 and b2}
40+
end
3541
end
3642

3743
def clear

Diff for: spec/inputs/file_tail_spec.rb

+2-3
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@
332332
.then("wait accept") do
333333
wait(0.75).for {
334334
subject.codec.identity_map[tmpfile_path].codec.trace_for(:accept)
335-
}.to eq([true]), "accept didn't"
335+
}.to eq(true), "accept didn't"
336336
end
337337
.then("request a stop") do
338338
# without this the subject.run doesn't invoke the #exit_flush which is the only @codec.flush_mapped invocation
@@ -341,12 +341,11 @@
341341
.then("wait for auto_flush") do
342342
wait(2).for {
343343
subject.codec.identity_map[tmpfile_path].codec.trace_for(:auto_flush)
344-
.reduce {|b1, b2| b1 and b2} # there could be multiple instances of same call, e.g. [[:accept, true], [:auto_flush, true], [:close, true], [:auto_flush, true]]
345344
}.to eq(true), "autoflush didn't"
346345
end
347346
subject.run(events)
348347
actions.assert_no_errors
349-
expect(subject.codec.identity_map[tmpfile_path].codec.trace_for(:accept)).to eq([true])
348+
expect(subject.codec.identity_map[tmpfile_path].codec.trace_for(:accept)).to eq(true)
350349
end
351350
end
352351

0 commit comments

Comments
 (0)