Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple workers for the same queue and configuration #35

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
module SolidQueue
class Configuration
WORKER_DEFAULTS = {
pool_size: 5,
threads: 5,
processes: 1,
polling_interval: 0.1
}

Expand All @@ -28,7 +29,10 @@ def runners

def workers
if mode.in? %i[ work all]
workers_options.values.map { |worker_options| SolidQueue::Worker.new(**worker_options) }
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes.times.collect { SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
end
else
[]
end
Expand All @@ -41,8 +45,8 @@ def scheduler
end

def max_number_of_threads
# At most pool_size thread in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.values.map { |options| options[:pool_size] }.max + 2
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.map { |options| options[:threads] }.max + 2
end

private
Expand All @@ -54,9 +58,7 @@ def config_from(file_or_hash, env: Rails.env)
end

def workers_options
@workers_options ||= (raw_config[:workers] || {}).each_with_object({}) do |(queue_string, options), hsh|
hsh[queue_string] = options.merge(queues: queue_string.to_s).with_defaults(WORKER_DEFAULTS)
end.deep_symbolize_keys
@workers_options ||= (raw_config[:workers] || {}).map { |options| options.dup.symbolize_keys }
end

def scheduler_options
Expand Down
4 changes: 2 additions & 2 deletions lib/solid_queue/queue_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ module SolidQueue
class QueueParser
attr_reader :raw_queues, :relation

def initialize(queue_string, relation)
@raw_queues = queue_string.split(",").map(&:strip).presence || [ "*" ]
def initialize(queue_list, relation)
@raw_queues = Array(queue_list).map(&:strip).presence || [ "*" ]
@relation = relation
end

Expand Down
6 changes: 3 additions & 3 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)

@polling_interval = options[:polling_interval]
@queues = options[:queues].to_s
@pool = Pool.new(options[:pool_size], on_idle: -> { wake_up })
@queues = options[:queues]
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
end

private
Expand Down Expand Up @@ -42,7 +42,7 @@ def all_work_completed?
end

def metadata
super.merge(queues: queues, pool_size: pool.size, idle_threads: pool.idle_threads, polling_interval: polling_interval)
super.merge(queues: queues, thread_pool_size: pool.size, idle_threads: pool.idle_threads, polling_interval: polling_interval)
end
end
end
8 changes: 4 additions & 4 deletions test/dummy/config/solid_queue.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
default: &default
workers:
background:
pool_size: 3
default:
pool_size: 5
- queues: background
threads: 3
- queues: default
threads: 5
scheduler:
polling_interval: 1
batch_size: 500
Expand Down
2 changes: 1 addition & 1 deletion test/integration/jobs_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class JobsLifecycleTest < ActiveSupport::TestCase
setup do
@worker = SolidQueue::Worker.new(queue_name: "background", pool_size: 3, polling_interval: 1)
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 1)
@scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 1)
end

Expand Down
4 changes: 2 additions & 2 deletions test/models/solid_queue/ready_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim("fixtures,background", SolidQueue::Job.count + 1, 42)
SolidQueue::ReadyExecution.claim(%w[ fixtures background ], SolidQueue::Job.count + 1, 42)
end
end

Expand Down Expand Up @@ -93,7 +93,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim("fix*,background", SolidQueue::Job.count + 1, 42)
SolidQueue::ReadyExecution.claim(%w[ fix* background ], SolidQueue::Job.count + 1, 42)
end
end

Expand Down
17 changes: 15 additions & 2 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,28 @@ class ConfigurationTest < ActiveSupport::TestCase
end

test "provide configuration as a hash and fill defaults" do
config_as_hash = { workers: { background: { polling_interval: 10 } } }
background_worker = { queues: "background", polling_interval: 10 }
config_as_hash = { workers: [ background_worker, background_worker ] }
configuration = SolidQueue::Configuration.new(mode: :all, load_from: config_as_hash)

assert_equal SolidQueue::Configuration::SCHEDULER_DEFAULTS[:polling_interval], configuration.scheduler.polling_interval
assert configuration.workers.detect { |w| w.queues == "background" }.pool.size > 0
assert_equal 2, configuration.workers.count
assert_equal [ "background" ], configuration.workers.map(&:queues).uniq
assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq
end

test "max number of threads" do
configuration = SolidQueue::Configuration.new(mode: :all)
assert 7, configuration.max_number_of_threads
end

test "mulitple workers with the same configuration" do
background_worker = { queues: "background", polling_interval: 10, processes: 3 }
config_as_hash = { workers: [ background_worker ] }
configuration = SolidQueue::Configuration.new(mode: :work, load_from: config_as_hash)

assert_equal 3, configuration.workers.count
assert_equal [ "background" ], configuration.workers.map(&:queues).uniq
assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq
end
end
2 changes: 1 addition & 1 deletion test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class WorkerTest < ActiveSupport::TestCase
include ActiveSupport::Testing::MethodCallAssertions

setup do
@worker = SolidQueue::Worker.new(queue_name: "background", pool_size: 3, polling_interval: 10)
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 10)
end

teardown do
Expand Down