From 3539250e25e7316d142b220f46b4698b251f7be9 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 2 Nov 2023 12:17:37 +0100 Subject: [PATCH] Associate process ID with claimed executions as soon as they're claimed That is, not when they're posted to the thread pool, but before, because if something goes wrong when updating that and the executions are already claimed, they'd be left in limbo, claimed but without associated process. --- app/models/solid_queue/claimed_execution.rb | 17 ++++++----------- app/models/solid_queue/ready_execution.rb | 13 ++++++------- lib/solid_queue/pool.rb | 6 +++--- lib/solid_queue/worker.rb | 4 ++-- .../solid_queue/claimed_execution_test.rb | 8 ++++---- .../models/solid_queue/ready_execution_test.rb | 18 +++++++++--------- test/unit/worker_test.rb | 2 +- 7 files changed, 31 insertions(+), 37 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index f0132c79..d71d3cd2 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -10,11 +10,11 @@ def success? CLAIM_ATTRIBUTES = %w[ job_id ] class << self - def claiming(executions, &block) - job_data = Array(executions).collect { |execution| execution.attributes.slice(*CLAIM_ATTRIBUTES) } + def claiming(executions, process_id, &block) + job_data = Array(executions).collect { |execution| { job_id: execution.job_id, process_id: process_id } } insert_all(job_data) - where(job_id: job_data.map { |data| data["job_id"]} ).tap do |claimed| + where(job_id: job_data.map { |data| data[:job_id]} ).tap do |claimed| block.call(claimed) SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs") end @@ -25,10 +25,9 @@ def release_all end end - def perform(process) - claimed_by(process) - + def perform result = execute + if result.success? finished else @@ -44,12 +43,8 @@ def release end private - def claimed_by(process) - update!(process: process) - SolidQueue.logger.info("[SolidQueue] Performing job #{job.id} - #{job.active_job_id}") - end - def execute + SolidQueue.logger.info("[SolidQueue] Performing job #{job.id} - #{job.active_job_id}") ActiveJob::Base.execute(job.arguments) Result.new(true, nil) rescue Exception => e diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index d8c630e2..750aa7e6 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -6,10 +6,10 @@ class ReadyExecution < Execution before_create :assume_attributes_from_job class << self - def claim(queues, limit) + def claim(queues, limit, process_id) transaction do candidates = select_candidates(queues, limit) - lock(candidates) + lock(candidates, process_id) end end @@ -22,18 +22,17 @@ def select_candidates(queues, limit) queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED") end - def lock(candidates) + def lock(candidates, process_id) return [] if candidates.none? - - SolidQueue::ClaimedExecution.claiming(candidates) do |claimed| + SolidQueue::ClaimedExecution.claiming(candidates, process_id) do |claimed| where(job_id: claimed.pluck(:job_id)).delete_all end end end - def claim + def claim(process_id) transaction do - SolidQueue::ClaimedExecution.claiming(self) do |claimed| + SolidQueue::ClaimedExecution.claiming(self, process_id) do |claimed| delete if claimed.one? end end diff --git a/lib/solid_queue/pool.rb b/lib/solid_queue/pool.rb index 8d7dd426..c1bcf195 100644 --- a/lib/solid_queue/pool.rb +++ b/lib/solid_queue/pool.rb @@ -15,12 +15,12 @@ def initialize(size, on_idle: nil) @mutex = Mutex.new end - def post(execution, process) + def post(execution) available_threads.decrement - future = Concurrent::Future.new(args: [ execution, process ], executor: executor) do |thread_execution, thread_process| + future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution| wrap_in_app_executor do - thread_execution.perform(thread_process) + thread_execution.perform ensure available_threads.increment mutex.synchronize { on_idle.try(:call) if idle? } diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index cfb7b964..3044a002 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -16,13 +16,13 @@ def initialize(**options) private def run - claimed_executions = SolidQueue::ReadyExecution.claim(queues, pool.idle_threads) + claimed_executions = SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) if claimed_executions.size > 0 procline "performing #{claimed_executions.count} jobs in #{queues}" claimed_executions.each do |execution| - pool.post(execution, process) + pool.post(execution) end else procline "waiting for jobs in #{queues}" diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index b4773f57..138e48fe 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -13,7 +13,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase claimed_execution = prepare_and_claim_job(job) assert_difference -> { SolidQueue::ClaimedExecution.count }, -1 do - claimed_execution.perform(@process) + claimed_execution.perform end assert job.reload.finished? @@ -24,7 +24,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase claimed_execution = prepare_and_claim_job(job) assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - claimed_execution.perform(@process) + claimed_execution.perform end assert_not job.reload.finished? @@ -43,7 +43,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase job = solid_queue_jobs(:raising_job) claimed_execution = prepare_and_claim_job(job) - claimed_execution.perform(@process) + claimed_execution.perform end assert_equal 1, subscriber.errors.count @@ -64,7 +64,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase private def prepare_and_claim_job(job) job.prepare_for_execution - job.reload.ready_execution.claim + job.reload.ready_execution.claim(@process.id) job.reload.claimed_execution end diff --git a/test/models/solid_queue/ready_execution_test.rb b/test/models/solid_queue/ready_execution_test.rb index 4d2abd50..9f7ede91 100644 --- a/test/models/solid_queue/ready_execution_test.rb +++ b/test/models/solid_queue/ready_execution_test.rb @@ -8,7 +8,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase test "claim all jobs for existing queue" do assert_claimed_jobs(@jobs.count) do - SolidQueue::ReadyExecution.claim("fixtures", @jobs.count + 1) + SolidQueue::ReadyExecution.claim("fixtures", @jobs.count + 1, 42) end @jobs.each do |job| @@ -19,13 +19,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase test "claim jobs for queue without jobs at the moment" do assert_no_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::ClaimedExecution.count } ] do - SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10) + SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, 42) end end test "claim some jobs for existing queue" do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim("fixtures", 2) + SolidQueue::ReadyExecution.claim("fixtures", 2, 42) end @jobs.order(:priority).first(2).each do |job| @@ -44,7 +44,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase job.prepare_for_execution assert_claimed_jobs(1) do - job.ready_execution.claim + job.ready_execution.claim(42) end assert_not job.reload.ready? @@ -55,7 +55,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase (SolidQueue::Job.all - @jobs).each(&:prepare_for_execution) assert_claimed_jobs(SolidQueue::Job.count) do - SolidQueue::ReadyExecution.claim("fixtures,background", SolidQueue::Job.count + 1) + SolidQueue::ReadyExecution.claim("fixtures,background", SolidQueue::Job.count + 1, 42) end end @@ -63,7 +63,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase (SolidQueue::Job.all - @jobs).each(&:prepare_for_execution) assert_claimed_jobs(SolidQueue::Job.count) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1) + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) end end @@ -74,13 +74,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase SolidQueue::Queue.find_by_name("fixtures").pause assert_claimed_jobs(other_jobs.count) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1) + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) end end test "claim jobs using queue prefixes" do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim("fix*", 2) + SolidQueue::ReadyExecution.claim("fix*", 2, 42) end @jobs.order(:priority).first(2).each do |job| @@ -93,7 +93,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase (SolidQueue::Job.all - @jobs).each(&:prepare_for_execution) assert_claimed_jobs(SolidQueue::Job.count) do - SolidQueue::ReadyExecution.claim("fix*,background", SolidQueue::Job.count + 1) + SolidQueue::ReadyExecution.claim("fix*,background", SolidQueue::Job.count + 1, 42) end end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 660d682b..01f9befe 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -19,7 +19,7 @@ class WorkerTest < ActiveSupport::TestCase subscriber = ErrorBuffer.new Rails.error.subscribe(subscriber) - SolidQueue::ClaimedExecution.any_instance.expects(:update!).raises(RuntimeError.new("everything is broken")) + SolidQueue::ClaimedExecution::Result.expects(:new).raises(RuntimeError.new("everything is broken")).at_least_once AddToBufferJob.perform_later "hey!"