Skip to content

Commit

Permalink
Refactor: dry-out exception handling in output
Browse files Browse the repository at this point in the history
  • Loading branch information
kares committed Apr 12, 2021
1 parent c22c599 commit bdae84e
Showing 1 changed file with 24 additions and 25 deletions.
49 changes: 24 additions & 25 deletions lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
class LogStash::Outputs::Kafka < LogStash::Outputs::Base

java_import org.apache.kafka.clients.producer.ProducerRecord
java_import org.apache.kafka.common.KafkaException
java_import org.apache.kafka.common.errors.RetriableException
java_import org.apache.kafka.common.errors.InterruptException

include LogStash::PluginMixins::KafkaSupport

Expand Down Expand Up @@ -255,18 +258,9 @@ def retrying_send(batch)

futures = batch.collect do |record|
begin
# send() can throw an exception even before the future is created.
@producer.send(record)
rescue org.apache.kafka.common.errors.InterruptException,
org.apache.kafka.common.errors.RetriableException => e
logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
failures << record
nil
rescue org.apache.kafka.common.KafkaException => e
# This error is not retriable, drop event
# TODO: add DLQ support
logger.warn("producer send failed, dropping record",:exception => e.class, :message => e.message,
:record_value => record.value)
rescue => e # send() can throw an exception even before the future is created
failures << record if handle_kafka_error(e, record) == :retry
nil
end
end
Expand All @@ -277,18 +271,8 @@ def retrying_send(batch)
begin
future.get
rescue java.util.concurrent.ExecutionException => e
# TODO(sissel): Add metric to count failures, possibly by exception type.
if e.get_cause.is_a? org.apache.kafka.common.errors.RetriableException or
e.get_cause.is_a? org.apache.kafka.common.errors.InterruptException
logger.info("producer send failed, will retry sending", :exception => e.cause.class,
:message => e.cause.message)
failures << batch[i]
elsif e.get_cause.is_a? org.apache.kafka.common.KafkaException
# This error is not retriable, drop event
# TODO: add DLQ support
logger.warn("producer send failed, dropping record", :exception => e.cause.class,
:message => e.cause.message, :record_value => batch[i].value)
end
record = batch[i]
failures << record if handle_kafka_error(e.cause, record) == :retry
end
end
end
Expand All @@ -314,6 +298,22 @@ def close

private

def handle_kafka_error(e, record)
# TODO(sissel): Add metric to count failures, possibly by exception type.
case e
when RetriableException, InterruptException
logger.info("producer send failed, will retry sending", :exception => e.class, :message => e.message)
return :retry
when KafkaException
# This error is not retriable, drop event
# TODO: add DLQ support
logger.warn("producer send failed, dropping record", :exception => e.class, :message => e.message, :record_value => record.value)
else
logger.error("producer send failed, unexpected exception", :exception => e.class, :message => e.message, :backtrace => e.backtrace)
raise e
end
end

def write_to_kafka(event, serialized_data)
if @message_key.nil?
record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
Expand Down Expand Up @@ -369,8 +369,7 @@ def create_producer
org.apache.kafka.clients.producer.KafkaProducer.new(props)
rescue => e
logger.error("Unable to create Kafka producer from given configuration",
:kafka_error_message => e,
:cause => e.respond_to?(:getCause) ? e.getCause() : nil)
:message => e.message, :exception => e.class, :cause => e.cause)
raise e
end
end
Expand Down

0 comments on commit bdae84e

Please sign in to comment.