Skip to content

Commit

Permalink
Use a separate config option for concurrency maintenance task interval
Browse files Browse the repository at this point in the history
  • Loading branch information
rosa committed Nov 23, 2023
1 parent 93eb7f8 commit ee5fcc6
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
3 changes: 2 additions & 1 deletion lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ class Configuration

SCHEDULER_DEFAULTS = {
batch_size: 500,
polling_interval: 300
polling_interval: 300,
concurrency_maintenance_interval: 600
}

def initialize(mode: :work, load_from: nil)
Expand Down
5 changes: 3 additions & 2 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module SolidQueue
class Scheduler
include Runner

attr_accessor :batch_size, :polling_interval
attr_accessor :batch_size, :polling_interval, :concurrency_maintenance_interval

set_callback :start, :before, :launch_concurrency_maintenance
set_callback :shutdown, :before, :stop_concurrency_maintenance
Expand All @@ -14,6 +14,7 @@ def initialize(**options)

@batch_size = options[:batch_size]
@polling_interval = options[:polling_interval]
@concurrency_maintenance_interval = options[:concurrency_maintenance_interval]
end

private
Expand All @@ -33,7 +34,7 @@ def run
end

def launch_concurrency_maintenance
@concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: polling_interval) do
@concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: concurrency_maintenance_interval) do
expire_semaphores
unblock_blocked_executions
end
Expand Down
2 changes: 1 addition & 1 deletion test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
@result = JobResult.create!(queue_name: "default", status: "seq: ")

default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 }
scheduler = { polling_interval: 1, batch_size: 200 }
scheduler = { polling_interval: 1, batch_size: 200, concurrency_maintenance_interval: 1 }

@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler })

Expand Down

0 comments on commit ee5fcc6

Please sign in to comment.