Skip to content

Commit

Permalink
Merge pull request #30 from basecamp/pauses
Browse files Browse the repository at this point in the history
Implement queue pauses
  • Loading branch information
rosa authored Oct 26, 2023
2 parents 6ca84fc + fb33ee7 commit 4ff7083
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 11 deletions.
2 changes: 1 addition & 1 deletion app/models/solid_queue/execution.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class SolidQueue::Execution < ActiveRecord::Base
class SolidQueue::Execution < SolidQueue::Record
self.abstract_class = true

belongs_to :job
Expand Down
2 changes: 1 addition & 1 deletion app/models/solid_queue/job.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class SolidQueue::Job < ActiveRecord::Base
class SolidQueue::Job < SolidQueue::Record
include Executable

if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1")
Expand Down
7 changes: 7 additions & 0 deletions app/models/solid_queue/pause.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module SolidQueue
class Pause < Record
def self.all_queue_names
all.pluck(:queue_name)
end
end
end
2 changes: 1 addition & 1 deletion app/models/solid_queue/process.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class SolidQueue::Process < ActiveRecord::Base
class SolidQueue::Process < SolidQueue::Record
include Prunable

if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1")
Expand Down
6 changes: 4 additions & 2 deletions app/models/solid_queue/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class Queue

class << self
def all
SolidQueue::Job.select(:queue_name).distinct.collect do |job|
Job.select(:queue_name).distinct.collect do |job|
new(job.queue_name)
end
end
Expand All @@ -19,13 +19,15 @@ def initialize(name)
end

def paused?
false
Pause.exists?(queue_name: name)
end

def pause
Pause.create_or_find_by!(queue_name: name)
end

def resume
Pause.where(queue_name: name).delete_all
end

def clear
Expand Down
3 changes: 2 additions & 1 deletion app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module SolidQueue
class ReadyExecution < Execution
scope :ordered, -> { order(priority: :asc) }
scope :not_paused, -> { where.not(queue_name: Pause.all_queue_names) }

before_create :assume_attributes_from_job

Expand All @@ -24,7 +25,7 @@ def queued_as(queues)

private
def query_candidates(queues, limit)
queued_as(queues).ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id)
queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id)
end

def lock(job_ids)
Expand Down
5 changes: 5 additions & 0 deletions app/models/solid_queue/record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module SolidQueue
class Record < ActiveRecord::Base
self.abstract_class = true
end
end
8 changes: 8 additions & 0 deletions db/migrate/20231025165946_create_solid_queue_pauses.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class CreateSolidQueuePauses < ActiveRecord::Migration[7.1]
def change
create_table :solid_queue_pauses do |t|
t.string :queue_name, null: false, index: { unique: true }
t.datetime :created_at, null: false
end
end
end
2 changes: 1 addition & 1 deletion test/dummy/app/models/job_result.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
class JobResult < ActiveRecord::Base
class JobResult < ApplicationRecord
end
8 changes: 7 additions & 1 deletion test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.0].define(version: 2023_02_24_193733) do
ActiveRecord::Schema[7.1].define(version: 2023_10_25_165946) do
create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name"
t.string "status"
Expand Down Expand Up @@ -49,6 +49,12 @@
t.index ["queue_name", "scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting"
end

create_table "solid_queue_pauses", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name", null: false
t.datetime "created_at", null: false
t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true
end

create_table "solid_queue_processes", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.text "metadata"
t.datetime "created_at", null: false
Expand Down
11 changes: 11 additions & 0 deletions test/models/solid_queue/ready_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
end
end

test "claim jobs using a wildcard and having paused queues" do
other_jobs = SolidQueue::Job.all - @jobs
other_jobs.each(&:prepare_for_execution)

SolidQueue::Queue.find_by_name("fixtures").pause

assert_claimed_jobs(other_jobs.count) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1)
end
end

test "claim jobs using queue prefixes" do
assert_claimed_jobs(2) do
SolidQueue::ReadyExecution.claim("fix*", 2)
Expand Down
6 changes: 3 additions & 3 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

# Load fixtures from the engine
if ActiveSupport::TestCase.respond_to?(:fixture_path=)
ActiveSupport::TestCase.fixture_path = File.expand_path("fixtures", __dir__)
ActionDispatch::IntegrationTest.fixture_path = ActiveSupport::TestCase.fixture_path
ActiveSupport::TestCase.file_fixture_path = ActiveSupport::TestCase.fixture_path + "/files"
ActiveSupport::TestCase.fixture_paths = [ File.expand_path("fixtures", __dir__) ]
ActionDispatch::IntegrationTest.fixture_paths = ActiveSupport::TestCase.fixture_paths
ActiveSupport::TestCase.file_fixture_path = ActiveSupport::TestCase.fixture_paths.first + "/files"
ActiveSupport::TestCase.fixtures :all
end

Expand Down
10 changes: 10 additions & 0 deletions test/unit/queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,14 @@ class QueueTest < ActiveSupport::TestCase
test "all existing queues" do
assert_equal [ @background_queue ], SolidQueue::Queue.all
end

test "pause and resume queue" do
assert_changes -> { @default_queue.paused? }, from: false, to: true do
@default_queue.pause
end

assert_changes -> { @default_queue.paused? }, from: true, to: false do
@default_queue.resume
end
end
end

0 comments on commit 4ff7083

Please sign in to comment.