Skip to content

Commit

Permalink
Associate process ID with claimed executions as soon as they're claimed
Browse files Browse the repository at this point in the history
That is, not when they're posted to the thread pool, but before, because if
something goes wrong when updating that and the executions are already claimed,
they'd be left in limbo, claimed but without associated process.
  • Loading branch information
rosa committed Nov 2, 2023
1 parent 7649fb8 commit 3539250
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 37 deletions.
17 changes: 6 additions & 11 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ def success?
CLAIM_ATTRIBUTES = %w[ job_id ]

class << self
def claiming(executions, &block)
job_data = Array(executions).collect { |execution| execution.attributes.slice(*CLAIM_ATTRIBUTES) }
def claiming(executions, process_id, &block)
job_data = Array(executions).collect { |execution| { job_id: execution.job_id, process_id: process_id } }

insert_all(job_data)
where(job_id: job_data.map { |data| data["job_id"]} ).tap do |claimed|
where(job_id: job_data.map { |data| data[:job_id]} ).tap do |claimed|
block.call(claimed)
SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
end
Expand All @@ -25,10 +25,9 @@ def release_all
end
end

def perform(process)
claimed_by(process)

def perform
result = execute

if result.success?
finished
else
Expand All @@ -44,12 +43,8 @@ def release
end

private
def claimed_by(process)
update!(process: process)
SolidQueue.logger.info("[SolidQueue] Performing job #{job.id} - #{job.active_job_id}")
end

def execute
SolidQueue.logger.info("[SolidQueue] Performing job #{job.id} - #{job.active_job_id}")
ActiveJob::Base.execute(job.arguments)
Result.new(true, nil)
rescue Exception => e
Expand Down
13 changes: 6 additions & 7 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ class ReadyExecution < Execution
before_create :assume_attributes_from_job

class << self
def claim(queues, limit)
def claim(queues, limit, process_id)
transaction do
candidates = select_candidates(queues, limit)
lock(candidates)
lock(candidates, process_id)
end
end

Expand All @@ -22,18 +22,17 @@ def select_candidates(queues, limit)
queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED")
end

def lock(candidates)
def lock(candidates, process_id)
return [] if candidates.none?

SolidQueue::ClaimedExecution.claiming(candidates) do |claimed|
SolidQueue::ClaimedExecution.claiming(candidates, process_id) do |claimed|
where(job_id: claimed.pluck(:job_id)).delete_all
end
end
end

def claim
def claim(process_id)
transaction do
SolidQueue::ClaimedExecution.claiming(self) do |claimed|
SolidQueue::ClaimedExecution.claiming(self, process_id) do |claimed|
delete if claimed.one?
end
end
Expand Down
6 changes: 3 additions & 3 deletions lib/solid_queue/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ def initialize(size, on_idle: nil)
@mutex = Mutex.new
end

def post(execution, process)
def post(execution)
available_threads.decrement

future = Concurrent::Future.new(args: [ execution, process ], executor: executor) do |thread_execution, thread_process|
future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
wrap_in_app_executor do
thread_execution.perform(thread_process)
thread_execution.perform
ensure
available_threads.increment
mutex.synchronize { on_idle.try(:call) if idle? }
Expand Down
4 changes: 2 additions & 2 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ def initialize(**options)

private
def run
claimed_executions = SolidQueue::ReadyExecution.claim(queues, pool.idle_threads)
claimed_executions = SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id)

if claimed_executions.size > 0
procline "performing #{claimed_executions.count} jobs in #{queues}"

claimed_executions.each do |execution|
pool.post(execution, process)
pool.post(execution)
end
else
procline "waiting for jobs in #{queues}"
Expand Down
8 changes: 4 additions & 4 deletions test/models/solid_queue/claimed_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
claimed_execution = prepare_and_claim_job(job)

assert_difference -> { SolidQueue::ClaimedExecution.count }, -1 do
claimed_execution.perform(@process)
claimed_execution.perform
end

assert job.reload.finished?
Expand All @@ -24,7 +24,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
claimed_execution = prepare_and_claim_job(job)

assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do
claimed_execution.perform(@process)
claimed_execution.perform
end

assert_not job.reload.finished?
Expand All @@ -43,7 +43,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
job = solid_queue_jobs(:raising_job)
claimed_execution = prepare_and_claim_job(job)

claimed_execution.perform(@process)
claimed_execution.perform
end

assert_equal 1, subscriber.errors.count
Expand All @@ -64,7 +64,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
private
def prepare_and_claim_job(job)
job.prepare_for_execution
job.reload.ready_execution.claim
job.reload.ready_execution.claim(@process.id)
job.reload.claimed_execution
end

Expand Down
18 changes: 9 additions & 9 deletions test/models/solid_queue/ready_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase

test "claim all jobs for existing queue" do
assert_claimed_jobs(@jobs.count) do
SolidQueue::ReadyExecution.claim("fixtures", @jobs.count + 1)
SolidQueue::ReadyExecution.claim("fixtures", @jobs.count + 1, 42)
end

@jobs.each do |job|
Expand All @@ -19,13 +19,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase

test "claim jobs for queue without jobs at the moment" do
assert_no_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::ClaimedExecution.count } ] do
SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10)
SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, 42)
end
end

test "claim some jobs for existing queue" do
assert_claimed_jobs(2) do
SolidQueue::ReadyExecution.claim("fixtures", 2)
SolidQueue::ReadyExecution.claim("fixtures", 2, 42)
end

@jobs.order(:priority).first(2).each do |job|
Expand All @@ -44,7 +44,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
job.prepare_for_execution

assert_claimed_jobs(1) do
job.ready_execution.claim
job.ready_execution.claim(42)
end

assert_not job.reload.ready?
Expand All @@ -55,15 +55,15 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim("fixtures,background", SolidQueue::Job.count + 1)
SolidQueue::ReadyExecution.claim("fixtures,background", SolidQueue::Job.count + 1, 42)
end
end

test "claim jobs using a wildcard" do
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1)
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
end
end

Expand All @@ -74,13 +74,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
SolidQueue::Queue.find_by_name("fixtures").pause

assert_claimed_jobs(other_jobs.count) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1)
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
end
end

test "claim jobs using queue prefixes" do
assert_claimed_jobs(2) do
SolidQueue::ReadyExecution.claim("fix*", 2)
SolidQueue::ReadyExecution.claim("fix*", 2, 42)
end

@jobs.order(:priority).first(2).each do |job|
Expand All @@ -93,7 +93,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim("fix*,background", SolidQueue::Job.count + 1)
SolidQueue::ReadyExecution.claim("fix*,background", SolidQueue::Job.count + 1, 42)
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class WorkerTest < ActiveSupport::TestCase
subscriber = ErrorBuffer.new
Rails.error.subscribe(subscriber)

SolidQueue::ClaimedExecution.any_instance.expects(:update!).raises(RuntimeError.new("everything is broken"))
SolidQueue::ClaimedExecution::Result.expects(:new).raises(RuntimeError.new("everything is broken")).at_least_once

AddToBufferJob.perform_later "hey!"

Expand Down

0 comments on commit 3539250

Please sign in to comment.