diff --git a/app/models/solid_queue/execution.rb b/app/models/solid_queue/execution.rb index 2792ae5d..8f1c3748 100644 --- a/app/models/solid_queue/execution.rb +++ b/app/models/solid_queue/execution.rb @@ -1,4 +1,4 @@ -class SolidQueue::Execution < ActiveRecord::Base +class SolidQueue::Execution < SolidQueue::Record self.abstract_class = true belongs_to :job diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 39f07143..e1d889c5 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -1,4 +1,4 @@ -class SolidQueue::Job < ActiveRecord::Base +class SolidQueue::Job < SolidQueue::Record include Executable if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1") diff --git a/app/models/solid_queue/pause.rb b/app/models/solid_queue/pause.rb new file mode 100644 index 00000000..0530405c --- /dev/null +++ b/app/models/solid_queue/pause.rb @@ -0,0 +1,7 @@ +module SolidQueue + class Pause < Record + def self.all_queue_names + all.pluck(:queue_name) + end + end +end diff --git a/app/models/solid_queue/process.rb b/app/models/solid_queue/process.rb index 22b07da5..f3083e01 100644 --- a/app/models/solid_queue/process.rb +++ b/app/models/solid_queue/process.rb @@ -1,4 +1,4 @@ -class SolidQueue::Process < ActiveRecord::Base +class SolidQueue::Process < SolidQueue::Record include Prunable if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1") diff --git a/app/models/solid_queue/queue.rb b/app/models/solid_queue/queue.rb index fcb944f4..7eeb39eb 100644 --- a/app/models/solid_queue/queue.rb +++ b/app/models/solid_queue/queue.rb @@ -4,7 +4,7 @@ class Queue class << self def all - SolidQueue::Job.select(:queue_name).distinct.collect do |job| + Job.select(:queue_name).distinct.collect do |job| new(job.queue_name) end end @@ -19,13 +19,15 @@ def initialize(name) end def paused? - false + Pause.exists?(queue_name: name) end def pause + Pause.create_or_find_by!(queue_name: name) end def resume + Pause.where(queue_name: name).delete_all end def clear diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 8ce77303..2260989b 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -1,6 +1,7 @@ module SolidQueue class ReadyExecution < Execution scope :ordered, -> { order(priority: :asc) } + scope :not_paused, -> { where.not(queue_name: Pause.all_queue_names) } before_create :assume_attributes_from_job @@ -24,7 +25,7 @@ def queued_as(queues) private def query_candidates(queues, limit) - queued_as(queues).ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id) + queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id) end def lock(job_ids) diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb new file mode 100644 index 00000000..efee954c --- /dev/null +++ b/app/models/solid_queue/record.rb @@ -0,0 +1,5 @@ +module SolidQueue + class Record < ActiveRecord::Base + self.abstract_class = true + end +end diff --git a/db/migrate/20231025165946_create_solid_queue_pauses.rb b/db/migrate/20231025165946_create_solid_queue_pauses.rb new file mode 100644 index 00000000..470a7c2a --- /dev/null +++ b/db/migrate/20231025165946_create_solid_queue_pauses.rb @@ -0,0 +1,8 @@ +class CreateSolidQueuePauses < ActiveRecord::Migration[7.1] + def change + create_table :solid_queue_pauses do |t| + t.string :queue_name, null: false, index: { unique: true } + t.datetime :created_at, null: false + end + end +end diff --git a/test/dummy/app/models/job_result.rb b/test/dummy/app/models/job_result.rb index 8737a16c..e0c915a6 100644 --- a/test/dummy/app/models/job_result.rb +++ b/test/dummy/app/models/job_result.rb @@ -1,2 +1,2 @@ -class JobResult < ActiveRecord::Base +class JobResult < ApplicationRecord end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index bab40015..342809bb 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.0].define(version: 2023_02_24_193733) do +ActiveRecord::Schema[7.1].define(version: 2023_10_25_165946) do create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name" t.string "status" @@ -49,6 +49,12 @@ t.index ["queue_name", "scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" end + create_table "solid_queue_pauses", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "queue_name", null: false + t.datetime "created_at", null: false + t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true + end + create_table "solid_queue_processes", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.text "metadata" t.datetime "created_at", null: false diff --git a/test/models/solid_queue/ready_execution_test.rb b/test/models/solid_queue/ready_execution_test.rb index aaf390c8..4d2abd50 100644 --- a/test/models/solid_queue/ready_execution_test.rb +++ b/test/models/solid_queue/ready_execution_test.rb @@ -67,6 +67,17 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase end end + test "claim jobs using a wildcard and having paused queues" do + other_jobs = SolidQueue::Job.all - @jobs + other_jobs.each(&:prepare_for_execution) + + SolidQueue::Queue.find_by_name("fixtures").pause + + assert_claimed_jobs(other_jobs.count) do + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1) + end + end + test "claim jobs using queue prefixes" do assert_claimed_jobs(2) do SolidQueue::ReadyExecution.claim("fix*", 2) diff --git a/test/test_helper.rb b/test/test_helper.rb index eb7974f6..7b2617ef 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -9,9 +9,9 @@ # Load fixtures from the engine if ActiveSupport::TestCase.respond_to?(:fixture_path=) - ActiveSupport::TestCase.fixture_path = File.expand_path("fixtures", __dir__) - ActionDispatch::IntegrationTest.fixture_path = ActiveSupport::TestCase.fixture_path - ActiveSupport::TestCase.file_fixture_path = ActiveSupport::TestCase.fixture_path + "/files" + ActiveSupport::TestCase.fixture_paths = [ File.expand_path("fixtures", __dir__) ] + ActionDispatch::IntegrationTest.fixture_paths = ActiveSupport::TestCase.fixture_paths + ActiveSupport::TestCase.file_fixture_path = ActiveSupport::TestCase.fixture_paths.first + "/files" ActiveSupport::TestCase.fixtures :all end diff --git a/test/unit/queue_test.rb b/test/unit/queue_test.rb index 849f1eaa..9a3bc2c3 100644 --- a/test/unit/queue_test.rb +++ b/test/unit/queue_test.rb @@ -31,4 +31,14 @@ class QueueTest < ActiveSupport::TestCase test "all existing queues" do assert_equal [ @background_queue ], SolidQueue::Queue.all end + + test "pause and resume queue" do + assert_changes -> { @default_queue.paused? }, from: false, to: true do + @default_queue.pause + end + + assert_changes -> { @default_queue.paused? }, from: true, to: false do + @default_queue.resume + end + end end