From ca24864fced74b299c380dd3fdc4955d16d5ed8b Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 2 Nov 2023 20:59:23 +0100 Subject: [PATCH] Link supervisor with forks when registering processes And use this to clean things up (deregister processes and release all claimed executions) in cases where the forks (workers, scheduler) don't have a chance to terminate in an orderly way. --- app/models/solid_queue/process.rb | 14 ++++++-------- lib/solid_queue/process_registration.rb | 10 +++++++--- lib/solid_queue/runner.rb | 2 +- lib/solid_queue/supervisor.rb | 2 +- test/dummy/db/schema.rb | 4 +++- test/integration/processes_lifecycle_test.rb | 14 +++++++------- test/test_helper.rb | 2 +- 7 files changed, 26 insertions(+), 22 deletions(-) diff --git a/app/models/solid_queue/process.rb b/app/models/solid_queue/process.rb index f3083e01..97bba488 100644 --- a/app/models/solid_queue/process.rb +++ b/app/models/solid_queue/process.rb @@ -1,18 +1,16 @@ class SolidQueue::Process < SolidQueue::Record include Prunable - if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1") - serialize :metadata, coder: JSON - else - serialize :metadata, JSON - end - + belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :forks + has_many :forks, class_name: "SolidQueue::Process", inverse_of: :supervisor, dependent: :destroy has_many :claimed_executions + store :metadata, accessors: [ :kind, :pid ], coder: JSON + after_destroy -> { claimed_executions.release_all } - def self.register(metadata) - create!(metadata: metadata, last_heartbeat_at: Time.current) + def self.register(**attributes) + create!(attributes.merge(last_heartbeat_at: Time.current)) end def heartbeat diff --git a/lib/solid_queue/process_registration.rb b/lib/solid_queue/process_registration.rb index e5803014..e0e9f076 100644 --- a/lib/solid_queue/process_registration.rb +++ b/lib/solid_queue/process_registration.rb @@ -16,7 +16,7 @@ module ProcessRegistration set_callback :shutdown, 
:before, :stop_heartbeat set_callback :shutdown, :after, :deregister - attr_accessor :supervisor_pid + attr_reader :supervisor end def inspect @@ -24,11 +24,15 @@ def inspect end alias to_s inspect + def supervised_by(process) + @supervisor = process + end + private attr_accessor :process def register - @process = SolidQueue::Process.register(metadata) + @process = SolidQueue::Process.register(supervisor: supervisor, metadata: metadata) end def deregister @@ -61,7 +65,7 @@ def process_pid end def metadata - { kind: self.class.name.demodulize, hostname: hostname, pid: process_pid, supervisor_pid: supervisor_pid } + { kind: self.class.name.demodulize, hostname: hostname, pid: process_pid, supervisor_pid: supervisor&.pid } end end end diff --git a/lib/solid_queue/runner.rb b/lib/solid_queue/runner.rb index b464a099..3f513e23 100644 --- a/lib/solid_queue/runner.rb +++ b/lib/solid_queue/runner.rb @@ -90,7 +90,7 @@ def finished? end def supervisor_went_away? - supervised? && supervisor_pid != ::Process.ppid + supervised? && supervisor&.pid != ::Process.ppid end def supervised? diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 694f3514..cc9e638b 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -118,7 +118,7 @@ def prune_dead_processes end def start_runner(runner) - runner.supervisor_pid = ::Process.pid + runner.supervised_by process pid = fork do runner.start diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index f2880229..c27ad78f 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. 
-ActiveRecord::Schema[7.1].define(version: 2023_10_30_164933) do +ActiveRecord::Schema[7.1].define(version: 2023_11_02_184135) do create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name" t.string "status" @@ -59,7 +59,9 @@ t.text "metadata" t.datetime "created_at", null: false t.datetime "last_heartbeat_at", null: false + t.bigint "supervisor_id" t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" + t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" end create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index e84a6c75..4b763b73 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -73,13 +73,12 @@ class ProcessLifecycleTest < ActiveSupport::TestCase assert_completed_job_results("no pause") assert_job_status(no_pause, :finished) - # This job was left claimed as the worker was shutdown without - # a chance to terminate orderly assert_started_job_result("pause") - assert_job_status(pause, :claimed) - + # Workers were shut down without a chance to terminate in an orderly way, but + # since they're linked to the supervisor, the supervisor deregistering + # also deregistered them and released their claimed jobs # Processes didn't have a chance to deregister either - assert_registered_workers_for(:background, :default) + assert_clean_termination end test "term supervisor while there are jobs in-flight" do @@ -179,8 +178,8 @@ class ProcessLifecycleTest < ActiveSupport::TestCase assert process_exists?(@pid) terminate_supervisor - # TODO: change this to clean termination when replacing a worker also deregisters its process ID - assert_registered_workers_for(:background) + + assert_clean_termination end private @@ -197,6 +196,7 @@ 
def terminate_registered_processes end def assert_clean_termination + wait_for_registered_processes 0, timeout: 0.2.second assert_no_registered_processes assert_no_claimed_jobs end diff --git a/test/test_helper.rb b/test/test_helper.rb index 604e9c0d..cc98cb22 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -53,7 +53,7 @@ def run_supervisor_as_fork(**options) def wait_for_registered_processes(count, timeout: 1.second) Timeout.timeout(timeout) do - while SolidQueue::Process.count < count do + while SolidQueue::Process.count != count do sleep 0.05 end end