From bdae84e90441a7af187076c12a7752d4cac6b0b0 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Wed, 27 Jan 2021 12:45:55 +0100 Subject: [PATCH] Refactor: dry-out exception handling in output --- lib/logstash/outputs/kafka.rb | 49 +++++++++++++++++------------------ 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index b7f4c629..afc41c86 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -50,6 +50,9 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base java_import org.apache.kafka.clients.producer.ProducerRecord + java_import org.apache.kafka.common.KafkaException + java_import org.apache.kafka.common.errors.RetriableException + java_import org.apache.kafka.common.errors.InterruptException include LogStash::PluginMixins::KafkaSupport @@ -255,18 +258,9 @@ def retrying_send(batch) futures = batch.collect do |record| begin - # send() can throw an exception even before the future is created. @producer.send(record) - rescue org.apache.kafka.common.errors.InterruptException, - org.apache.kafka.common.errors.RetriableException => e - logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message) - failures << record - nil - rescue org.apache.kafka.common.KafkaException => e - # This error is not retriable, drop event - # TODO: add DLQ support - logger.warn("producer send failed, dropping record",:exception => e.class, :message => e.message, - :record_value => record.value) + rescue => e # send() can throw an exception even before the future is created + failures << record if handle_kafka_error(e, record) == :retry nil end end @@ -277,18 +271,8 @@ def retrying_send(batch) begin future.get rescue java.util.concurrent.ExecutionException => e - # TODO(sissel): Add metric to count failures, possibly by exception type. - if e.get_cause.is_a? org.apache.kafka.common.errors.RetriableException or - e.get_cause.is_a? org.apache.kafka.common.errors.InterruptException - logger.info("producer send failed, will retry sending", :exception => e.cause.class, - :message => e.cause.message) - failures << batch[i] - elsif e.get_cause.is_a? org.apache.kafka.common.KafkaException - # This error is not retriable, drop event - # TODO: add DLQ support - logger.warn("producer send failed, dropping record", :exception => e.cause.class, - :message => e.cause.message, :record_value => batch[i].value) - end + record = batch[i] + failures << record if handle_kafka_error(e.cause, record) == :retry end end end @@ -314,6 +298,22 @@ def close private + def handle_kafka_error(e, record) + # TODO(sissel): Add metric to count failures, possibly by exception type. + case e + when RetriableException, InterruptException + logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message) + return :retry + when KafkaException + # This error is not retriable, drop event + # TODO: add DLQ support + logger.warn("producer send failed, dropping record", :exception => e.class, :message => e.message, :record_value => record.value) + else + logger.error("producer send failed, unexpected exception", :exception => e.class, :message => e.message, :backtrace => e.backtrace) + raise e + end + end + def write_to_kafka(event, serialized_data) if @message_key.nil? record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data) @@ -369,8 +369,7 @@ def create_producer org.apache.kafka.clients.producer.KafkaProducer.new(props) rescue => e logger.error("Unable to create Kafka producer from given configuration", - :kafka_error_message => e, - :cause => e.respond_to?(:getCause) ? e.getCause() : nil) + :message => e.message, :exception => e.class, :cause => e.cause) raise e end end