Skip to content

Commit

Permalink
New forking implementation (first draft)
Browse files Browse the repository at this point in the history
* Set config value `forks` instead of `parallel_workers`
* Supervisor process forks worker `n` processes
* Supervisor terminates workers when it is stopped
* Workers monitor supervisor and terminate themselves if supervisor
  exits to prevent zombie processes
* Supervisor runs pre-fork hook before forking
* Workers run post-fork hook immediately after forking
  • Loading branch information
bestie committed Feb 5, 2024
1 parent 9615a24 commit 9594e8a
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 9 deletions.
3 changes: 3 additions & 0 deletions lib/racecar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "racecar/consumer_set"
require "racecar/runner"
require "racecar/parallel_runner"
require "racecar/forking_runner"
require "racecar/producer"
require "racecar/config"
require "racecar/version"
Expand Down Expand Up @@ -74,6 +75,8 @@ def self.runner(processor)

if config.parallel_workers && config.parallel_workers > 1
ParallelRunner.new(runner: runner, config: config, logger: logger)
elsif config.forks && config.forks > 0
ForkingRunner.new(runner: runner, config: config, logger: logger)
else
runner
end
Expand Down
13 changes: 13 additions & 0 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,24 @@ class Config < KingKonf::Config
desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x"
string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic"

desc "How many worker processes to fork"
integer :forks, default: 0

# The error handler must be set directly on the object.
attr_reader :error_handler

attr_accessor :subscriptions, :logger, :parallel_workers

attr_accessor :prefork, :postfork

def prefork
@prefork ||= lambda { |*_| }
end

def postfork
@postfork ||= lambda { |*_| }
end

def statistics_interval_ms
if Rdkafka::Config.statistics_callback
statistics_interval * 1000
Expand Down
131 changes: 131 additions & 0 deletions lib/racecar/forking_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# frozen_string_literal: true

module Racecar
class ForkingRunner
def initialize(runner:, config:, logger:, liveness_monitor: LivenessMonitor.new)
@runner = runner
@config = config
@logger = logger
@pids = []
@liveness_monitor = liveness_monitor
@running = false
end

attr_reader :config, :runner, :logger, :pids, :liveness_monitor
private :config, :runner, :logger, :pids, :liveness_monitor

def run
config.prefork.call
@running = true

@pids = config.forks.times.map do |n|
pid = fork do
liveness_monitor.child_post_fork
config.postfork.call

liveness_monitor.on_exit do
logger.warn "Supervisor dead, exiting."
runner.stop
end

runner.run
end
logger.info "Racecar forked consumer process #{pid}"

pid
end

liveness_monitor.parent_post_fork

install_signal_handlers

wait_for_child_processes
end

def stop
@running = false
$stdout.puts "Racecar::ForkingRunner runner stopping #{Process.pid}"
terminate_workers
end

def running?
!!@running
end

private

def terminate_workers
pids.each do |pid|
begin
Process.kill("TERM", pid)
rescue Errno::ESRCH
end
end
end

def check_workers
pids.each do |pid|
unless worker_running?(pid)
$stdout.puts("A forker worker has exited. Shuttin' it down ...")
stop
return
end
end
end

def worker_running?(pid)
_, status = Process.waitpid2(pid, Process::WNOHANG)
status.nil?
rescue Errno::ECHILD
false
end

def wait_for_child_processes
pids.each do |pid|
begin
Process.wait(pid)
rescue Errno::ECHILD
end
end
end

def install_signal_handlers
Signal.trap("CHLD") do |sid|
# Received when child process terminates
# $stderr.puts "👼👼👼👼👼👼👼 SIGCHLD"
check_workers if running?
end
Signal.trap("TERM") do |sid|
stop
end
Signal.trap("INT") do |sid|
stop
end
end

class LivenessMonitor
def initialize(pipe_ends = IO.pipe)
@read_end, @write_end = pipe_ends
@monitor_thread = nil
end

attr_reader :read_end, :write_end, :monitor_thread
private :read_end, :write_end, :monitor_thread

def parent_post_fork
read_end.close
end

def child_post_fork
write_end.close
end

def on_exit(&block)
monitor_thread = Thread.new do
IO.select([read_end])
block.call
end
end
end
end
end
Binary file added racecar-2.10.beta.3.3ad7680.gem
Binary file not shown.
11 changes: 6 additions & 5 deletions spec/integration/cooperative_sticky_assignment_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
start_consumer

wait_for_assignments(2)
reset_consumer_events
publish_messages
wait_for_a_few_messages

Expand All @@ -46,8 +47,8 @@
wait_for_all_messages

aggregate_failures do
expect_consumer0_did_not_have_partitions_revoked_but_consumer1_did
expect_consumer0_took_over_processing_from_consumer1
expect_consumer0_did_not_have_partitions_revoked_but_consumer1_did
end
end

Expand Down Expand Up @@ -87,6 +88,10 @@ def start_consumer
consumer_index_by_id["#{Process.pid}-#{thread.object_id}"] = consumers.index(runner)
end

def reset_consumer_events
@received_consumer_events = []
end

def terminate_consumer1
consumers[1].stop
end
Expand All @@ -105,10 +110,6 @@ def wait_for_all_messages
end

def set_config
Racecar.config.fetch_messages = 1
Racecar.config.max_wait_time = 0.1
Racecar.config.session_timeout = 6 # minimum allowed by default broker config
Racecar.config.heartbeat_interval = 1.5
Racecar.config.partition_assignment_strategy = "cooperative-sticky"
Racecar.config.load_consumer_class(consumer_class)
end
Expand Down
Loading

0 comments on commit 9594e8a

Please sign in to comment.