From c4d56a1c18e184b3e84d145e7ad60cd4a4a56594 Mon Sep 17 00:00:00 2001 From: Stephen Best Date: Thu, 22 Jun 2023 11:16:58 +0200 Subject: [PATCH] Fix: Runner adds exceptions to instrumentation payload --- lib/racecar/runner.rb | 2 ++ spec/runner_spec.rb | 27 ++++++++++++++++++++++++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/lib/racecar/runner.rb b/lib/racecar/runner.rb index 0536a5f7..b6acb34e 100644 --- a/lib/racecar/runner.rb +++ b/lib/racecar/runner.rb @@ -201,6 +201,7 @@ def process(message) consumer.store_offset(message) end rescue => e + instrumentation_payload[:exception] = e instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e) instrumentation_payload[:retries_count] = pause.pauses_count config.error_handler.call(e, instrumentation_payload) @@ -232,6 +233,7 @@ def process_batch(messages) processor.deliver! consumer.store_offset(messages.last) rescue => e + instrumentation_payload[:exception] = e instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e) instrumentation_payload[:retries_count] = pause.pauses_count config.error_handler.call(e, instrumentation_payload) diff --git a/spec/runner_spec.rb b/spec/runner_spec.rb index 1c32db3c..ce0c9638 100644 --- a/spec/runner_spec.rb +++ b/spec/runner_spec.rb @@ -246,8 +246,14 @@ def consumer_rebalance_listener=(_listenr) end class FakeInstrumenter < Racecar::Instrumenter - def initialize(*) + def initialize(received = []) super(backend: Racecar::NullInstrumenter) + @received = received + end + + def instrument(event_name, payload = {}, &block) + @received << [event_name, payload] + super end end @@ -375,7 +381,8 @@ def initialize(*) let(:config) { Racecar::Config.new } let(:logger) { Logger.new(StringIO.new) } let(:kafka) { FakeRdkafka.new(runner: runner) } - let(:instrumenter) { FakeInstrumenter.new } + let(:instrumenter) { FakeInstrumenter.new(instrumented_events) } + let(:instrumented_events) { [] } let(:runner) do Racecar::Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter) @@ -419,8 +426,9 @@ def initialize(*) create_time: nil, key: nil, value: error, + exception: error, headers: nil, - retries_count: anything, + retries_count: instance_of(Integer), unrecoverable_delivery_error: false, } @@ -431,6 +439,18 @@ def initialize(*) runner.run end + it "adds the exception to the instrumentation payload" do + error = StandardError.new("surprise") + + kafka.deliver_message(error, topic: "greetings") + + runner.run + + expect(instrumented_events).to include( + ["process_message", hash_including(exception: error) ] + ) + end + it "keeps track of the number of retries when a message causes an exception" do error = StandardError.new("surprise") @@ -555,6 +575,7 @@ def initialize(*) message_count: 1, retries_count: anything, unrecoverable_delivery_error: false, + exception: error, } expect(config.error_handler).to receive(:call).at_least(:once).with(error, info)