Refactor: reduce locking while appending events
kares committed Jan 27, 2021
1 parent 94fd9fd commit 6561681
Showing 1 changed file with 12 additions and 12 deletions.
lib/logstash/outputs/kafka.rb
@@ -1,6 +1,7 @@
 require 'logstash/namespace'
 require 'logstash/outputs/base'
 require 'java'
+require 'concurrent/map'
 require 'logstash-integration-kafka_jars.rb'
 require 'logstash/plugin_mixins/kafka_support'

@@ -190,7 +191,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
 
   public
   def register
-    @thread_batch_map = Concurrent::Hash.new
+    @thread_batch_map = Concurrent::Map.new
 
     if !@retries.nil?
       if @retries < 0
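
This swap is where the commit title cashes out. Logstash runs on JRuby, and in concurrent-ruby a Concurrent::Hash on JRuby is essentially a plain ::Hash with JRuby::Synchronized mixed in, so every access (including the @thread_batch_map[Thread.current] lookup performed for each appended record) contends on a single monitor. Concurrent::Map is instead backed by a ConcurrentHashMap-style store whose reads are designed not to block. A minimal sketch of the calls this commit leans on; illustrative keys and values, not plugin code:

    require 'concurrent/map'

    map = Concurrent::Map.new
    map[:batch] = []                # thread-safe write
    map.get(:batch)                 # read that does not lock the whole map
    map.fetch_or_store(:other, [])  # stores the default when the key is absent
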
@@ -204,34 +205,33 @@ def register
     @producer = create_producer
     if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data)
+        push_event_data(event, data)
       end
     elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data.to_java_bytes)
+        push_event_data(event, data.to_java_bytes)
       end
     else
       raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
     end
   end
 
-  def prepare(record)
+  def append_record(record)
     # This output is threadsafe, so we need to keep a batch per thread.
-    @thread_batch_map[Thread.current].add(record)
+    @thread_batch_map[Thread.current] << record
   end
 
   def multi_receive(events)
-    t = Thread.current
-    if !@thread_batch_map.include?(t)
-      @thread_batch_map[t] = java.util.ArrayList.new(events.size)
-    end
+    key = Thread.current
+
+    batch = @thread_batch_map.get(key)
+    batch = @thread_batch_map.fetch_or_store(key, Array.new(events.size)) unless batch
 
     events.each do |event|
       break if event == LogStash::SHUTDOWN
       @codec.encode(event)
     end
 
-    batch = @thread_batch_map[t]
     if batch.any?
       retrying_send(batch)
       batch.clear
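
In the reworked multi_receive the hot path is a single get; fetch_or_store only runs the first time a given worker thread delivers a batch. Because the key is Thread.current, each entry is written only by its own thread, so no two threads ever race on the same key and the map merely has to keep individual operations thread-safe. Note that fetch_or_store(key, value) evaluates value eagerly, which is why it sits behind the unless batch guard. One Ruby subtlety: Array.new(n) returns an array pre-filled with n nils, unlike java.util.ArrayList.new(n), which is an empty list with capacity n. A lazily evaluated, empty default would look like the following hypothetical variant (not the committed code):

    batch = @thread_batch_map.get(key)
    batch ||= @thread_batch_map.fetch_or_store(key) { [] }  # block runs only when key is absent
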
@@ -315,13 +315,13 @@ def handle_kafka_error(e, record)
     end
   end
 
-  def write_to_kafka(event, serialized_data)
+  def push_event_data(event, serialized_data)
     if @message_key.nil?
       record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
     else
       record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
     end
-    prepare(record)
+    append_record(record)
   rescue LogStash::ShutdownSignal
     logger.debug('producer received shutdown signal')
   rescue => e
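
Putting the pieces together, here is a self-contained sketch of the per-thread batching pattern this commit tunes. Everything below (batch_map, append, the worker loop) is a hypothetical illustration assuming only the concurrent-ruby gem:

    require 'concurrent/map'

    # One batch per worker thread: the shared map only mediates batch lookup,
    # so appending a record never waits on a shared lock.
    batch_map = Concurrent::Map.new

    append = lambda do |record|
      batch = batch_map.get(Thread.current)                      # fast path: plain read
      batch ||= batch_map.fetch_or_store(Thread.current) { [] }  # first call per thread
      batch << record
    end

    workers = 4.times.map do |i|
      Thread.new do
        10.times { |n| append.call("record-#{i}-#{n}") }
        batch = batch_map.get(Thread.current)
        puts "worker #{i} flushing #{batch.size} records"  # prints 10
        batch.clear                                        # the array is reused next cycle
      end
    end
    workers.each(&:join)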
