Fix object timestamp logic #243

Open
wants to merge 4 commits into
base: main
10 changes: 7 additions & 3 deletions lib/logstash/inputs/s3.rb
@@ -376,18 +376,22 @@ def ignore_filename?(filename)
   def process_log(queue, log)
     @logger.debug("Processing", :bucket => @bucket, :key => log.key)
     object = @s3bucket.object(log.key)
+    # Eager-loads the object data so the last_modified field is populated right before the download
+    object.load
 
     filename = File.join(temporary_directory, File.basename(log.key))
     if download_remote_file(object, filename)
       if process_local_log(queue, filename, object)
-        if object.last_modified == log.last_modified
+        refreshed_object = @s3bucket.object(log.key)
+        # If the object was modified during download and processing, do not backup/delete it and process it again during the next iteration
+        if object.last_modified == refreshed_object.last_modified
           backup_to_bucket(object)
           backup_to_dir(filename)
           delete_file_from_bucket(object)
           FileUtils.remove_entry_secure(filename, true)
-          sincedb.write(log.last_modified)
+          sincedb.write(object.last_modified)
         else
-          @logger.info("#{log.key} is updated at #{object.last_modified} and will process in the next cycle")
+          @logger.info("#{log.key} was updated at #{refreshed_object.last_modified} and will process in the next cycle")
         end
       end
     else
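The check this change introduces can be sketched outside the plugin. This is a minimal illustration and not the plugin's code: `FakeBucket`, `FakeObject`, and `process` are hypothetical stand-ins for the S3 bucket, the S3 object, and `process_log`, and the timestamps are made up. It only shows the idea of taking one `last_modified` snapshot before the work and a second one afterwards, then comparing the two.

```ruby
# Hypothetical stand-ins for the S3 bucket and object; not the aws-sdk classes.
FakeObject = Struct.new(:key, :last_modified)

class FakeBucket
  def initialize
    @store = {}
  end

  def put(key, last_modified)
    @store[key] = last_modified
  end

  # Returns a fresh snapshot on each call, like a new metadata lookup.
  def object(key)
    FakeObject.new(key, @store.fetch(key))
  end
end

# Returns :done when the object is unchanged after processing, :retry otherwise.
def process(bucket, key)
  before = bucket.object(key)   # snapshot taken right before the download
  yield if block_given?         # download and processing would happen here
  after = bucket.object(key)    # second lookup once processing has finished
  before.last_modified == after.last_modified ? :done : :retry
end

bucket = FakeBucket.new
t0 = Time.utc(2024, 1, 1)
bucket.put("a.log", t0)

stable  = process(bucket, "a.log")
changed = process(bucket, "a.log") { bucket.put("a.log", t0 + 60) }

puts stable   # done
puts changed  # retry
```

In the sketch, `:done` corresponds to the branch that backs up, deletes, and writes to the sincedb, and `:retry` corresponds to the branch that only logs and leaves the object for the next cycle.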
5 changes: 4 additions & 1 deletion spec/inputs/s3_spec.rb
@@ -32,6 +32,7 @@
     FileUtils.mkdir_p(sincedb_path)
     Aws.config[:stub_responses] = true
     Thread.abort_on_exception = true
+    allow_any_instance_of(Aws::S3::Object).to receive(:load)
   end
 
   context "when interrupting the plugin" do
@@ -583,7 +584,7 @@
       end
     end
 
-    context 's3 object updated after getting summary' do
+    context 's3 object updated during processing' do
       it 'should not update sincedb' do
         s3_summary = [
           double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
@@ -592,6 +593,8 @@
 
         s3_objects = [
           double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+          double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+          double(:key => 'TODAY', :last_modified => Time.now.round - (cutoff * 10), :content_length => 5, :storage_class => 'STANDARD'),
           double(:key => 'TODAY_UPDATED', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD')
         ]
 
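The two extra doubles in `s3_objects` make sense if each key is now looked up twice, once before the download and once after processing. The stubbing line itself is not visible in this hunk, so the following is only a sketch under the assumption that the stub hands the doubles out in order, one per `object` call. `SequencedBucket` and `Obj` are hypothetical names, and the timestamps are arbitrary.

```ruby
# Hypothetical stub: returns the queued objects one per call, in order.
Obj = Struct.new(:key, :last_modified)

class SequencedBucket
  def initialize(objects)
    @queue = objects.dup
  end

  # Ignores the key and returns the next queued object.
  def object(_key)
    @queue.shift
  end
end

now = Time.utc(2024, 1, 2)
day = 24 * 60 * 60

bucket = SequencedBucket.new([
  Obj.new("YESTERDAY", now - day),   # first lookup, before the download
  Obj.new("YESTERDAY", now - day),   # second lookup, timestamp unchanged
  Obj.new("TODAY", now - 600),       # first lookup, before the download
  Obj.new("TODAY_UPDATED", now)      # second lookup, modified in the meantime
])

# Two lookups per key, compared the same way the changed process_log does.
results = %w[YESTERDAY TODAY].map do |key|
  first = bucket.object(key)
  second = bucket.object(key)
  [key, first.last_modified == second.last_modified]
end

results.each { |key, unchanged| puts "#{key}: unchanged=#{unchanged}" }
```

Under that assumption, the first pair models an object that stays the same during processing and the second pair models one that is updated while it is being processed.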