From 236478ff0b589e737598159951d32bff709e1a5d Mon Sep 17 00:00:00 2001 From: Stephen Best Date: Thu, 18 Jan 2024 15:53:47 +0100 Subject: [PATCH] Configurable strategy for consuming multiple topics The current ConsumerSet implementation will only switch topics when the current topic returns no messages after the poll timeout. This is not practical when one or more of the topics has a 'high' message volume as the broker will always return a message and Racecar may never switch to poll the other topics. * 'high' here means messages arrive more often than once per poll timeout, i.e. message frequency (Hz) > 1 / poll timeout (s). --- lib/racecar/config.rb | 3 +++ lib/racecar/consumer_set.rb | 12 ++++++--- spec/consumer_set_spec.rb | 54 +++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/lib/racecar/config.rb b/lib/racecar/config.rb index bac42dd5..17039f09 100644 --- a/lib/racecar/config.rb +++ b/lib/racecar/config.rb @@ -185,6 +185,9 @@ class Config < KingKonf::Config desc "Used only by the liveness probe: Max time (in seconds) between liveness events before the process is considered not healthy" integer :liveness_probe_max_interval, default: 5 + desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x" + string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic" + # The error handler must be set directly on the object. attr_reader :error_handler diff --git a/lib/racecar/consumer_set.rb b/lib/racecar/consumer_set.rb index 275996df..a3428102 100644 --- a/lib/racecar/consumer_set.rb +++ b/lib/racecar/consumer_set.rb @@ -175,6 +175,7 @@ def poll_with_retries(max_wait_time_ms) # polls a message for the current consumer, handling any API edge cases. 
def poll_current_consumer(max_wait_time_ms) + @last_poll_read_nil_message = false msg = current.poll(max_wait_time_ms) rescue Rdkafka::RdkafkaError => e case e.code @@ -212,9 +213,14 @@ def reset_current_consumer end def maybe_select_next_consumer - return unless @last_poll_read_nil_message - @last_poll_read_nil_message = false - select_next_consumer + case @config.multi_subscription_strategy + when "round-robin" + select_next_consumer + else # "exhaust-topic" + if @last_poll_read_nil_message + select_next_consumer + end + end end def select_next_consumer diff --git a/spec/consumer_set_spec.rb b/spec/consumer_set_spec.rb index 981aa018..b14244ca 100644 --- a/spec/consumer_set_spec.rb +++ b/spec/consumer_set_spec.rb @@ -572,5 +572,59 @@ def message_generator(messages) count.times { polled += consumer_set.batch_poll(100) rescue [] } expect(polled).to eq [:msg1, :msg1, :msg1, :msgN, :msgN, :msgN] end + + context "when multiple consumers are configured as 'round-robin'" do + before do + config.multi_subscription_strategy = "round-robin" + allow(rdconsumer1).to receive(:poll).and_return(topic1_message) + allow(rdconsumer2).to receive(:poll).and_return(topic2_message) + allow(rdconsumer3).to receive(:poll).and_return(topic3_message) + end + + let(:config) { Racecar::Config.new } + let(:interval) { 1000.0 } + let(:topic1_message) { double(:topic1_message) } + let(:topic2_message) { double(:topic2_message) } + let(:topic3_message) { double(:topic3_message) } + + + describe "#poll" do + it "consumes 1 message from each topic in turn" do + messages = 6.times.map { + consumer_set.poll(interval) + } + + expect(messages).to eq([ + topic1_message, + topic2_message, + topic3_message, + topic1_message, + topic2_message, + topic3_message, + ]) + end + end + + describe "#batch_poll" do + before do + config.fetch_messages = 1 + end + + it "consumes 1 batch from each topic in turn" do + messages = 6.times.map { + consumer_set.batch_poll(interval) + } + + expect(messages).to eq([ + 
[topic1_message], + [topic2_message], + [topic3_message], + [topic1_message], + [topic2_message], + [topic3_message], + ]) + end + end + end end end