From 6561681fb3ef8f2051a370232ca98ed280a17c8a Mon Sep 17 00:00:00 2001
From: Karol Bucek
Date: Wed, 27 Jan 2021 13:20:16 +0100
Subject: [PATCH] Refactor: reduce locking while appending events

---
 lib/logstash/outputs/kafka.rb | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index 053e109b..71c6cafb 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -1,6 +1,7 @@
 require 'logstash/namespace'
 require 'logstash/outputs/base'
 require 'java'
+require 'concurrent/map'
 require 'logstash-integration-kafka_jars.rb'
 require 'logstash/plugin_mixins/kafka_support'
 
@@ -190,7 +191,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
 
   public
   def register
-    @thread_batch_map = Concurrent::Hash.new
+    @thread_batch_map = Concurrent::Map.new
 
     if !@retries.nil?
       if @retries < 0
@@ -204,34 +205,33 @@ def register
     @producer = create_producer
     if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data)
+        push_event_data(event, data)
       end
     elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data.to_java_bytes)
+        push_event_data(event, data.to_java_bytes)
       end
     else
       raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
     end
   end
 
-  def prepare(record)
+  def append_record(record)
     # This output is threadsafe, so we need to keep a batch per thread.
-    @thread_batch_map[Thread.current].add(record)
+    @thread_batch_map[Thread.current] << record
   end
 
   def multi_receive(events)
-    t = Thread.current
-    if !@thread_batch_map.include?(t)
-      @thread_batch_map[t] = java.util.ArrayList.new(events.size)
-    end
+    key = Thread.current
+
+    batch = @thread_batch_map.get(key)
+    batch = @thread_batch_map.fetch_or_store(key) { Array.new } unless batch
 
     events.each do |event|
       break if event == LogStash::SHUTDOWN
       @codec.encode(event)
     end
 
-    batch = @thread_batch_map[t]
     if batch.any?
       retrying_send(batch)
       batch.clear
@@ -315,13 +315,13 @@ def handle_kafka_error(e, record)
     end
   end
 
-  def write_to_kafka(event, serialized_data)
+  def push_event_data(event, serialized_data)
     if @message_key.nil?
       record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
     else
       record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
     end
-    prepare(record)
+    append_record(record)
   rescue LogStash::ShutdownSignal
     logger.debug('producer received shutdown signal')
   rescue => e
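
Note: below is a minimal, standalone sketch of the per-thread batching pattern
this patch moves to, for readers who want to try it outside the plugin. It
assumes only the concurrent-ruby gem; PerThreadBatcher and its flush method
are hypothetical names for illustration, not part of the plugin. The
motivation matches the subject line: on JRuby (which Logstash runs on),
Concurrent::Hash synchronizes every access, while Concurrent::Map is backed by
a ConcurrentHashMap-style structure, so the hot-path read takes no lock.

  require 'concurrent/map'

  class PerThreadBatcher
    def initialize
      # One batch per thread. Concurrent::Map gives lock-free reads on the
      # fast path, unlike Concurrent::Hash, which locks on every access.
      @thread_batch_map = Concurrent::Map.new
    end

    # Append a record to the calling thread's private batch. No extra locking
    # is needed because each thread only ever mutates its own array.
    def append(record)
      batch_for_current_thread << record
    end

    # Hand the calling thread's batch to flush (a stand-in for the plugin's
    # retrying_send) and reuse the cleared array for the next round.
    def flush_batch
      batch = batch_for_current_thread
      return if batch.empty?
      flush(batch)
      batch.clear
    end

    private

    def batch_for_current_thread
      key = Thread.current
      # Fast path: a plain read. Slow path: fetch_or_store installs an empty
      # array atomically, so two racing threads cannot clobber each other.
      # Array.new without a size is deliberate: Array.new(n) would pre-fill
      # the batch with n nils.
      @thread_batch_map.get(key) || @thread_batch_map.fetch_or_store(key) { Array.new }
    end

    def flush(batch)
      puts "flushing #{batch.size} record(s) on #{Thread.current.name}"
    end
  end

  batcher = PerThreadBatcher.new
  workers = 4.times.map do |i|
    Thread.new do
      Thread.current.name = "worker-#{i}"
      10.times { |n| batcher.append("record-#{i}-#{n}") }
      batcher.flush_batch
    end
  end
  workers.each(&:join)

Keying the map by Thread.current means a batch is only ever read and written
by its owning thread, so the concurrent structure is needed solely for the
first-use insertion, which fetch_or_store performs atomically.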