From 57870921d31c21eec47e8f5e50defbb3b5999a2c Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 30 Oct 2023 17:39:27 +0100 Subject: [PATCH 1/3] Delete ready_executions only when they've been claimed If we try to claim a set of job IDs, creating claimed executions for them, but for some reason, we fail to do that for some of them, in a way that the failure is ignored (eg. on a duplicate key conflict that gets ignored with Rails's `insert_all`), we'd be deleting the ready execution for these, even if they weren't claimed at all, leaving the job in limbo. This change simplifies a bit the claiming code and ensures only those jobs that have been claimed are indeed deleted. --- app/models/solid_queue/claimed_execution.rb | 14 +++++---- app/models/solid_queue/ready_execution.rb | 33 ++++++++------------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index ebaf79d0..f0132c79 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -7,13 +7,17 @@ def success? end end + CLAIM_ATTRIBUTES = %w[ job_id ] + class << self - def claim_batch(job_ids) - claimed_at = Time.current - rows = Array(job_ids).map { |id| { job_id: id, created_at: claimed_at } } - insert_all(rows) if rows.any? + def claiming(executions, &block) + job_data = Array(executions).collect { |execution| execution.attributes.slice(*CLAIM_ATTRIBUTES) } - SolidQueue.logger.info("[SolidQueue] Claimed #{rows.size} jobs at #{claimed_at}") + insert_all(job_data) + where(job_id: job_data.map { |data| data["job_id"]} ).tap do |claimed| + block.call(claimed) + SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs") + end end def release_all diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 2260989b..d8c630e2 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -7,16 +7,10 @@ class ReadyExecution < Execution class << self def claim(queues, limit) - return [] unless limit > 0 - - candidate_job_ids = [] - transaction do - candidate_job_ids = query_candidates(queues, limit) - lock(candidate_job_ids) + candidates = select_candidates(queues, limit) + lock(candidates) end - - claimed_executions_for(candidate_job_ids) end def queued_as(queues) @@ -24,27 +18,24 @@ def queued_as(queues) end private - def query_candidates(queues, limit) - queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id) + def select_candidates(queues, limit) + queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED") end - def lock(job_ids) - return nil if job_ids.none? - SolidQueue::ClaimedExecution.claim_batch(job_ids) - where(job_id: job_ids).delete_all - end - - def claimed_executions_for(job_ids) - return [] if job_ids.none? + def lock(candidates) + return [] if candidates.none? - SolidQueue::ClaimedExecution.where(job_id: job_ids) + SolidQueue::ClaimedExecution.claiming(candidates) do |claimed| + where(job_id: claimed.pluck(:job_id)).delete_all + end end end def claim transaction do - SolidQueue::ClaimedExecution.claim_batch(job_id) - delete + SolidQueue::ClaimedExecution.claiming(self) do |claimed| + delete if claimed.one? + end end end end From 722c99be25d6e41b70a2f9e5e88f742b03225e57 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 30 Oct 2023 17:55:42 +0100 Subject: [PATCH 2/3] Add NOT NULL constraints to all job_id columns --- db/migrate/20231030164933_make_job_id_not_null.rb | 8 ++++++++ test/dummy/db/schema.rb | 10 +++++----- 2 files changed, 13 insertions(+), 5 deletions(-) create mode 100644 db/migrate/20231030164933_make_job_id_not_null.rb diff --git a/db/migrate/20231030164933_make_job_id_not_null.rb b/db/migrate/20231030164933_make_job_id_not_null.rb new file mode 100644 index 00000000..c5e2a5f5 --- /dev/null +++ b/db/migrate/20231030164933_make_job_id_not_null.rb @@ -0,0 +1,8 @@ +class MakeJobIdNotNull < ActiveRecord::Migration[7.1] + def change + change_column :solid_queue_claimed_executions, :job_id, :bigint, null: false + change_column :solid_queue_failed_executions, :job_id, :bigint, null: false + change_column :solid_queue_ready_executions, :job_id, :bigint, null: false + change_column :solid_queue_scheduled_executions, :job_id, :bigint, null: false + end +end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 342809bb..f2880229 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2023_10_25_165946) do +ActiveRecord::Schema[7.1].define(version: 2023_10_30_164933) do create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name" t.string "status" @@ -20,7 +20,7 @@ end create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id" + t.bigint "job_id", null: false t.bigint "process_id" t.datetime "created_at", null: false t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true @@ -28,7 +28,7 @@ end create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id" + t.bigint "job_id", null: false t.text "error" t.datetime "created_at", null: false t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true @@ -63,7 +63,7 @@ end create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id" + t.bigint "job_id", null: false t.string "queue_name", null: false t.integer "priority", default: 0, null: false t.datetime "created_at", null: false @@ -73,7 +73,7 @@ end create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id" + t.bigint "job_id", null: false t.string "queue_name", null: false t.integer "priority", default: 0, null: false t.datetime "scheduled_at", null: false From 57bad40e81fa298b95f4b4884d3c863b0e920625 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 30 Oct 2023 19:25:56 +0100 Subject: [PATCH 3/3] Tweak waiting times in tests to prevent timeouts locally --- test/integration/processes_lifecycle_test.rb | 2 +- test/test_helper.rb | 6 +++--- test/unit/supervisor_test.rb | 12 ++++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 17e47f7e..9e4c27c5 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -8,7 +8,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase setup do @pid = run_supervisor_as_fork - wait_for_registered_processes(3, timeout: 0.1.second) + wait_for_registered_processes(3, timeout: 0.2.second) assert_registered_processes_for(:background, :default) end diff --git a/test/test_helper.rb b/test/test_helper.rb index 7b2617ef..9c465270 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -29,7 +29,7 @@ class ActiveSupport::TestCase def wait_for_jobs_to_finish_for(timeout = 1.second) Timeout.timeout(timeout) do while SolidQueue::Job.where(finished_at: nil).any? do - sleep 0.25 + sleep 0.05 end end rescue Timeout::Error @@ -44,7 +44,7 @@ def run_supervisor_as_fork(**options) def wait_for_registered_processes(count, timeout: 1.second) Timeout.timeout(timeout) do while SolidQueue::Process.count < count do - sleep 0.25 + sleep 0.05 end end rescue Timeout::Error @@ -69,7 +69,7 @@ def wait_for_process_termination_with_timeout(pid, timeout: 10, from_parent: tru else loop do break unless process_exists?(pid) - sleep(0.1) + sleep 0.05 end end end diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 6fb69e26..2ab1baf9 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -16,7 +16,7 @@ class SupervisorTest < ActiveSupport::TestCase test "start in work mode (default)" do pid = run_supervisor_as_fork - wait_for_registered_processes(2) + wait_for_registered_processes(0.3) terminate_process(pid) @@ -25,7 +25,7 @@ class SupervisorTest < ActiveSupport::TestCase test "start in schedule mode" do pid = run_supervisor_as_fork(mode: :schedule) - wait_for_registered_processes(1) + wait_for_registered_processes(0.3) terminate_process(pid) @@ -36,7 +36,7 @@ class SupervisorTest < ActiveSupport::TestCase assert_not File.exist?(@pidfile) pid = run_supervisor_as_fork(mode: :all) - wait_for_registered_processes(3) + wait_for_registered_processes(0.3) assert File.exist?(@pidfile) assert_equal pid, File.read(@pidfile).strip.to_i @@ -50,7 +50,7 @@ class SupervisorTest < ActiveSupport::TestCase File.write(@pidfile, ::Process.pid.to_s) pid = run_supervisor_as_fork(mode: :all) - wait_for_registered_processes(3) + wait_for_registered_processes(0.3) assert File.exist?(@pidfile) assert_not_equal pid, File.read(@pidfile).strip.to_i @@ -60,7 +60,7 @@ class SupervisorTest < ActiveSupport::TestCase test "deletes previous pidfile if the owner is dead" do pid = run_supervisor_as_fork(mode: :all) - wait_for_registered_processes(3) + wait_for_registered_processes(0.3) terminate_process(pid, signal: :KILL) @@ -68,7 +68,7 @@ class SupervisorTest < ActiveSupport::TestCase assert_equal pid, File.read(@pidfile).strip.to_i pid = run_supervisor_as_fork(mode: :all) - wait_for_registered_processes(3) + wait_for_registered_processes(0.3) assert File.exist?(@pidfile) assert_equal pid, File.read(@pidfile).strip.to_i