From 77b67b0501ca5dc71e558a647ebebf4f3834879a Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 26 Nov 2023 13:35:10 +0100 Subject: [PATCH] Rely on blocked_executions.expires_at to select candidates to be released Sempahores' expiry times are bumped when new jobs are successfully queued for execution or when jobs finish and release the semaphores. This means that if a blocked execution's expiry time is in the past, the semaphore's expiry time is most likely in the past too. Here's why: we know that if we still have a blocked job execution after its expiry time has passed, it's because: 1. A job holding the semaphore hasn't finished yet, and in that case, the semaphore's expiry time would have expired as well and would be cleared up right before checking blocked jobs. 2. The job holding the semaphore finished and released the semaphore but failed to unblock the next job. In that case, when we inspect the blocked job's concurrency key, we'll see the semaphore released. a. It's possible a new job is enqueued in the meantime and claims the semaphore, so we wouldn't be able to unblock that blocked job. However, if this happens, it's also more likely that this new job will succeed at unblocking it when it is finished. The more jobs that are enqueued and run, bumping the semaphore's expiry time, the more likely we are to unblock the blocked jobs via the normal method. 3. The job holding the semaphore finished but failed to release the semaphore: this case is the same as 1, the semaphore will be cleared before unblocking the execution. We take advantage of this to select X (scheduler's batch size) distinct concurrency keys from expired blocked executions, and for that we can use the index on (expires_at, concurrency_key), that filters the elements, even if we have to scan all of them to find the distcint concurrency keys using a temporary table that would get at most X items long. Then, we'll check whether these (up to) X concurrency keys are releasable and try to release them. A potential problem would be if we happen to select X concurrency keys that are expired but turn out not to be releasable. I think this should be very unlikely because for this to happen, we'd have to failed to unblock X jobs via the regular method and that other jobs using the same concurrency keys were enqueued, claiming the semaphore (case 2.a. described above), before we had the chance to unblock them, for all of them. We'd need two exceptional things to happen at once: a large backlog of concurrent jobs (using different keys) and a large amount of failed unblocked jobs. Thanks to @djmb for thinking through this with me and all the help! --- app/models/solid_queue/blocked_execution.rb | 14 ++++++++++++-- app/models/solid_queue/semaphore.rb | 2 +- lib/solid_queue.rb | 2 +- .../dummy/app/jobs/sequential_update_result_job.rb | 2 +- test/integration/concurrency_controls_test.rb | 9 +++------ 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index dab37a7b..bf77be4e 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -5,11 +5,13 @@ class BlockedExecution < SolidQueue::Execution has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key - scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } + scope :expired, -> { where(expires_at: ...Time.current) } class << self def unblock(count) - release_many releasable.distinct.limit(count).pluck(:concurrency_key) + expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys| + release_many releasable(concurrency_keys) + end end def release_many(concurrency_keys) @@ -21,6 +23,14 @@ def release_many(concurrency_keys) def release_one(concurrency_key) ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release) end + + private + def releasable(concurrency_keys) + semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).index_by(&:key) + + # Concurrency keys without semaphore + concurrency keys with open semaphore + (concurrency_keys - semaphores.keys) | semaphores.select { |key, value| value > 0 }.map(&:first) + end end def release diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 6b0a1c00..97d178c2 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,6 +1,6 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :available, -> { where("value > 0") } - scope :expired, -> { where(expires_at: ...Time.current)} + scope :expired, -> { where(expires_at: ...Time.current) } class << self def wait(job) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 45855449..46ff42e6 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -34,7 +34,7 @@ module SolidQueue mattr_accessor :supervisor, default: false mattr_accessor :delete_finished_jobs, default: true - mattr_accessor :default_concurrency_control_period, default: 15.minutes + mattr_accessor :default_concurrency_control_period, default: 3.minutes def self.supervisor? supervisor diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb index a3afa33f..8cf91a9b 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -1,3 +1,3 @@ class SequentialUpdateResultJob < UpdateResultJob - limits_concurrency key: ->(job_result, **) { job_result } + limits_concurrency key: ->(job_result, **) { job_result }, duration: 2.seconds end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 2dbbb586..0bbcc167 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -106,8 +106,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # the semaphore but hadn't unblocked any jobs assert SolidQueue::Semaphore.signal(job) - # And wait for workers to release the jobs - wait_for_jobs_to_finish_for(3.seconds) + # And wait for the scheduler to release the jobs + wait_for_jobs_to_finish_for(5.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when @@ -133,11 +133,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end - # Simulate semaphore expiration - SolidQueue::Semaphore.find_by(key: job.concurrency_key).update(expires_at: 1.hour.ago) - # And wait for scheduler to release the jobs - wait_for_jobs_to_finish_for(3.seconds) + wait_for_jobs_to_finish_for(5.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when