diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index dab37a7b..bf77be4e 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -5,11 +5,13 @@ class BlockedExecution < SolidQueue::Execution has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key - scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } + scope :expired, -> { where(expires_at: ...Time.current) } class << self def unblock(count) - release_many releasable.distinct.limit(count).pluck(:concurrency_key) + expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys| + release_many releasable(concurrency_keys) + end end def release_many(concurrency_keys) @@ -21,6 +23,14 @@ def release_many(concurrency_keys) def release_one(concurrency_key) ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release) end + + private + def releasable(concurrency_keys) + semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).index_by(&:key) + + # Concurrency keys without semaphore + concurrency keys with open semaphore + (concurrency_keys - semaphores.keys) | semaphores.select { |key, value| value > 0 }.map(&:first) + end end def release diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 6b0a1c00..97d178c2 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,6 +1,6 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :available, -> { where("value > 0") } - scope :expired, -> { where(expires_at: ...Time.current)} + scope :expired, -> { where(expires_at: ...Time.current) } class << self def wait(job) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 45855449..46ff42e6 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -34,7 +34,7 @@ module SolidQueue mattr_accessor :supervisor, default: false mattr_accessor :delete_finished_jobs, default: true - mattr_accessor :default_concurrency_control_period, default: 15.minutes + mattr_accessor :default_concurrency_control_period, default: 3.minutes def self.supervisor? supervisor diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb index a3afa33f..8cf91a9b 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -1,3 +1,3 @@ class SequentialUpdateResultJob < UpdateResultJob - limits_concurrency key: ->(job_result, **) { job_result } + limits_concurrency key: ->(job_result, **) { job_result }, duration: 2.seconds end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 2dbbb586..0bbcc167 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -106,8 +106,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # the semaphore but hadn't unblocked any jobs assert SolidQueue::Semaphore.signal(job) - # And wait for workers to release the jobs - wait_for_jobs_to_finish_for(3.seconds) + # And wait for the scheduler to release the jobs + wait_for_jobs_to_finish_for(5.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when @@ -133,11 +133,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end - # Simulate semaphore expiration - SolidQueue::Semaphore.find_by(key: job.concurrency_key).update(expires_at: 1.hour.ago) - # And wait for scheduler to release the jobs - wait_for_jobs_to_finish_for(3.seconds) + wait_for_jobs_to_finish_for(5.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when