Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable strategy for consuming multiple topics #363

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class Config < KingKonf::Config
desc "Used only by the liveness probe: Max time (in seconds) between liveness events before the process is considered not healthy"
integer :liveness_probe_max_interval, default: 5

desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x"
string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic"

# The error handler must be set directly on the object.
attr_reader :error_handler

Expand Down
12 changes: 9 additions & 3 deletions lib/racecar/consumer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def poll_with_retries(max_wait_time_ms)

# polls a message for the current consumer, handling any API edge cases.
def poll_current_consumer(max_wait_time_ms)
@last_poll_read_nil_message = false
msg = current.poll(max_wait_time_ms)
rescue Rdkafka::RdkafkaError => e
case e.code
Expand Down Expand Up @@ -212,9 +213,14 @@ def reset_current_consumer
end

def maybe_select_next_consumer
return unless @last_poll_read_nil_message
@last_poll_read_nil_message = false
select_next_consumer
case @config.multi_subscription_strategy
when "round-robin"
select_next_consumer
else # "exhaust-topic"
if @last_poll_read_nil_message
select_next_consumer
end
end
end

def select_next_consumer
Expand Down
54 changes: 54 additions & 0 deletions spec/consumer_set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -572,5 +572,59 @@ def message_generator(messages)
count.times { polled += consumer_set.batch_poll(100) rescue [] }
expect(polled).to eq [:msg1, :msg1, :msg1, :msgN, :msgN, :msgN]
end

context "when multiple consumers are configured as 'round-robin'" do
before do
config.multi_subscription_strategy = "round-robin"
allow(rdconsumer1).to receive(:poll).and_return(topic1_message)
allow(rdconsumer2).to receive(:poll).and_return(topic2_message)
allow(rdconsumer3).to receive(:poll).and_return(topic3_message)
end

let(:config) { Racecar::Config.new }
let(:interval) { 1000.0 }
let(:topic1_message) { double(:topic1_message) }
let(:topic2_message) { double(:topic2_message) }
let(:topic3_message) { double(:topic3_message) }


describe "#poll" do
it "consumes 1 message from each topic in turn" do
messages = 6.times.map {
consumer_set.poll(interval)
}

expect(messages).to eq([
topic1_message,
topic2_message,
topic3_message,
topic1_message,
topic2_message,
topic3_message,
])
end
end

describe "#batch_poll" do
before do
config.fetch_messages = 1
end

it "consumes 1 batch from each topic in turn" do
messages = 6.times.map {
consumer_set.batch_poll(interval)
}

expect(messages).to eq([
[topic1_message],
[topic2_message],
[topic3_message],
[topic1_message],
[topic2_message],
[topic3_message],
])
end
end
end
end
end