From bc4359a5546b20a74cd1d416b1f035e38e68ce1f Mon Sep 17 00:00:00 2001
From: Roman Mazur
Date: Mon, 12 Jan 2015 13:35:24 +0200
Subject: [PATCH] Process logs in batches and expose source info

The previous implementation listed all the objects in the bucket before
processing any of them. When a considerably large bucket is fed for the
first time, this can take a long time, postponing the actual import of
events, and can also exhaust the machine's memory. Now we list at most
`@batch_size` objects and start processing them right away.

This change also exposes `s3_bucket` and `s3_key` on each event, which
can be very useful, much like the `path` field exposed by the `file`
plugin.
---
 lib/logstash/inputs/s3.rb | 62 ++++++++++++++++++++++++---------------
 1 file changed, 39 insertions(+), 23 deletions(-)

diff --git a/lib/logstash/inputs/s3.rb b/lib/logstash/inputs/s3.rb
index 3f4ee5d..c8a8d1f 100644
--- a/lib/logstash/inputs/s3.rb
+++ b/lib/logstash/inputs/s3.rb
@@ -60,6 +60,9 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base
   # Value is in seconds.
   config :interval, :validate => :number, :default => 60
 
+  # How many bucket objects should be fetched to start processing.
+  config :batch_size, :validate => :number, :default => 20
+
   public
   def register
     require "digest/md5"
@@ -67,7 +70,7 @@ def register
 
     @region_endpoint = @region if @region && !@region.empty?
 
-    @logger.info("Registering s3 input", :bucket => @bucket, :region_endpoint => @region_endpoint)
+    @logger.info("Registering s3 input", :bucket => @bucket, :region_endpoint => @region_endpoint, :batch_size => @batch_size)
 
     if @credentials.length == 0
       @access_key_id = ENV['AWS_ACCESS_KEY_ID']
@@ -147,49 +150,60 @@ def process_new(queue, since=nil)
       since = sincedb_read()
     end
 
-    objects = list_new(since)
-    objects.each do |k|
-      @logger.debug("S3 input processing", :bucket => @bucket, :key => k)
-      lastmod = @s3bucket.objects[k].last_modified
-      process_log(queue, k)
-      sincedb_write(lastmod)
-    end
+    fetch_new_and_process(queue, since)
 
   end # def process_new
 
   private
-  def list_new(since=nil)
+  def fetch_new_and_process(queue, since=nil)
 
     if since.nil?
       since = Time.new(0)
     end
 
-    objects = {}
+    objects = []
     @s3bucket.objects.with_prefix(@prefix).each do |log|
-      if log.last_modified > since
-        objects[log.key] = log.last_modified
+      # We skip already processed objects.
+      # Zero length objects have nothing to process.
+      # They can represent "directories". So we skip them as well.
+      if log.last_modified > since && log.content_length > 0
+        objects << log
+        if objects.count >= @batch_size
+          process_s3_objects(queue, objects)
+          objects.clear
+        end
       end
     end
+    if not objects.empty?
+      process_s3_objects(queue, objects)
+    end
 
-    return sorted_objects = objects.keys.sort {|a,b| objects[a] <=> objects[b]}
+  end # def fetch_new_and_process
 
-  end # def list_new
+  private
+  def process_s3_objects(queue, objects)
+    objects.each do |log|
+      @logger.info("S3 input processing", :bucket => @bucket, :key => log.key)
+      lastmod = log.last_modified
+      process_log(queue, log)
+      sincedb_write(lastmod)
+    end
+  end # def process_s3_objects
 
   private
-  def process_log(queue, key)
+  def process_log(queue, object)
 
-    object = @s3bucket.objects[key]
     tmp = Dir.mktmpdir("logstash-")
     begin
-      filename = File.join(tmp, File.basename(key))
+      filename = File.join(tmp, File.basename(object.key))
       File.open(filename, 'wb') do |s3file|
         object.read do |chunk|
           s3file.write(chunk)
         end
       end
-      process_local_log(queue, filename)
+      process_local_log(queue, filename, object.key)
       unless @backup_to_bucket.nil?
-        backup_object = @backup_bucket.objects[key]
+        backup_object = @backup_bucket.objects[object.key]
         backup_object.write(Pathname.new(filename))
       end
       unless @backup_to_dir.nil?
@@ -204,7 +218,7 @@ def process_log(queue, key)
   end # def process_log
 
   private
-  def process_local_log(queue, filename)
+  def process_local_log(queue, filename, key)
 
     metadata = {
       :version => nil,
@@ -214,11 +228,11 @@ def process_local_log(queue, filename)
       if filename.end_with?('.gz')
         gz = Zlib::GzipReader.new(file)
         gz.each_line do |line|
-          metadata = process_line(queue, metadata, line)
+          metadata = process_line(queue, metadata, line, key)
         end
       else
         file.each do |line|
-          metadata = process_line(queue, metadata, line)
+          metadata = process_line(queue, metadata, line, key)
         end
       end
     end
@@ -226,7 +240,7 @@ def process_local_log(queue, filename)
   end # def process_local_log
 
   private
-  def process_line(queue, metadata, line)
+  def process_line(queue, metadata, line, key)
 
     if /#Version: .+/.match(line)
       junk, version = line.strip().split(/#Version: (.+)/)
@@ -241,6 +255,8 @@ def process_line(queue, metadata, line)
     else
       @codec.decode(line) do |event|
         decorate(event)
+        event["s3_bucket"] = @bucket
+        event["s3_key"] = key
         unless metadata[:version].nil?
           event["cloudfront_version"] = metadata[:version]
         end
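
For illustration only (not part of the patch), a pipeline using the new option
and fields could look roughly like the sketch below; the bucket name, prefix,
and tag are made-up placeholders:

    input {
      s3 {
        bucket     => "example-log-bucket"   # placeholder bucket name
        prefix     => "cloudfront/"          # placeholder key prefix
        interval   => 60
        batch_size => 50                     # added by this patch; defaults to 20
      }
    }

    filter {
      # Route or tag events by their source object, much like the file
      # plugin's `path` field allows.
      if [s3_key] =~ /cloudfront/ {
        mutate { add_tag => ["cloudfront"] }
      }
    }

    output {
      stdout { codec => rubydebug }
    }

Carrying `s3_bucket` and `s3_key` on every event makes it possible to trace any
record back to the object it came from.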