Skip to content

Commit

Permalink
Merge pull request #331 from bestie/bestie/integration-test-performance
Browse files Browse the repository at this point in the history
Integration test performance and refactoring
  • Loading branch information
dasch authored Jun 21, 2023
2 parents 8f2491d + b183d99 commit b1b0574
Show file tree
Hide file tree
Showing 15 changed files with 255 additions and 192 deletions.
8 changes: 6 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ GEM
minitest (>= 5.1)
tzinfo (~> 2.0)
zeitwerk (~> 2.3)
byebug (11.1.3)
coderay (1.1.3)
concurrent-ruby (1.2.2)
diff-lcs (1.5.0)
Expand All @@ -28,6 +29,9 @@ GEM
pry (0.14.2)
coderay (~> 1.1)
method_source (~> 1.0)
pry-byebug (3.10.1)
byebug (~> 11.0)
pry (>= 0.13, < 0.15)
rake (13.0.6)
rdkafka (0.12.0)
ffi (~> 1.15)
Expand Down Expand Up @@ -58,11 +62,11 @@ DEPENDENCIES
activesupport (~> 6.1.0)
bundler (>= 1.13, < 3)
dogstatsd-ruby (>= 4.0.0, < 6.0.0)
pry
pry-byebug
racecar!
rake (> 10.0)
rspec (~> 3.0)
timecop

BUNDLED WITH
2.3.7
2.4.9
8 changes: 6 additions & 2 deletions lib/racecar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ def self.instrumenter
end

def self.run(processor)
runner(processor).run
end

def self.runner(processor)
runner = Runner.new(processor, config: config, logger: logger, instrumenter: config.instrumenter)

if config.parallel_workers && config.parallel_workers > 1
ParallelRunner.new(runner: runner, config: config, logger: logger).run
ParallelRunner.new(runner: runner, config: config, logger: logger)
else
runner.run
runner
end
end
end
8 changes: 7 additions & 1 deletion lib/racecar/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def initialize(args)
@parser = build_parser
@parser.parse!(args)
@consumer_name = args.first or raise Racecar::Error, "no consumer specified"
@runner = nil
end

def run
Expand Down Expand Up @@ -65,10 +66,15 @@ def run
end

processor = consumer_class.new
Racecar.run(processor)
@runner = Racecar.runner(processor)
@runner.run
nil
end

def stop
@runner.stop
end

private

attr_reader :consumer_name
Expand Down
7 changes: 7 additions & 0 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

require "racecar/liveness_probe"
require "racecar/instrumenter"
require "racecar/rebalance_listener"

module Racecar
class Config < KingKonf::Config
Expand Down Expand Up @@ -227,6 +228,7 @@ def validate!
end

def load_consumer_class(consumer_class)
self.consumer_class = consumer_class
self.group_id = consumer_class.group_id || self.group_id

self.group_id ||= [
Expand All @@ -243,6 +245,7 @@ def load_consumer_class(consumer_class)
self.fetch_messages = consumer_class.fetch_messages || self.fetch_messages
self.pidfile ||= "#{group_id}.pid"
end
attr_accessor :consumer_class

def on_error(&handler)
@error_handler = handler
Expand Down Expand Up @@ -293,6 +296,10 @@ def liveness_probe
)
end

def rebalance_listener
RebalanceListener.new(self)
end

private

def rdkafka_security_config
Expand Down
5 changes: 4 additions & 1 deletion lib/racecar/consumer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def close

def current
@consumers[@consumer_id_iterator.peek] ||= begin
consumer = Rdkafka::Config.new(rdkafka_config(current_subscription)).consumer
consumer_config = Rdkafka::Config.new(rdkafka_config(current_subscription))
consumer_config.consumer_rebalance_listener = @config.rebalance_listener

consumer = consumer_config.consumer
@instrumenter.instrument('join_group') do
consumer.subscribe current_subscription.topic
end
Expand Down
4 changes: 4 additions & 0 deletions lib/racecar/parallel_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def run
wait_for_exit
end

def stop
terminate_workers
end

private

attr_accessor :workers
Expand Down
22 changes: 22 additions & 0 deletions lib/racecar/rebalance_listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module Racecar
class RebalanceListener
def initialize(config)
@config = config
@consumer_class = config.consumer_class
end

attr_reader :config, :consumer_class

def on_partitions_assigned(_consumer, topic_partition_list)
consumer_class.respond_to?(:on_partitions_assigned) &&
consumer_class.on_partitions_assigned(topic_partition_list.to_h)
rescue
end

def on_partitions_revoked(_consumer, topic_partition_list)
consumer_class.respond_to?(:on_partitions_revoked) &&
consumer_class.on_partitions_revoked(topic_partition_list.to_h)
rescue
end
end
end
2 changes: 1 addition & 1 deletion racecar.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency "rdkafka", "~> 0.12.0"

spec.add_development_dependency "bundler", [">= 1.13", "< 3"]
spec.add_development_dependency "pry"
spec.add_development_dependency "pry-byebug"
spec.add_development_dependency "rake", "> 10.0"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "timecop"
Expand Down
5 changes: 3 additions & 2 deletions spec/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
expect($stderr).to receive(:puts).with(/Wro+m!/)
expect($stderr).to receive(:puts).with(/Ctrl-C to shutdown consumer/)
end
let(:mock_runner) { double(:mock_runner, run: nil) }

it "doesn't start Rails if --without-rails option is specified" do
args = ["--require", "./examples/cat_consumer.rb", "CatConsumer", "--without-rails"]

allow(Racecar).to receive(:run)
allow(Racecar).to receive(:runner).and_return(mock_runner)
expect(Racecar::RailsConfigFileLoader).not_to receive(:load!)

Racecar::Cli.main(args)
Expand All @@ -31,7 +32,7 @@
it "starts Rails if the --without-rails option is omitted" do
args = ["--require", "./examples/cat_consumer.rb", "CatConsumer"]

allow(Racecar).to receive(:run)
allow(Racecar).to receive(:runner).and_return(mock_runner)
expect(Racecar::RailsConfigFileLoader).to receive(:load!)

Racecar::Cli.main(args)
Expand Down
2 changes: 1 addition & 1 deletion spec/consumer_set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def tpl(subscription)
RSpec.describe Racecar::ConsumerSet do
let(:config) { Racecar::Config.new }
let(:rdconsumer) { double("rdconsumer", subscribe: true) }
let(:rdconfig) { double("rdconfig", consumer: rdconsumer) }
let(:rdconfig) { double("rdconfig", consumer: rdconsumer, "consumer_rebalance_listener=": nil) }
let(:logger) { Logger.new(StringIO.new) }
let(:instrumenter) { Racecar::NullInstrumenter }
let(:consumer_set) { Racecar::ConsumerSet.new(config, logger, instrumenter) }
Expand Down
Loading

0 comments on commit b1b0574

Please sign in to comment.