Skip to content

Commit

Permalink
Perform separate queries per queue assigned to worker
Browse files Browse the repository at this point in the history
We do this in the order specified in the worker configuration, and
by priority within each queue. It's still possible to specify "*",
which means all queues, and in that case the order will be only by
priority. Paused queues aren't queried.

The way it works when there are more than one queue in the list is
as follows. Imagine the worker is configured to run
```
queue1, queue2, queue3
```
In that order. Then:

- Query as most N (N is the worker's capacity, or its number of idle threads)
  jobs from queue1. If we got at least N jobs, stop and assign these to the pool,
  and continue querying jobs from queue1.
- If we got fewer jobs than N, query jobs from queue2. If we reached N, stop and
  post those to the pool, and go back to queue1.
- If we still have space because we haven't filled N, go on to queue3 and do the
  same. Then go back to queue1.
  • Loading branch information
rosa committed Nov 22, 2023
1 parent 6b82b6b commit 2dee1a6
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 74 deletions.
41 changes: 23 additions & 18 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
@@ -1,25 +1,38 @@
module SolidQueue
class ReadyExecution < Execution
scope :queued_as, ->(queue_name) { where(queue_name: queue_name) }
scope :ordered, -> { order(priority: :asc) }
scope :not_paused, -> { where.not(queue_name: Pause.all_queue_names) }

assume_attributes_from_job

class << self
def claim(queues, limit, process_id)
transaction do
candidates = select_candidates(queues, limit)
lock(candidates, process_id)
def claim(queue_list, limit, process_id)
claimed = []

queue_names = QueueSelector.new(queue_list).queue_names
relations = queue_names.empty? ? [ all ] : queue_names.map { |queue_name| queued_as(queue_name) }

relations.each do |queue_relation|
locked = select_and_lock(queue_relation, process_id, limit)
claimed |= locked

limit -= locked.count
break if limit <= 0
end
end

def queued_as(queues)
QueueParser.new(queues, self).scoped_relation
claimed
end

private
def select_candidates(queues, limit)
queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id)
def select_and_lock(queue_relation, process_id, limit)
transaction do
candidates = select_candidates(queue_relation, limit)
lock(candidates, process_id)
end
end

def select_candidates(queue_relation, limit)
queue_relation.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id)
end

def lock(candidates, process_id)
Expand All @@ -29,13 +42,5 @@ def lock(candidates, process_id)
end
end
end

def claim(process_id)
transaction do
SolidQueue::ClaimedExecution.claiming(job_id, process_id) do |claimed|
delete if claimed.one?
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
require "solid_queue/signals"
require "solid_queue/configuration"
require "solid_queue/pool"
require "solid_queue/queue_parser"
require "solid_queue/queue_selector"
require "solid_queue/runner"
require "solid_queue/process_registration"
require "solid_queue/worker"
Expand Down
32 changes: 0 additions & 32 deletions lib/solid_queue/queue_parser.rb

This file was deleted.

36 changes: 36 additions & 0 deletions lib/solid_queue/queue_selector.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

module SolidQueue
class QueueSelector
attr_reader :raw_queues

def initialize(queue_list)
@raw_queues = Array(queue_list).map { |queue| queue.to_s.strip }.presence || [ "*" ]
end

def queue_names
if all? then filter_paused_queues
else
filter_paused_queues(exact_names)
end
end

private
def all?
"*".in? raw_queues
end

def filter_paused_queues(queues = [])
paused_queues = Pause.all_queue_names
if paused_queues.empty? then queues
else
queues = queues.presence || Queue.all.map(&:name)
queues - paused_queues
end
end

def exact_names
@exact_names ||= raw_queues.select { |queue| !queue.include?("*") }
end
end
end
14 changes: 7 additions & 7 deletions test/models/solid_queue/claimed_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
setup do
@jobs = SolidQueue::Job.where(queue_name: "fixtures")
@jobs.each(&:prepare_for_execution)

@process = SolidQueue::Process.register(metadata: { queue: "fixtures" })
@process = SolidQueue::Process.register(metadata: { queue: "background" })
end

test "perform job successfully" do
Expand Down Expand Up @@ -62,12 +59,15 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
end

private
def prepare_and_claim_job(active_job)
def prepare_and_claim_job(active_job, process: @process)
job = SolidQueue::Job.find_by(active_job_id: active_job.job_id)

job.prepare_for_execution
job.reload.ready_execution.claim(@process.id)
job.reload.claimed_execution
assert_difference -> { SolidQueue::ClaimedExecution.count } => +1 do
SolidQueue::ReadyExecution.claim(job.queue_name, 1, process.id)
end

SolidQueue::ClaimedExecution.last
end

def with_error_subscriber(subscriber)
Expand Down
53 changes: 37 additions & 16 deletions test/models/solid_queue/ready_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
AddToBufferJob.set(queue: "backend", priority: 5 - i).perform_later(i)
end

@jobs = SolidQueue::Job.where(queue_name: "backend")
@jobs = SolidQueue::Job.where(queue_name: "backend").order(:priority)
end

test "claim all jobs for existing queue" do
assert_claimed_jobs(@jobs.count) do
SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, 42)
end

@jobs.each do |job|
assert_not job.reload.ready?
assert job.claimed?
end
@jobs.each(&:reload)
assert @jobs.none?(&:ready?)
assert @jobs.all?(&:claimed?)
end

test "claim jobs for queue without jobs at the moment" do
Expand All @@ -31,7 +30,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
SolidQueue::ReadyExecution.claim("backend", 2, 42)
end

@jobs.order(:priority).first(2).each do |job|
@jobs.first(2).each do |job|
assert_not job.reload.ready?
assert job.claimed?
end
Expand All @@ -42,23 +41,27 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
end
end

test "claim individual job" do
test "claim jobs using a list of queues" do
AddToBufferJob.perform_later("hey")
job = SolidQueue::Job.last

assert_claimed_jobs(1) do
job.ready_execution.claim(42)
assert_claimed_jobs(6) do
SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, 42)
end

assert_not job.reload.ready?
assert job.claimed?
end

test "claim jobs using a list of queues" do
test "queue order and then priority is respected when using a list of queues" do
AddToBufferJob.perform_later("hey")
job = SolidQueue::Job.last
assert_equal "background", job.queue_name

assert_claimed_jobs(6) do
SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, 42)
assert_claimed_jobs(3) do
SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42)
end

assert job.reload.claimed?
@jobs.first(2).each do |job|
assert_not job.reload.ready?
assert job.claimed?
end
end

Expand All @@ -70,6 +73,21 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
end
end

test "priority order is used when claiming jobs using a wildcard" do
AddToBufferJob.set(priority: 1).perform_later("hey")
job = SolidQueue::Job.last

assert_claimed_jobs(3) do
SolidQueue::ReadyExecution.claim("*", 3, 42)
end

assert job.reload.claimed?
@jobs.first(2).each do |job|
assert_not job.reload.ready?
assert job.claimed?
end
end

test "claim jobs using a wildcard and having paused queues" do
AddToBufferJob.perform_later("hey")

Expand All @@ -78,6 +96,9 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
assert_claimed_jobs(1) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
end

@jobs.each(&:reload)
assert @jobs.none?(&:claimed?)
end

test "claim jobs using both exact names and a wildcard" do
Expand Down

0 comments on commit 2dee1a6

Please sign in to comment.