diff --git a/lib/sidekiq_unique_jobs/batch_delete.rb b/lib/sidekiq_unique_jobs/batch_delete.rb index 994a7c1c..350ee252 100644 --- a/lib/sidekiq_unique_jobs/batch_delete.rb +++ b/lib/sidekiq_unique_jobs/batch_delete.rb @@ -9,7 +9,7 @@ module SidekiqUniqueJobs class BatchDelete # # @return [Integer] the default batch size - BATCH_SIZE = 100 + BATCH_SIZE = 500 # # @return [Array] Supported key suffixes diff --git a/lib/sidekiq_unique_jobs/constants.rb b/lib/sidekiq_unique_jobs/constants.rb index b9af7131..3f56569c 100644 --- a/lib/sidekiq_unique_jobs/constants.rb +++ b/lib/sidekiq_unique_jobs/constants.rb @@ -15,6 +15,7 @@ module SidekiqUniqueJobs DEAD_VERSION = "uniquejobs:dead" DIGESTS = "uniquejobs:digests" EXPIRING_DIGESTS = "uniquejobs:expiring_digests" + ORPHANED_DIGESTS = "uniquejobs:orphaned_digests" ERRORS = "errors" JID = "jid" LIMIT = "limit" diff --git a/lib/sidekiq_unique_jobs/lock.rb b/lib/sidekiq_unique_jobs/lock.rb index ad7d277e..cfd2a6b8 100644 --- a/lib/sidekiq_unique_jobs/lock.rb +++ b/lib/sidekiq_unique_jobs/lock.rb @@ -33,9 +33,9 @@ class Lock # rubocop:disable Metrics/ClassLength # # @return [Lock] a newly lock that has been locked # - def self.create(digest, job_id, lock_info = {}) - lock = new(digest, time: Timing.now_f) - lock.lock(job_id, lock_info) + def self.create(digest, job_id, lock_info: {}, time: Timing.now_f, score: nil) + lock = new(digest, time: time) + lock.lock(job_id, lock_info, score) lock end @@ -63,15 +63,16 @@ def initialize(key, time: nil) # # @return [void] # - def lock(job_id, lock_info = {}) + def lock(job_id, lock_info = {}, score = nil) + score ||= now_f redis do |conn| conn.multi do |pipeline| pipeline.set(key.digest, job_id) pipeline.hset(key.locked, job_id, now_f) info.set(lock_info, pipeline) - add_digest_to_set(pipeline, lock_info) - pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "queue.lua", "Queued")) - pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "lock.lua", "Locked")) + add_digest_to_set(pipeline, lock_info, score) + pipeline.zadd(key.changelog, score, changelog_json(job_id, "queue.lua", "Queued")) + pipeline.zadd(key.changelog, score, changelog_json(job_id, "lock.lua", "Locked")) end end end @@ -333,12 +334,14 @@ def changelog_json(job_id, script, message) # # @return [nil] # - def add_digest_to_set(pipeline, lock_info) + def add_digest_to_set(pipeline, lock_info, score = nil) + score ||= now_f digest_string = key.digest + if lock_info["lock"] == :until_expired - pipeline.zadd(key.expiring_digests, now_f + lock_info["ttl"], digest_string) + pipeline.zadd(key.expiring_digests, score + lock_info["ttl"], digest_string) else - pipeline.zadd(key.digests, now_f, digest_string) + pipeline.zadd(key.digests, score, digest_string) end end end diff --git a/lib/sidekiq_unique_jobs/orphans/reaper.rb b/lib/sidekiq_unique_jobs/orphans/reaper.rb index a560647b..57d10728 100644 --- a/lib/sidekiq_unique_jobs/orphans/reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/reaper.rb @@ -22,11 +22,12 @@ class Reaper # # @return [Hash SidekiqUniqueJobs::Orphans::NullReaper, false => SidekiqUniqueJobs::Orphans::NullReaper, + true => SidekiqUniqueJobs::Orphans::RubyReaper, }.freeze # diff --git a/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb b/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb index 4f7433bf..76cc2994 100644 --- a/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb @@ -13,6 +13,9 @@ module Orphans class RubyReaper < Reaper include SidekiqUniqueJobs::Timing + # + # @return [Integer] a best guess of Sidekiq::Launcher::BEAT_PAUSE + SIDEKIQ_BEAT_PAUSE = 10 # # @return [String] the suffix for :RUN locks RUN_SUFFIX = ":RUN" @@ -74,14 +77,30 @@ def call BatchDelete.call(expired_digests, conn) BatchDelete.call(orphans, conn) + + # orphans.each_slice(500) do |chunk| + # conn.pipelined do |pipeline| + # chunk.each do |digest| + # next if belongs_to_job?(digest) + + # pipeline.zadd(ORPHANED_DIGESTS, now_f, digest) + # end + # end + # end end def expired_digests - max_score = (start_time - reaper_timeout).to_f - conn.zrange(EXPIRING_DIGESTS, 0, max_score, "byscore") end + def orphaned_digests + conn.zrange(ORPHANED_DIGESTS, 0, max_score, "byscore") + end + + def max_score + (start_time - reaper_timeout - SIDEKIQ_BEAT_PAUSE).to_f + end + # # Find orphaned digests # @@ -89,10 +108,10 @@ def expired_digests # @return [Array] an array of orphaned digests # def orphans # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity - page = 0 - per = reaper_count * 2 orphans = [] - results = conn.zrange(digests.key, page * per, (page + 1) * per) + page = 0 + per = reaper_count * 2 + results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per) while results.size.positive? results.each do |digest| @@ -107,7 +126,7 @@ def orphans # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity break if orphans.size >= reaper_count page += 1 - results = conn.zrange(digests.key, page * per, (page + 1) * per) + results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per) end orphans @@ -218,7 +237,7 @@ def match?(key_one, key_two) end def considered_active?(time_f) - (Time.now - reaper_timeout).to_f < time_f + max_score < time_f end # diff --git a/lib/sidekiq_unique_jobs/redis/sorted_set.rb b/lib/sidekiq_unique_jobs/redis/sorted_set.rb index 5cb4c2d3..e3a1c4ad 100644 --- a/lib/sidekiq_unique_jobs/redis/sorted_set.rb +++ b/lib/sidekiq_unique_jobs/redis/sorted_set.rb @@ -48,6 +48,14 @@ def add(values) end end + def byscore(min, max, offset: nil, count: nil) + redis do |conn| + return conn.zrange(key, min, max, "byscore") unless offset && count + + conn.zrange(key, min, max, "byscore", "limit", offset, count) + end + end + # # Return the zrak of the member # diff --git a/spec/sidekiq_unique_jobs/lock_spec.rb b/spec/sidekiq_unique_jobs/lock_spec.rb index f545e662..cc916234 100644 --- a/spec/sidekiq_unique_jobs/lock_spec.rb +++ b/spec/sidekiq_unique_jobs/lock_spec.rb @@ -41,7 +41,7 @@ its(:to_s) { is_expected.to eq(expected_string) } describe ".create" do - subject(:create) { described_class.create(key, job_id, lock_info) } + subject(:create) { described_class.create(key, job_id, lock_info: lock_info) } it "creates all expected keys in redis" do create @@ -77,7 +77,7 @@ describe "#del" do subject(:del) { lock.del } - let(:lock) { described_class.create(key, job_id, info) } + let(:lock) { described_class.create(key, job_id, lock_info: info) } it "creates keys and adds job_id to locked hash" do expect { lock }.to change { entity.locked_jids }.to([job_id]) diff --git a/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb b/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb index 6a934338..c938d1f0 100644 --- a/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb @@ -19,7 +19,7 @@ end let(:argv) { [100, threshold] } let(:digest) { "uniquejobs:digest" } - let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) } + let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info) } let(:job_id) { "job_id" } let(:item) { raw_item } let(:created_at) { (Time.now - 1000).to_f } diff --git a/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb b/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb index f4d39d5c..4ecb2bb2 100644 --- a/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb +++ b/spec/sidekiq_unique_jobs/orphans/reaper_spec.rb @@ -5,8 +5,17 @@ let(:digest) { "uniquejobs:digest" } let(:job_id) { "job_id" } let(:item) { raw_item } - let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) } + let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info, score: score) } let(:raw_item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => digest } } + + let(:score) do + ( + Time.now - + SidekiqUniqueJobs.config.reaper_timeout - + SidekiqUniqueJobs::Orphans::RubyReaper::SIDEKIQ_BEAT_PAUSE - + 100 + ).to_f + end let(:lock_info) do { "job_id" => job_id, @@ -152,7 +161,7 @@ end context "when digest has :RUN suffix" do - let(:lock) { SidekiqUniqueJobs::Lock.create("#{digest}:RUN", job_id, lock_info) } + let(:lock) { SidekiqUniqueJobs::Lock.create("#{digest}:RUN", job_id, lock_info: lock_info) } context "that matches current digest" do # rubocop:disable RSpec/NestedGroups let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f } diff --git a/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb b/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb index a53549b5..26f60ac7 100644 --- a/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb +++ b/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb @@ -5,7 +5,7 @@ let(:digest) { "uniquejobs:digest" } let(:job_id) { "job_id" } let(:item) { raw_item } - let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) } + let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info, score: score) } let(:raw_item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => digest } } let(:lock_info) do { @@ -20,6 +20,15 @@ } end + let(:score) do + ( + Time.now - + SidekiqUniqueJobs.config.reaper_timeout - + SidekiqUniqueJobs::Orphans::RubyReaper::SIDEKIQ_BEAT_PAUSE - + 100 + ).to_f + end + before do SidekiqUniqueJobs.disable! lock