Skip to content

Commit

Permalink
Merge remote-tracking branch 'fork/master' into exclude-queues
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytro-strukov committed Dec 12, 2023
2 parents b66bb64 + 329beaf commit bc5d00e
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 24 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ You can then do the following:
RAILS_ENV=production script/delayed_job --queue=tracking start
RAILS_ENV=production script/delayed_job --queues=mailers,tasks start

# Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues.
# Option --exclude-specified-queues will do inverse of queues processing by skipping ones from --queue, --queues.
# If both --pool=* --exclude-specified-queues given, no exclusions will by applied on "*".
# A worker pool's queue list can be prefixed with a ! which has the same effect as setting
# --exclude-specified-queues but only applies it to that specific worker pool.

# Use the --pool option to specify a worker pool.
# You can use this option multiple times to start different numbers of workers for different queues.
# The following command will start 1 worker for the tracking queue,
# 2 workers for the mailers and tasks queues, and 2 workers for any jobs:
RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start
Expand All @@ -274,6 +280,9 @@ Work off queues by setting the `QUEUE` or `QUEUES` environment variable.
QUEUE=tracking rake jobs:work
QUEUES=mailers,tasks rake jobs:work

If EXCLUDE_SPECIFIED_QUEUES set to YES, then queues defined by QUEUE, QUEUES will be skipped instead.
See option --exclude-specified-queues description for special case of queue "*"

Restarting delayed_job
======================

Expand Down
59 changes: 59 additions & 0 deletions lib/delayed/backend/shared_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,65 @@ def create_job(opts = {})
expect(SimpleJob.runs).to eq(3)
end
end

context 'when asked to exclude specified queues' do
context 'and worker does not have queue set' do
before(:each) do
worker.queues = []
worker.exclude_specified_queues = true
end

it 'works off all jobs' do
expect(SimpleJob.runs).to eq(0)

create_job(:queue => 'one')
create_job(:queue => 'two')
create_job
worker.work_off

expect(SimpleJob.runs).to eq(3)
end
end

context 'and worker has one queue set' do
before(:each) do
worker.queues = ['large']
worker.exclude_specified_queues = true
end

it 'only works off jobs which are not from selected queues' do
expect(SimpleJob.runs).to eq(0)

create_job(:queue => 'large')
create_job(:queue => 'small')
create_job(:queue => 'small 2')
worker.work_off

expect(SimpleJob.runs).to eq(2)
end
end

context 'and worker has two queue set' do
before(:each) do
worker.queues = %w[large small]
worker.exclude_specified_queues = true
end

it 'only works off jobs which are not from selected queues' do
expect(SimpleJob.runs).to eq(0)

create_job(:queue => 'large')
create_job(:queue => 'small')
create_job(:queue => 'medium')
create_job(:queue => 'medium 2')
create_job

worker.work_off

expect(SimpleJob.runs).to eq(3)
end
end
end
end

context 'max_attempts' do
Expand Down
18 changes: 18 additions & 0 deletions lib/delayed/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def initialize(args) # rubocop:disable MethodLength
opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue|
@options[:queues] = queue.split(',')
end
opt.on('--exclude-specified-queues', 'Exclude looking up of queues specified by --queue[s]=') do
@options[:exclude_specified_queues] = true
end
opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool|
parse_worker_pool(pool)
end
Expand Down Expand Up @@ -119,6 +122,7 @@ def setup_pools
end

def run_process(process_name, options = {})
options = normalize_worker_options(options)
Delayed::Worker.before_fork
Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args|
$0 = File.join(options[:prefix], process_name) if @options[:prefix]
Expand Down Expand Up @@ -153,6 +157,20 @@ def parse_worker_pool(pool)
@worker_pools << [queues, worker_count]
end

def normalize_worker_options(options)
options = options.dup

# If we haven't explictly said that we do or don't want to exclude specified queues, treat a leading '!' as a negation indicator for that list of queues
# Otherwise, the ! is treated as part of the queue name itself
if options[:exclude_specified_queues].nil? && options[:queues].present?
queues = options[:queues].map {|queue| queue.sub(/^!/, '') } # remove leading ! from all queues even though we only expect the first to have one, this makes it easier to look for changes after
options[:exclude_specified_queues] = queues != options[:queues]
options[:queues] = queues
end

options
end

def root
@root ||= rails_root_defined? ? ::Rails.root : DIR_PWD
end
Expand Down
1 change: 1 addition & 0 deletions lib/delayed/tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
:min_priority => ENV['MIN_PRIORITY'],
:max_priority => ENV['MAX_PRIORITY'],
:queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','),
:exclude_specified_queues => ENV['EXCLUDE_SPECIFIED_QUEUES'].to_s.casecmp('YES').zero?,
:quiet => ENV['QUIET']
}

