diff --git a/lib/racecar/config.rb b/lib/racecar/config.rb index bac42dd5..17039f09 100644 --- a/lib/racecar/config.rb +++ b/lib/racecar/config.rb @@ -185,6 +185,9 @@ class Config < KingKonf::Config desc "Used only by the liveness probe: Max time (in seconds) between liveness events before the process is considered not healthy" integer :liveness_probe_max_interval, default: 5 + desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x" + string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic" + # The error handler must be set directly on the object. attr_reader :error_handler diff --git a/lib/racecar/consumer_set.rb b/lib/racecar/consumer_set.rb index 275996df..a3428102 100644 --- a/lib/racecar/consumer_set.rb +++ b/lib/racecar/consumer_set.rb @@ -175,6 +175,7 @@ def poll_with_retries(max_wait_time_ms) # polls a message for the current consumer, handling any API edge cases. def poll_current_consumer(max_wait_time_ms) + @last_poll_read_nil_message = false msg = current.poll(max_wait_time_ms) rescue Rdkafka::RdkafkaError => e case e.code @@ -212,9 +213,14 @@ def reset_current_consumer end def maybe_select_next_consumer - return unless @last_poll_read_nil_message - @last_poll_read_nil_message = false - select_next_consumer + case @config.multi_subscription_strategy + when "round-robin" + select_next_consumer + else # "exhaust-topic" + if @last_poll_read_nil_message + select_next_consumer + end + end end def select_next_consumer diff --git a/spec/consumer_set_spec.rb b/spec/consumer_set_spec.rb index 981aa018..b14244ca 100644 --- a/spec/consumer_set_spec.rb +++ b/spec/consumer_set_spec.rb @@ -572,5 +572,59 @@ def message_generator(messages) count.times { polled += consumer_set.batch_poll(100) rescue [] } expect(polled).to eq [:msg1, :msg1, :msg1, :msgN, :msgN, :msgN] end + + context "when multiple consumers are configured as 'round-robin'" do + before do + config.multi_subscription_strategy = "round-robin" + allow(rdconsumer1).to receive(:poll).and_return(topic1_message) + allow(rdconsumer2).to receive(:poll).and_return(topic2_message) + allow(rdconsumer3).to receive(:poll).and_return(topic3_message) + end + + let(:config) { Racecar::Config.new } + let(:interval) { 1000.0 } + let(:topic1_message) { double(:topic1_message) } + let(:topic2_message) { double(:topic2_message) } + let(:topic3_message) { double(:topic3_message) } + + + describe "#poll" do + it "consumes 1 message from each topic in turn" do + messages = 6.times.map { + consumer_set.poll(interval) + } + + expect(messages).to eq([ + topic1_message, + topic2_message, + topic3_message, + topic1_message, + topic2_message, + topic3_message, + ]) + end + end + + describe "#batch_poll" do + before do + config.fetch_messages = 1 + end + + it "consumes 1 batch from each topic in turn" do + messages = 6.times.map { + consumer_set.batch_poll(interval) + } + + expect(messages).to eq([ + [topic1_message], + [topic2_message], + [topic3_message], + [topic1_message], + [topic2_message], + [topic3_message], + ]) + end + end + end end end