From 15cbf56cc505a0cdbcd6e8e644c45b093b27f6dd Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 22 Nov 2023 13:15:01 +0100 Subject: [PATCH] Order both by priority and job_id, to ensure deterministic ordering When all priorities are the same. --- app/models/solid_queue/claimed_execution.rb | 2 +- app/models/solid_queue/ready_execution.rb | 7 ++++--- lib/solid_queue/worker.rb | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 86d0b190..63d62571 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -12,7 +12,7 @@ def claiming(job_ids, process_id, &block) job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } } insert_all(job_data) - where(job_id: job_ids).tap do |claimed| + where(job_id: job_ids).load.tap do |claimed| block.call(claimed) SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs") end diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 5375eaac..7701d815 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -1,7 +1,7 @@ module SolidQueue class ReadyExecution < Execution scope :queued_as, ->(queue_name) { where(queue_name: queue_name) } - scope :ordered, -> { order(priority: :asc) } + scope :ordered, -> { order(priority: :asc, job_id: :asc) } assume_attributes_from_job @@ -9,14 +9,15 @@ class << self def claim(queue_list, limit, process_id) QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation| select_and_lock(queue_relation, process_id, limit).tap do |locked| - limit -= locked.count - break if limit <= 0 + limit -= locked.size end end end private def select_and_lock(queue_relation, process_id, limit) + return [] if limit <= 0 + transaction do candidates = select_candidates(queue_relation, limit) lock(candidates, process_id) diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 61eeb783..cb976a91 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -10,7 +10,7 @@ def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) @polling_interval = options[:polling_interval] - @queues = options[:queues] + @queues = Array(options[:queues]) @pool = Pool.new(options[:threads], on_idle: -> { wake_up }) end @@ -21,13 +21,13 @@ def run end if claimed_executions.size > 0 - procline "performing #{claimed_executions.count} jobs in #{queues}" + procline "performing #{claimed_executions.count} jobs" claimed_executions.each do |execution| pool.post(execution) end else - procline "waiting for jobs in #{queues}" + procline "waiting for jobs in #{queues.join(",")}" interruptible_sleep(polling_interval) end end