Expand Down
47 changes: 25 additions & 22 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@

module Delayed
class Worker # rubocop:disable ClassLength
DEFAULT_LOG_LEVEL = 'info'.freeze
DEFAULT_SLEEP_DELAY = 5
DEFAULT_MAX_ATTEMPTS = 25
DEFAULT_MAX_RUN_TIME = 4.hours
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = [].freeze
DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze
DEFAULT_READ_AHEAD = 5
DEFAULT_LOG_LEVEL = 'info'.freeze
DEFAULT_SLEEP_DELAY = 5
DEFAULT_MAX_ATTEMPTS = 25
DEFAULT_MAX_RUN_TIME = 4.hours
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = [].freeze
DEFAULT_EXCLUDE_SPECIFIED_QUEUES = false
DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze
DEFAULT_READ_AHEAD = 5

cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time,
:default_priority, :sleep_delay, :logger, :delay_jobs, :queues,
:read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete,
:default_log_level
:exclude_specified_queues, :read_ahead, :plugins, :destroy_failed_jobs,
:exit_on_complete, :default_log_level

# Named queue into which jobs are enqueued by default
cattr_accessor :default_queue_name
Expand All @@ -34,16 +35,17 @@ class Worker # rubocop:disable ClassLength
attr_accessor :name_prefix

def self.reset
self.default_log_level = DEFAULT_LOG_LEVEL
self.sleep_delay = DEFAULT_SLEEP_DELAY
self.max_attempts = DEFAULT_MAX_ATTEMPTS
self.max_run_time = DEFAULT_MAX_RUN_TIME
self.default_priority = DEFAULT_DEFAULT_PRIORITY
self.delay_jobs = DEFAULT_DELAY_JOBS
self.queues = DEFAULT_QUEUES
self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES
self.read_ahead = DEFAULT_READ_AHEAD
@lifecycle = nil
self.default_log_level = DEFAULT_LOG_LEVEL
self.sleep_delay = DEFAULT_SLEEP_DELAY
self.max_attempts = DEFAULT_MAX_ATTEMPTS
self.max_run_time = DEFAULT_MAX_RUN_TIME
self.default_priority = DEFAULT_DEFAULT_PRIORITY
self.delay_jobs = DEFAULT_DELAY_JOBS
self.queues = DEFAULT_QUEUES
self.exclude_specified_queues = DEFAULT_EXCLUDE_SPECIFIED_QUEUES
self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES
self.read_ahead = DEFAULT_READ_AHEAD
@lifecycle = nil
end

# Add or remove plugins in this list before the worker is instantiated
Expand Down Expand Up @@ -132,7 +134,8 @@ def initialize(options = {})
@quiet = options.key?(:quiet) ? options[:quiet] : true
@failed_reserve_count = 0

[:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option|
[:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues,
:exclude_specified_queues, :exit_on_complete].each do |option|
self.class.send("#{option}=", options[option]) if options.key?(option)
end

Expand Down
7 changes: 6 additions & 1 deletion spec/delayed/backend/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti
end
jobs.select! { |j| j.priority <= Worker.max_priority } if Worker.max_priority
jobs.select! { |j| j.priority >= Worker.min_priority } if Worker.min_priority
jobs.select! { |j| Worker.queues.include?(j.queue) } if Worker.queues.any?
if Worker.queues.any?
jobs.select! do |j|
includes = Worker.queues.include?(j.queue)
Worker.exclude_specified_queues ? !includes : includes
end
end
jobs.sort_by! { |j| [j.priority, j.run_at] }[0..limit - 1]
end

Expand Down
30 changes: 30 additions & 0 deletions spec/delayed/command_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,35 @@

command.daemonize
end

it 'should run with respect of exclude queues' do
command = Delayed::Command.new(['--pool=*:1', '--pool=lage,slow,buggy:2', '--exclude-specified-queues'])
expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once

[
['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => [], :exclude_specified_queues => true}],
['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}],
['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}]
].each do |args|
expect(command).to receive(:run_process).with(*args).once
end

command.daemonize
end

it 'should set queue exclusion to true if a queue starts with a ! and --exclude_specified_queues has not been specified' do
command = Delayed::Command.new(['--pool=fast:1', '--pool=!lage,slow,buggy:2'])
expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once

[
['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[fast], :exclude_specified_queues => false}],
['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}],
['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}]
].each do |args|
expect(command).to receive(:run_process).with(*args).once
end

command.daemonize
end
end
end

0 comments on commit bc5d00e

Please sign in to comment.