Process logs in batches and expose source info #7

Open · wants to merge 1 commit into main
62 changes: 39 additions & 23 deletions lib/logstash/inputs/s3.rb
@@ -60,14 +60,17 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base
# Value is in seconds.
config :interval, :validate => :number, :default => 60

# How many bucket objects to accumulate before a batch is processed.
config :batch_size, :validate => :number, :default => 20

public
def register
require "digest/md5"
require "aws-sdk"

@region_endpoint = @region if @region && !@region.empty?

@logger.info("Registering s3 input", :bucket => @bucket, :region_endpoint => @region_endpoint)
@logger.info("Registering s3 input", :bucket => @bucket, :region_endpoint => @region_endpoint, :batch_size => @batch_size)

if @credentials.length == 0
@access_key_id = ENV['AWS_ACCESS_KEY_ID']
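For reference, a minimal pipeline configuration exercising the new option might look like this (the bucket name, prefix, and batch size below are placeholders, not values from this PR):

```
input {
  s3 {
    bucket     => "my-log-bucket"
    prefix     => "cloudfront/"
    interval   => 60
    batch_size => 50
  }
}
```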
@@ -147,49 +150,60 @@ def process_new(queue, since=nil)
since = sincedb_read()
end

objects = list_new(since)
objects.each do |k|
@logger.debug("S3 input processing", :bucket => @bucket, :key => k)
lastmod = @s3bucket.objects[k].last_modified
process_log(queue, k)
sincedb_write(lastmod)
end
fetch_new_and_process(queue, since)

end # def process_new

private
def list_new(since=nil)
def fetch_new_and_process(queue, since=nil)

if since.nil?
since = Time.new(0)
end

objects = {}
objects = []
@s3bucket.objects.with_prefix(@prefix).each do |log|
if log.last_modified > since
objects[log.key] = log.last_modified
# We skip already processed objects.
# Zero-length objects have nothing to process.
# They can represent "directories", so we skip them as well.
if log.last_modified > since && log.content_length > 0
objects << log
if objects.count >= @batch_size
process_s3_objects(queue, objects)
objects.clear
end
end
end
unless objects.empty?
process_s3_objects(queue, objects)
end

return sorted_objects = objects.keys.sort {|a,b| objects[a] <=> objects[b]}
Review comment:

It seems like you've eliminated the sort step. In cases where alphabetical order matches chronological order by last_modified time, this will work (since the S3 API always returns results in alphabetical order), but if not, this creates a problem: the sincedb assumes that objects are always handled in the same order.
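One way to address this, sketched here as a hypothetical follow-up rather than part of this PR: sort each batch by last_modified before handing it off, so the sincedb still sees chronological order within a batch. Ordering across batches would still depend on S3's alphabetical listing.

```ruby
# Hypothetical adjustment: process each batch in chronological order
# so sincedb_write always advances monotonically within the batch.
def process_s3_objects(queue, objects)
  objects.sort_by(&:last_modified).each do |log|
    @logger.info("S3 input processing", :bucket => @bucket, :key => log.key)
    process_log(queue, log)
    sincedb_write(log.last_modified)
  end
end
```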

end # def fetch_new_and_process

end # def list_new
private
def process_s3_objects(queue, objects)
objects.each do |log|
@logger.info("S3 input processing", :bucket => @bucket, :key => log.key)
lastmod = log.last_modified
process_log(queue, log)
sincedb_write(lastmod)
end
end # def process_s3_objects

private
def process_log(queue, key)
def process_log(queue, object)

object = @s3bucket.objects[key]
tmp = Dir.mktmpdir("logstash-")
begin
filename = File.join(tmp, File.basename(key))
filename = File.join(tmp, File.basename(object.key))
File.open(filename, 'wb') do |s3file|
object.read do |chunk|
s3file.write(chunk)
end
end
process_local_log(queue, filename)
process_local_log(queue, filename, object.key)
unless @backup_to_bucket.nil?
backup_object = @backup_bucket.objects[key]
backup_object = @backup_bucket.objects[object.key]
backup_object.write(Pathname.new(filename))
end
unless @backup_to_dir.nil?
@@ -204,7 +218,7 @@ def process_log(queue, key)
end # def process_log

private
def process_local_log(queue, filename)
def process_local_log(queue, filename, key)

metadata = {
:version => nil,
@@ -214,19 +228,19 @@ def process_local_log(queue, filename)
if filename.end_with?('.gz')
gz = Zlib::GzipReader.new(file)
gz.each_line do |line|
metadata = process_line(queue, metadata, line)
metadata = process_line(queue, metadata, line, key)
end
else
file.each do |line|
metadata = process_line(queue, metadata, line)
metadata = process_line(queue, metadata, line, key)
end
end
end

end # def process_local_log

private
def process_line(queue, metadata, line)
def process_line(queue, metadata, line, key)

if /#Version: .+/.match(line)
junk, version = line.strip().split(/#Version: (.+)/)
@@ -241,6 +255,8 @@ def process_line(queue, metadata, line)
else
@codec.decode(line) do |event|
decorate(event)
event["s3_bucket"] = @bucket
event["s3_key"] = key
unless metadata[:version].nil?
event["cloudfront_version"] = metadata[:version]
end
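With the new s3_bucket and s3_key fields on each event, downstream filters can branch on the originating object. A minimal sketch, assuming a CloudFront-style key prefix and a hypothetical tag name:

```
filter {
  if [s3_key] =~ /^cloudfront\// {
    mutate { add_tag => ["from_cloudfront"] }
  }
}
```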