Skip to content

Commit

Permalink
Order both by priority and job_id, to ensure deterministic ordering
Browse files Browse the repository at this point in the history
When all priorities are the same.
  • Loading branch information
rosa committed Nov 22, 2023
1 parent 2b8f732 commit 15cbf56
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
2 changes: 1 addition & 1 deletion app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def claiming(job_ids, process_id, &block)
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }

insert_all(job_data)
where(job_id: job_ids).tap do |claimed|
where(job_id: job_ids).load.tap do |claimed|
block.call(claimed)
SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
end
Expand Down
7 changes: 4 additions & 3 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
module SolidQueue
class ReadyExecution < Execution
scope :queued_as, ->(queue_name) { where(queue_name: queue_name) }
scope :ordered, -> { order(priority: :asc) }
scope :ordered, -> { order(priority: :asc, job_id: :asc) }

assume_attributes_from_job

class << self
def claim(queue_list, limit, process_id)
QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation|
select_and_lock(queue_relation, process_id, limit).tap do |locked|
limit -= locked.count
break if limit <= 0
limit -= locked.size
end
end
end

private
def select_and_lock(queue_relation, process_id, limit)
return [] if limit <= 0

transaction do
candidates = select_candidates(queue_relation, limit)
lock(candidates, process_id)
Expand Down
6 changes: 3 additions & 3 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)

@polling_interval = options[:polling_interval]
@queues = options[:queues]
@queues = Array(options[:queues])
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
end

Expand All @@ -21,13 +21,13 @@ def run
end

if claimed_executions.size > 0
procline "performing #{claimed_executions.count} jobs in #{queues}"
procline "performing #{claimed_executions.count} jobs"

claimed_executions.each do |execution|
pool.post(execution)
end
else
procline "waiting for jobs in #{queues}"
procline "waiting for jobs in #{queues.join(",")}"
interruptible_sleep(polling_interval)
end
end
Expand Down

0 comments on commit 15cbf56

Please sign in to comment.