diff --git a/lib/solid_queue/runner.rb b/lib/solid_queue/runner.rb index 12fd122a..b464a099 100644 --- a/lib/solid_queue/runner.rb +++ b/lib/solid_queue/runner.rb @@ -5,13 +5,8 @@ module Runner extend ActiveSupport::Concern included do - include ActiveSupport::Callbacks - define_callbacks :start, :run, :shutdown - include AppExecutor, Procline include ProcessRegistration, Interruptible - - attr_accessor :supervisor_pid end def start(mode: :supervised) @@ -109,13 +104,5 @@ def all_work_completed? def running_inline? mode.inline? end - - def hostname - @hostname ||= Socket.gethostname - end - - def process_pid - @pid ||= ::Process.pid - end end end diff --git a/lib/solid_queue/runner/process_registration.rb b/lib/solid_queue/runner/process_registration.rb index 93267c83..87668ee8 100644 --- a/lib/solid_queue/runner/process_registration.rb +++ b/lib/solid_queue/runner/process_registration.rb @@ -4,6 +4,9 @@ module SolidQueue::Runner::ProcessRegistration extend ActiveSupport::Concern included do + include ActiveSupport::Callbacks + define_callbacks :start, :run, :shutdown + set_callback :start, :before, :register set_callback :start, :before, :start_heartbeat @@ -11,6 +14,8 @@ module SolidQueue::Runner::ProcessRegistration set_callback :shutdown, :before, :stop_heartbeat set_callback :shutdown, :after, :deregister + + attr_accessor :supervisor_pid end def inspect @@ -46,6 +51,14 @@ def heartbeat process.heartbeat end + def hostname + @hostname ||= Socket.gethostname + end + + def process_pid + @pid ||= ::Process.pid + end + def metadata { kind: self.class.name.demodulize, hostname: hostname, pid: process_pid, supervisor_pid: supervisor_pid } end diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 4cf9b7fd..5b38873e 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -9,7 +9,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase @pid = run_supervisor_as_fork wait_for_registered_processes(3, timeout: 0.2.second) - assert_registered_processes_for(:background, :default) + assert_registered_workers_for(:background, :default) end teardown do @@ -76,7 +76,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase assert_job_status(pause, :claimed) # Processes didn't have a chance to deregister either - assert_registered_processes_for(:background, :default) + assert_registered_workers_for(:background, :default) end test "term supervisor while there are jobs in-flight" do @@ -127,7 +127,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase assert_job_status(pause, :claimed) # The process running the long job couldn't deregister, the other did - assert_registered_processes_for(:background) + assert_registered_workers_for(:background) end test "process some jobs that raise errors" do @@ -177,7 +177,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase terminate_supervisor # TODO: change this to clean termination when replacing a worker also deregisters its process ID - assert_registered_processes_for(:background) + assert_registered_workers_for(:background) end private @@ -198,10 +198,12 @@ def assert_clean_termination assert_no_claimed_jobs end - def assert_registered_processes_for(*queues) + def assert_registered_workers_for(*queues) skip_active_record_query_cache do registered_queues = SolidQueue::Process.all.map { |process| process.metadata["queues"] }.compact assert_equal queues.map(&:to_s).sort, registered_queues.sort + assert_equal [ "Worker" ], SolidQueue::Process.all.map { |process| process.metadata["kind"] }.uniq + assert_equal [ @pid ], SolidQueue::Process.all.map { |process| process.metadata["supervisor_pid"] }.uniq end end diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 3a83c237..e65ae749 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -17,6 +17,7 @@ class SupervisorTest < ActiveSupport::TestCase test "start in work mode (default)" do pid = run_supervisor_as_fork wait_for_registered_processes(0.3) + assert_registered_workers(2, supervisor_pid: pid) terminate_process(pid) @@ -26,6 +27,7 @@ class SupervisorTest < ActiveSupport::TestCase test "start in schedule mode" do pid = run_supervisor_as_fork(mode: :schedule) wait_for_registered_processes(0.3) + assert_registered_scheduler(supervisor_pid: pid) terminate_process(pid) @@ -76,4 +78,26 @@ class SupervisorTest < ActiveSupport::TestCase terminate_process(pid) end + + private + def assert_registered_workers(count, **metadata) + skip_active_record_query_cache do + assert_equal count, SolidQueue::Process.count + + SolidQueue::Process.all.each do |process| + assert_equal "Worker", process.metadata["kind"] + assert metadata < process.metadata.symbolize_keys + end + end + end + + def assert_registered_scheduler(**metadata) + skip_active_record_query_cache do + assert_equal 1, SolidQueue::Process.count + scheduler = SolidQueue::Process.first + + assert_equal "Scheduler", scheduler.metadata["kind"] + assert metadata < scheduler.metadata.symbolize_keys + end + end end