Skip to content

Force all files under rotation to start at 0 or at the sincedb record. #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 29, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 4.1.7
- Fixed problem in rotation handling where the target file being rotated was
subjected to the start_position setting when it must always start from the beginning.
[Issue #214](https://github.com/logstash-plugins/logstash-input-file/issues/214)

## 4.1.6
- Fixed Errno::ENOENT exception in Discoverer. [Issue #204](https://github.com/logstash-plugins/logstash-input-file/issues/204)

Expand All @@ -15,7 +20,7 @@
was possible to read into memory allocated but not filled with data resulting
in ASCII NUL (0) bytes in the message field. Now, files are read up to the
size as given by the remote filesystem client. Applies to tail and read modes.

## 4.1.3
- Fixed `read` mode of regular files sincedb write is requested in each read loop
iteration rather than waiting for the end-of-file to be reached. Note: for gz files,
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ jar.finalizedBy(copyGemjar)
// See http://www.gradle.org/docs/current/userguide/gradle_wrapper.html
task wrapper(type: Wrapper) {
description = 'Install Gradle wrapper'
gradleVersion = '4.5.1'
gradleVersion = '4.9'
}
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.5.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip
2 changes: 2 additions & 0 deletions lib/filewatch/read_mode/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def process_watched(watched_files)
end
end

## TODO add process_rotation_in_progress

def process_active(watched_files)
logger.trace("Active processing")
# Handles watched_files in the active state.
Expand Down
10 changes: 6 additions & 4 deletions lib/filewatch/tail_mode/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,16 @@ def process_rotation_in_progress(watched_files)
potential_sdb_value = @sincedb_collection.get(potential_key)
logger.trace(">>> Rotation In Progress", "watched_file" => watched_file.details, "found_sdb_value" => sdb_value, "potential_key" => potential_key, "potential_sdb_value" => potential_sdb_value)
if potential_sdb_value.nil?
logger.trace("---------- >>>> Rotation In Progress: rotating as existing file")
watched_file.rotate_as_file
trace_message = "---------- >>>> Rotation In Progress: no potential sincedb value "
if sdb_value.nil?
logger.trace("---------- >>> Rotation In Progress: rotating as initial file, no potential sincedb value AND no found sincedb value")
watched_file.rotate_as_initial_file
trace_message.concat("AND no found sincedb value")
else
logger.trace("---------- >>>> Rotation In Progress: rotating as existing file, no potential sincedb value BUT found sincedb value")
watched_file.rotate_as_file
trace_message.concat("BUT found sincedb value")
sdb_value.clear_watched_file
end
logger.trace(trace_message)
new_sdb_value = SincedbValue.new(0)
new_sdb_value.set_watched_file(watched_file)
@sincedb_collection.set(potential_key, new_sdb_value)
Expand Down
6 changes: 0 additions & 6 deletions lib/filewatch/watched_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ def set_stat(stat)
@sdb_key_v1 = @stat.inode_struct
end

def rotate_as_initial_file
# rotation, when no sincedb record exists for new inode - we have never seen this inode before.
rotate_as_file
@initial = true
end

def rotate_as_file(bytes_read = 0)
# rotation, when a sincedb record exists for new inode, but no watched file to rotate from
# probably caused by a deletion detected in the middle of the rename cascade
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-file.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-file'
s.version = '4.1.6'
s.version = '4.1.7'
s.licenses = ['Apache-2.0']
s.summary = "Streams events from files"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
46 changes: 45 additions & 1 deletion spec/filewatch/rotate_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module FileWatch
let(:max) { 4095 }
let(:stat_interval) { 0.01 }
let(:discover_interval) { 15 }
let(:start_new_files_at) { :beginning }
let(:start_new_files_at) { :end }
let(:sincedb_path) { directory.join("tailing.sdb") }
let(:opts) do
{
Expand Down Expand Up @@ -447,5 +447,49 @@ module FileWatch
expect(listener3.lines.size).to eq(0)
end
end

context "? rotation: when an active file is renamed inside the glob - issue 214" do
let(:watch_dir) { directory.join("*L.log") }
let(:file_path) { directory.join("1L.log") }
let(:second_file) { directory.join("2L.log") }
subject { described_class.new(conf) }
let(:listener1) { observer.listener_for(file1_path) }
let(:listener2) { observer.listener_for(second_file.to_path) }
let(:stat_interval) { 0.25 }
let(:discover_interval) { 1 }
let(:line4) { "Line 4 - Some other non lorem ipsum content" }
let(:actions) do
RSpec::Sequencing
.run_after(0.75, "create file") do
file_path.open("wb") { |file| file.puts(line1); file.puts(line2) }
end
.then_after(0.5, "rename") do
file_path.rename(second_file)
file_path.open("wb") { |file| file.puts("#{line3}") }
end
.then("wait for expectations to be met") do
wait(2.0).for{listener1.lines.size + listener2.lines.size}.to eq(3)
end
.then_after(0.5, "rename again") do
file_path.rename(second_file)
file_path.open("wb") { |file| file.puts("#{line4}") }
end
.then("wait for expectations to be met") do
wait(2.0).for{listener1.lines.size + listener2.lines.size}.to eq(4)
end
.then("quit") do
tailing.quit
end
end

it "content is read correctly, the renamed file is not reread from scratch" do
actions.activate_quietly
tailing.watch_this(watch_dir.to_path)
tailing.subscribe(observer)
actions.assert_no_errors
expect(listener1.lines).to eq([line1, line2, line3, line4])
expect(listener2.lines).to eq([])
end
end
end
end