Skip to content

Commit

Permalink
Allow multiple workers for the same queue and configuration
Browse files Browse the repository at this point in the history
From Donal:

> A single worker won't use more than 1 CPU core, so if you need to
> dedicate more than on CPU core on a VM to a queue you'll need multiple
> workers. I think I'll need this for the new campfire

This changes the configuration format to be like this:

```
workers:
    - queues: haystack_production_incineration
      processes: 3
      threads: 4
      polling_interval: 2
    - queues: haystack_production_recycling
      threads: 3
      polling_interval: 5
```

`processes` is 1 by default, and `queues` should be now an array of
queues (which can include wildcards).
  • Loading branch information
rosa committed Nov 3, 2023
1 parent a49b63c commit e72cd5a
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 22 deletions.
16 changes: 9 additions & 7 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
module SolidQueue
class Configuration
WORKER_DEFAULTS = {
pool_size: 5,
threads: 5,
processes: 1,
polling_interval: 0.1
}

Expand All @@ -28,7 +29,10 @@ def runners

def workers
if mode.in? %i[ work all]
workers_options.values.map { |worker_options| SolidQueue::Worker.new(**worker_options) }
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes.times.collect { SolidQueue::Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
end
else
[]
end
Expand All @@ -41,8 +45,8 @@ def scheduler
end

def max_number_of_threads
# At most pool_size thread in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.values.map { |options| options[:pool_size] }.max + 2
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.map { |options| options[:threads] }.max + 2
end

private
Expand All @@ -54,9 +58,7 @@ def config_from(file_or_hash, env: Rails.env)
end

def workers_options
@workers_options ||= (raw_config[:workers] || {}).each_with_object({}) do |(queue_string, options), hsh|
hsh[queue_string] = options.merge(queues: queue_string.to_s).with_defaults(WORKER_DEFAULTS)
end.deep_symbolize_keys
@workers_options ||= (raw_config[:workers] || {}).map { |options| options.dup.symbolize_keys }
end

def scheduler_options
Expand Down
4 changes: 2 additions & 2 deletions lib/solid_queue/queue_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ module SolidQueue
class QueueParser
attr_reader :raw_queues, :relation

def initialize(queue_string, relation)
@raw_queues = queue_string.split(",").map(&:strip).presence || [ "*" ]
def initialize(queue_list, relation)
@raw_queues = Array(queue_list).map(&:strip).presence || [ "*" ]
@relation = relation
end

Expand Down
6 changes: 3 additions & 3 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)

@polling_interval = options[:polling_interval]
@queues = options[:queues].to_s
@pool = Pool.new(options[:pool_size], on_idle: -> { wake_up })
@queues = options[:queues]
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
end

private
Expand Down Expand Up @@ -42,7 +42,7 @@ def all_work_completed?
end

def metadata
super.merge(queues: queues, pool_size: pool.size, idle_threads: pool.idle_threads, polling_interval: polling_interval)
super.merge(queues: queues, thread_pool_size: pool.size, idle_threads: pool.idle_threads, polling_interval: polling_interval)
end
end
end
8 changes: 4 additions & 4 deletions test/dummy/config/solid_queue.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
default: &default
workers:
background:
pool_size: 3
default:
pool_size: 5
- queues: background
threads: 3
- queues: default
threads: 5
scheduler:
polling_interval: 1
batch_size: 500
Expand Down
2 changes: 1 addition & 1 deletion test/integration/jobs_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class JobsLifecycleTest < ActiveSupport::TestCase
setup do
@worker = SolidQueue::Worker.new(queue_name: "background", pool_size: 3, polling_interval: 1)
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 1)
@scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 1)
end

Expand Down
4 changes: 2 additions & 2 deletions test/models/solid_queue/ready_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim("fixtures,background", SolidQueue::Job.count + 1, 42)
SolidQueue::ReadyExecution.claim(%w[ fixtures background ], SolidQueue::Job.count + 1, 42)
end
end

Expand Down Expand Up @@ -93,7 +93,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim("fix*,background", SolidQueue::Job.count + 1, 42)
SolidQueue::ReadyExecution.claim(%w[ fix* background ], SolidQueue::Job.count + 1, 42)
end
end

Expand Down
17 changes: 15 additions & 2 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,28 @@ class ConfigurationTest < ActiveSupport::TestCase
end

test "provide configuration as a hash and fill defaults" do
config_as_hash = { workers: { background: { polling_interval: 10 } } }
background_worker = { queues: "background", polling_interval: 10 }
config_as_hash = { workers: [ background_worker, background_worker ] }
configuration = SolidQueue::Configuration.new(mode: :all, load_from: config_as_hash)

assert_equal SolidQueue::Configuration::SCHEDULER_DEFAULTS[:polling_interval], configuration.scheduler.polling_interval
assert configuration.workers.detect { |w| w.queues == "background" }.pool.size > 0
assert_equal 2, configuration.workers.count
assert_equal [ "background" ], configuration.workers.map(&:queues).uniq
assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq
end

test "max number of threads" do
configuration = SolidQueue::Configuration.new(mode: :all)
assert 7, configuration.max_number_of_threads
end

test "mulitple workers with the same configuration" do
background_worker = { queues: "background", polling_interval: 10, processes: 3 }
config_as_hash = { workers: [ background_worker ] }
configuration = SolidQueue::Configuration.new(mode: :work, load_from: config_as_hash)

assert_equal 3, configuration.workers.count
assert_equal [ "background" ], configuration.workers.map(&:queues).uniq
assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq
end
end
2 changes: 1 addition & 1 deletion test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class WorkerTest < ActiveSupport::TestCase
include ActiveSupport::Testing::MethodCallAssertions

setup do
@worker = SolidQueue::Worker.new(queue_name: "background", pool_size: 3, polling_interval: 10)
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 10)
end

teardown do
Expand Down

0 comments on commit e72cd5a

Please sign in to comment.