diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index adba4dc0..90664515 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -3,7 +3,8 @@ module SolidQueue class Configuration WORKER_DEFAULTS = { - pool_size: 5, + threads: 5, + processes: 1, polling_interval: 0.1 } @@ -28,7 +29,10 @@ def runners def workers if mode.in? %i[ work all] - workers_options.values.map { |worker_options| SolidQueue::Worker.new(**worker_options) } + workers_options.flat_map do |worker_options| + processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) + processes.times.collect { SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) } + end else [] end @@ -41,8 +45,8 @@ def scheduler end def max_number_of_threads - # At most pool_size thread in each worker + 1 thread for the worker + 1 thread for the heartbeat task - workers_options.values.map { |options| options[:pool_size] }.max + 2 + # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task + workers_options.map { |options| options[:threads] }.max + 2 end private @@ -54,9 +58,7 @@ def config_from(file_or_hash, env: Rails.env) end def workers_options - @workers_options ||= (raw_config[:workers] || {}).each_with_object({}) do |(queue_string, options), hsh| - hsh[queue_string] = options.merge(queues: queue_string.to_s).with_defaults(WORKER_DEFAULTS) - end.deep_symbolize_keys + @workers_options ||= (raw_config[:workers] || {}).map { |options| options.dup.symbolize_keys } end def scheduler_options diff --git a/lib/solid_queue/queue_parser.rb b/lib/solid_queue/queue_parser.rb index 0eed73db..f4e576f7 100644 --- a/lib/solid_queue/queue_parser.rb +++ b/lib/solid_queue/queue_parser.rb @@ -4,8 +4,8 @@ module SolidQueue class QueueParser attr_reader :raw_queues, :relation - def initialize(queue_string, relation) - @raw_queues = queue_string.split(",").map(&:strip).presence || [ "*" ] + def initialize(queue_list, relation) + @raw_queues = Array(queue_list).map(&:strip).presence || [ "*" ] @relation = relation end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 3044a002..89b7e761 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -10,8 +10,8 @@ def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS) @polling_interval = options[:polling_interval] - @queues = options[:queues].to_s - @pool = Pool.new(options[:pool_size], on_idle: -> { wake_up }) + @queues = options[:queues] + @pool = Pool.new(options[:threads], on_idle: -> { wake_up }) end private @@ -42,7 +42,7 @@ def all_work_completed? end def metadata - super.merge(queues: queues, pool_size: pool.size, idle_threads: pool.idle_threads, polling_interval: polling_interval) + super.merge(queues: queues, thread_pool_size: pool.size, idle_threads: pool.idle_threads, polling_interval: polling_interval) end end end diff --git a/test/dummy/config/solid_queue.yml b/test/dummy/config/solid_queue.yml index 9ce3cdad..2132133c 100644 --- a/test/dummy/config/solid_queue.yml +++ b/test/dummy/config/solid_queue.yml @@ -1,9 +1,9 @@ default: &default workers: - background: - pool_size: 3 - default: - pool_size: 5 + - queues: background + threads: 3 + - queues: default + threads: 5 scheduler: polling_interval: 1 batch_size: 500 diff --git a/test/integration/jobs_lifecycle_test.rb b/test/integration/jobs_lifecycle_test.rb index fa5eefdd..972526d2 100644 --- a/test/integration/jobs_lifecycle_test.rb +++ b/test/integration/jobs_lifecycle_test.rb @@ -3,7 +3,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase setup do - @worker = SolidQueue::Worker.new(queue_name: "background", pool_size: 3, polling_interval: 1) + @worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 1) @scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 1) end diff --git a/test/models/solid_queue/ready_execution_test.rb b/test/models/solid_queue/ready_execution_test.rb index 9f7ede91..3f9c05c6 100644 --- a/test/models/solid_queue/ready_execution_test.rb +++ b/test/models/solid_queue/ready_execution_test.rb @@ -55,7 +55,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase (SolidQueue::Job.all - @jobs).each(&:prepare_for_execution) assert_claimed_jobs(SolidQueue::Job.count) do - SolidQueue::ReadyExecution.claim("fixtures,background", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ fixtures background ], SolidQueue::Job.count + 1, 42) end end @@ -93,7 +93,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase (SolidQueue::Job.all - @jobs).each(&:prepare_for_execution) assert_claimed_jobs(SolidQueue::Job.count) do - SolidQueue::ReadyExecution.claim("fix*,background", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ fix* background ], SolidQueue::Job.count + 1, 42) end end diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index b1d82316..0f9f6254 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -9,15 +9,28 @@ class ConfigurationTest < ActiveSupport::TestCase end test "provide configuration as a hash and fill defaults" do - config_as_hash = { workers: { background: { polling_interval: 10 } } } + background_worker = { queues: "background", polling_interval: 10 } + config_as_hash = { workers: [ background_worker, background_worker ] } configuration = SolidQueue::Configuration.new(mode: :all, load_from: config_as_hash) assert_equal SolidQueue::Configuration::SCHEDULER_DEFAULTS[:polling_interval], configuration.scheduler.polling_interval - assert configuration.workers.detect { |w| w.queues == "background" }.pool.size > 0 + assert_equal 2, configuration.workers.count + assert_equal [ "background" ], configuration.workers.map(&:queues).uniq + assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq end test "max number of threads" do configuration = SolidQueue::Configuration.new(mode: :all) assert 7, configuration.max_number_of_threads end + + test "mulitple workers with the same configuration" do + background_worker = { queues: "background", polling_interval: 10, processes: 3 } + config_as_hash = { workers: [ background_worker ] } + configuration = SolidQueue::Configuration.new(mode: :work, load_from: config_as_hash) + + assert_equal 3, configuration.workers.count + assert_equal [ "background" ], configuration.workers.map(&:queues).uniq + assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq + end end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 01f9befe..ba443878 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -5,7 +5,7 @@ class WorkerTest < ActiveSupport::TestCase include ActiveSupport::Testing::MethodCallAssertions setup do - @worker = SolidQueue::Worker.new(queue_name: "background", pool_size: 3, polling_interval: 10) + @worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 10) end teardown do