diff --git a/lib/racecar/producer.rb b/lib/racecar/producer.rb index 3ac3f94..21e5db1 100644 --- a/lib/racecar/producer.rb +++ b/lib/racecar/producer.rb @@ -11,13 +11,26 @@ module Racecar class Producer @@mutex = Mutex.new + @@internal_producer = nil class << self + # Close the internal rdkafka producer. Subsequent attempts to + # produce messages after shutdown will result in errors. def shutdown! @@mutex.synchronize do - if !@internal_producer.nil? - @internal_producer.close - end + @@internal_producer&.close + end + end + + # Reset internal state. Subsequent attempts to produce messages will + # reinitialize internal resources. + # + # Before forking a process, it is recommended to call this method. See: + # https://github.com/karafka/rdkafka-ruby/blob/main/README.md#forking + def reset! + @@mutex.synchronize do + @@internal_producer&.close + @@internal_producer = nil end end end @@ -28,12 +41,12 @@ def initialize(config: nil, logger: nil, instrumenter: NullInstrumenter) @delivery_handles = [] @instrumenter = instrumenter @batching = false - @internal_producer = init_internal_producer(config) + init_internal_producer(config) end def init_internal_producer(config) @@mutex.synchronize do - @@init_internal_producer ||= begin + @@internal_producer ||= begin # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md producer_config = { "bootstrap.servers" => config.brokers.join(","), @@ -109,7 +122,9 @@ def wait_for_delivery private - attr_reader :internal_producer + def internal_producer + @@internal_producer || init_internal_producer(@config) + end def deliver_with_error_handling(handle) handle.wait diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index 4765328..fcb3cb0 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -11,15 +11,63 @@ let(:value) { "test_message" } let(:delivery_handle) { instance_double("Rdkafka::Producer::DeliveryHandle") } - before do - allow(producer).to receive(:internal_producer).and_return(double("Rdkafka::Producer", :produce => delivery_handle)) + let(:rdkafka_config) { double("Rdkafka::Config", producer: rdkafka_producer) } + let(:rdkafka_producer) do + double( + "Rdkafka::Producer", + :produce => delivery_handle, + :delivery_callback= => nil, + :close => nil + ) end - after do - Racecar::Producer.shutdown! - Racecar::Producer.class_variable_set(:@@init_internal_producer, nil) + before { allow(Rdkafka::Config).to receive(:new).and_return(rdkafka_config) } + after { Racecar::Producer.reset! } + + describe "#initialize" do + it "creates one shared rdkafka producer" do + expect(Rdkafka::Config).to receive(:new).with( + hash_including( + "bootstrap.servers" => "localhost:9092", + "client.id" => "racecar", + "statistics.interval.ms" => 0, + "message.timeout.ms" => 300000.0, + "partitioner" => "consistent_random" + ) + ).and_return(rdkafka_config).once + expect(rdkafka_config).to receive(:producer).once + 3.times { Racecar::Producer.new(config: config) } + end end + describe ".shutdown!" do + it "closes the rdkafka producer if it has been initialized" do + producer + expect(rdkafka_producer).to receive(:close) + Racecar::Producer.shutdown! + end + + it "does not raise an error if no producer was initialized" do + expect(rdkafka_producer).to_not receive(:close) + expect { Racecar::Producer.shutdown! }.to_not raise_error + end + end + + describe ".reset!" do + it "closes the rdkafka producer" do + producer + expect(rdkafka_producer).to receive(:close) + Racecar::Producer.reset! + end + + it "recreates a new rdkafka producer after reset" do + Racecar::Producer.new(config: config) + expect(Rdkafka::Config).to receive(:new).and_return(rdkafka_config).once + expect(rdkafka_config).to receive(:producer).once + Racecar::Producer.reset! + Racecar::Producer.new(config: config) + end + end describe "#produce_async" do it "sends the message without waiting for feedback or guarantees" do @@ -60,9 +108,12 @@ test_config end - it "sets the partitioner corretly" do - internal_producer = producer.instance_variable_get("@internal_producer") - expect(internal_producer.instance_variable_get("@partitioner_name")).to eq("murmur2_random") + it "sets the partitioner correctly on the internal producer" do + expect(Rdkafka::Config).to receive(:new).with( + hash_including("partitioner" => "murmur2_random") + ).and_return(rdkafka_config).once + expect(rdkafka_config).to receive(:producer).and_return(rdkafka_producer).once + expect(producer.send(:internal_producer)).to eq(rdkafka_producer) end end end