Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3 input plugin not handling shutdown correctly, leading to duplicates once started again #226

Open
PadaKwaak opened this issue Apr 14, 2021 · 2 comments

Comments

@PadaKwaak
Copy link

The s3 input plugin does not store the position of the file it was busy processing when it detected that it should stop.
From the log file, you can see that the following code was called:

@logger.warn("Logstash S3 input, stop reading in the middle of the file, we will read it again when logstash is started")

Because it simply stops processing and does not store the position that it already processed, when you start logstash again, it would parse the same lines again, which then leads to duplicates.

I would have expected the S3 input plugin to either
a) continue processing till the end of the file and then update the sincedb file, or to
b) stop processing immediately, but before it scans the next line, and then write the current position of the file to the sincedb file like the file input plugin does.

  • Version: tested with 3.4.1, but the latest version (3.6.0) has the same shutdown handling
  • My config:
input {
    s3 {
        aws_credentials_file => "/usr/share/logstash/.aws/credentials"
        sincedb_path => "/opt/logstash/data/plugins/inputs/s3/sincedb_file"

        region => "us-east-1"
        bucket => "mybucket"
        prefix => "myfolder/"

        interval => 60
        additional_settings => {
          force_path_style => true
          follow_redirects => false
        }

        codec => json
    }
}

output {
    stdout { codec => rubydebug }
}
  • Steps to Reproduce:
    Upload a very large file onto S3 and let the S3 plugin ingest the file.
    While it is busy ingesting the file, let logstash shutdown (eg. sending it SIGTERM).
    You should then notice the following log message:
Logstash S3 input, stop reading in the middle of the file, we will read it again when logstash is started

When you then start logstash again, it would process the same records again.

@PadaKwaak
Copy link
Author

In the meantime I have made a modification to the v3.4.1 on my own fork to not stop in the middle (ie. my option (b) that I suggested above), and confirmed that it is now working as expected when Logstash receives a shutdown signal while busy processing like a 6MB .gzip json encoded file:

[2021-04-16T09:50:52,717][WARN ][logstash.runner          ] SIGTERM received. Shutting down.
[2021-04-16T09:50:53,256][WARN ][org.logstash.input.DeadLetterQueueInputPlugin] closing dead letter queue input plugin
[2021-04-16T09:50:53,631][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"dlq-pipeline", :thread=>"#<Thread:0x3c9d0685@/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:51 run>"}
[2021-04-16T09:50:58,965][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>0, "stalling_threads_info"=>{["LogStash::Filters::Grok", {"patterns_dir"=>["/opt/logstash/patterns"], "match"=>{"[@metadata][s3][key]"=>"/(?<s3_file>[^/]+)\\.gz"}, "id"=>"dd3398e8c462a0384b18a7d950b46d28cb9f7fd8956073d9e58f5f18a9ea7402"}]=>[{"thread_id"=>37, "name"=>nil, "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.4-java/lib/manticore/response.rb:50:in `call'"}, {"thread_id"=>38, "name"=>nil, "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.4-java/lib/manticore/response.rb:50:in `call'"}]}}
[2021-04-16T09:50:58,976][ERROR][org.logstash.execution.ShutdownWatcherExt] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2021-04-16T09:51:04,004][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>0, "stalling_threads_info"=>{"other"=>[{"thread_id"=>39, "name"=>"[s3-pipeline]<s3", "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/logstash-codec-json-3.0.5/lib/logstash/codecs/json.rb:48:in `from_json'"}], ["LogStash::Filters::Grok", {"patterns_dir"=>["/opt/logstash/patterns"], "match"=>{"[@metadata][s3][key]"=>"/(?<s3_file>[^/]+)\\.gz"}, "id"=>"dd3398e8c462a0384b18a7d950b46d28cb9f7fd8956073d9e58f5f18a9ea7402"}]=>[{"thread_id"=>37, "name"=>nil, "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/concurrent-ruby-1.0.5-java/lib/concurrent/map.rb:93:in `[]'"}, {"thread_id"=>38, "name"=>nil, "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/logstash-filter-grok-4.0.4/lib/logstash/filters/grok.rb:339:in `each'"}]}}
[2021-04-16T09:51:06,799][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"s3-pipeline", :thread=>"#<Thread:0x748701b3 run>"}

I made the modification on top of 3.4.1, since that was the version I was using earlier: v3.4.1...PadaKwaak:v342

Once the issue with 3.6.0 has been fixed that I've raised with #225, I'll see if I can create a pull request where this above modification is enabled/disabled with a setting so that people can choose to use it or not.

@kaisecheng
Copy link
Contributor

The duplication here is caused by an interruption before a file can update the checkpoint/ timestamp in sincedb. The sincedb is updated per file, so when the pipeline restarts it processes the same file again. As it does not cause a missing event, I do not classify this as a bug. A better way to handle it is enhancing sincedb to have a checkpoint per file and record the latest event id. The whole plugin is getting old and needs refactoring.

It is crucial to keep the restart process fast from the system administration point of view. I would avoid stalling the shutdown.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants