
Fix ReadFile handler to consider the value stored in sincedb on plugin restart #307


Conversation

Contributor

@andsel commented Apr 1, 2022

Release notes

Fixes read mode when the sincedb already stores a reference to a file that was not completely consumed.

What does this PR do?

Updates the file pointer of a file in read mode to the maximum of the bytes already read and the sincedb position recorded for the same file.
This means that when a pipeline is restarted, it can recover from the last known position instead of restarting from the beginning and reprocessing lines that were already processed.
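
In outline, the resume position is computed as follows (a minimal Ruby sketch with assumed names, not the plugin's exact API; the real change is shown in the review diff further down):

# Choose the offset to seek to when a read-mode file is (re)opened.
# `bytes_read` is what the current run has already consumed from the file,
# `sincedb_position` is the checkpoint persisted by a previous run (nil if unknown).
def resume_position(bytes_read, sincedb_position)
  [bytes_read, sincedb_position || 0].max
end

resume_position(0, 1024)    # fresh restart, sincedb remembers 1024 => resume at 1024
resume_position(2048, 1024) # current run is already ahead         => stay at 2048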

Why is it important/What is the impact to the user?

When a pipeline with a file input in read mode is restarted, this lets the plugin resume from where it left off, provided that information is present in the sincedb store.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

Pipeline definition:

- pipeline.id: SDH_650
  pipeline.workers: 1
  pipeline.batch.size: 5
  config.string: |
    input {
        file {
            path => "/home/andrea/workspace/logstash_configs/file_input_sdh650/sample_fixture.csv"
            sincedb_path => "/home/andrea/workspace/logstash_configs/file_input_sdh650/sincedb"
            mode  => "read"
            start_position => "beginning"
        }
    }

    filter {
        csv {
            separator => ","
            columns => ["id", "host", "fqdn", "IP", "mac", "role", "type", "make", "model", "oid", "fid", "time"]
            remove_field => ["path", "host", "message", "@version" ]   
        }
        sleep {
            time => 1
            every => 10
        }
    }

    output {
        elasticsearch { 
            index => "650" 
            hosts => "http://localhost:9200"
            user => "elastic"
            password => "changeme"
        }
        stdout { codec => dots }
    }

Some curl requests to configure the ES output index, plus an aggregation query to verify the result:

PUT /650
{
  "mappings": {
    "properties": {
      "id":    { "type": "keyword" },  
      "host":  { "type": "text"  }, 
      "fqdn":   { "type": "text"  },
      "IP":   { "type": "text"  },
      "mac":   { "type": "text"  },
      "role":   { "type": "keyword"  },
      "type":   { "type": "keyword"  },
      "make":   { "type": "text"  },
      "model":   { "type": "text"  },
      "oid":   { "type": "text"  },
      "fid":   { "type": "text"  },
      "time":   { "type": "text"  }
    }
  }
}
DELETE 650

GET 650/_search
{
  "aggs": {
    "types": {
      "terms": { "field": "type" }
    }
  }
}

The expectation is to have 2 equally sized buckets. Without the fix, one bucket contains more documents, which means some rows were reprocessed on the pipeline reload.
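
If you prefer an automated check, something like the following works (a sketch using only Ruby's standard library; index name and credentials are the ones from the pipeline above):

require "net/http"
require "json"
require "uri"

# Run the terms aggregation on the "650" index and check that the
# two "type" buckets contain the same number of documents.
uri = URI("http://localhost:9200/650/_search?size=0")
req = Net::HTTP::Post.new(uri, "Content-Type" => "application/json")
req.basic_auth("elastic", "changeme")
req.body = { aggs: { types: { terms: { field: "type" } } } }.to_json

res = Net::HTTP.start(uri.hostname, uri.port) { |http| http.request(req) }
buckets = JSON.parse(res.body).dig("aggregations", "types", "buckets")
counts = buckets.map { |b| b["doc_count"] }

puts counts.uniq.length == 1 ? "OK: buckets are equally sized (#{counts.inspect})" : "FAIL: #{counts.inspect}"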

How to test this PR locally

Follow the steps in #290.

Related issues

Use cases

Screenshots

Logs

@andsel added the bug label Apr 1, 2022
@andsel changed the title from "Added test to verify that ReadFile handler doesn't consider the value stored in sincedb" to "Fix ReadFile handler to consider the value stored in sincedb on plugin restart" Apr 7, 2022
@kaisecheng self-requested a review April 11, 2022 11:40
Comment on lines 6 to 18
if open_file(watched_file)
  add_or_update_sincedb_collection(watched_file) unless sincedb_collection.member?(watched_file.sincedb_key)
  if sincedb_collection.member?(watched_file.sincedb_key)
    previous_pos = sincedb_collection.find(watched_file).position
    watched_file.file_seek([watched_file.bytes_read, previous_pos].max)
  end
  loop do
Member

Since add_or_update_sincedb_collection ensures the file is in the sincedb and sets the correct position from the sincedb to the watched_file, can't we simplify this a lot to just be:

Suggested change
- if open_file(watched_file)
-   add_or_update_sincedb_collection(watched_file) unless sincedb_collection.member?(watched_file.sincedb_key)
-   if sincedb_collection.member?(watched_file.sincedb_key)
-     previous_pos = sincedb_collection.find(watched_file).position
-     watched_file.file_seek([watched_file.bytes_read, previous_pos].max)
-   end
-   loop do
+ if open_file(watched_file)
+   add_or_update_sincedb_collection(watched_file)
+   watched_file.file_seek(watched_file.bytes_read)
+   loop do

Contributor Author

@jsvd good point! The semantics remain the same and the flow is simplified.

Contributor Author

It fails the test https://app.travis-ci.com/github/logstash-plugins/logstash-input-file/jobs/566809162#L974 because, without the max(watched_file.bytes_read, sincedb previous_pos), it seems that bytes_read doesn't have the same value as position. Need to investigate why.

Contributor Author

@andsel Apr 12, 2022

Without the guard

add_or_update_sincedb_collection(watched_file) unless sincedb_collection.member?

it happens that watched_file.bytes_read is always updated to sincedb_collection.find(watched_file).position.
That position comes from the sinceDB and is the last recorded checkpoint.

What happens in this test? The objective of the test is to verify striped reads from a couple of files:
file1

string1\nstring2

file2

stringA\nstringB

Striped reads means that one line is read from each file in turn, so the position points just past the \n character.
The test fails because the file_chunk_size is 10 bytes, so on the first pass over the first file it reads 10 bytes and puts them in a buffer, which is:

string1\nst

If seek sets the file pointer to the sincedb position (8, just after the \n), then the next chunk grabbed from the file is

string2

which goes into the buffer that still contained st, creating the string ststring2.
This is the reason to have the max(bytes_read, last_pos).
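
A standalone way to see this failure mode (a sketch with simplified buffering, not the plugin's actual reader):

# Simulate chunked reads of file1 ("string1\nstring2") with a 10-byte chunk.
content = "string1\nstring2"
chunk_size = 10

# First chunk: bytes 0..9 => "string1\nst". The full line "string1" is
# extracted, leaving "st" in the buffer; bytes_read is 10 while the
# sincedb position only advances to 8 (just past the "\n").
line, buffer = content[0, chunk_size].split("\n", 2)   # line = "string1", buffer = "st"
bytes_read = chunk_size
sincedb_position = line.bytesize + 1                   # => 8

# Seeking back to the sincedb position re-reads "string2" and appends it to
# the leftover "st", producing the corrupted line "ststring2".
corrupted = buffer + content[sincedb_position, chunk_size]                     # => "ststring2"

# Seeking to max(bytes_read, sincedb_position) continues cleanly instead.
correct   = buffer + content[[bytes_read, sincedb_position].max, chunk_size]   # => "string2"

puts corrupted  # ststring2
puts correct    # string2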

@kaisecheng left a comment

I have tested it locally. Both solutions work well to fix the repeated reads on restart. CI had been breaking for some time before this change.

@andsel requested a review from jsvd April 12, 2022 15:10

Contributor Author

@andsel commented Apr 27, 2022

Holding this PR until the CI is back to green on main; then I'll rebase and ask for review again.

@jsvd removed their request for review April 28, 2022 10:05
@andsel force-pushed the fix/read_mode_honor_sincedb_reference_after_a_restart branch from 8d32d95 to e4ce1b9 on May 2, 2022 07:20
@andsel requested a review from jsvd May 6, 2022 09:42
@andsel force-pushed the fix/read_mode_honor_sincedb_reference_after_a_restart branch from 7669002 to 85cf51d on June 6, 2022 08:42
@andsel requested a review from jsvd June 6, 2022 08:55
Member

@jsvd left a comment

LGTM

@andsel merged commit ef9b8d5 into logstash-plugins:main Jun 6, 2022

Successfully merging this pull request may close these issues.

unable to read the whole file when pipeline get reload