diff --git a/lib/sidekiq_unique_jobs/batch_delete.rb b/lib/sidekiq_unique_jobs/batch_delete.rb index 350ee2521..ce44f2198 100644 --- a/lib/sidekiq_unique_jobs/batch_delete.rb +++ b/lib/sidekiq_unique_jobs/batch_delete.rb @@ -18,6 +18,7 @@ class BatchDelete PRIMED LOCKED INFO + SCORE ].freeze # includes "SidekiqUniqueJobs::Connection" diff --git a/lib/sidekiq_unique_jobs/key.rb b/lib/sidekiq_unique_jobs/key.rb index 296224f3a..90213aaba 100644 --- a/lib/sidekiq_unique_jobs/key.rb +++ b/lib/sidekiq_unique_jobs/key.rb @@ -37,6 +37,10 @@ class Key # @!attribute [r] expiring_digests # @return [String] the zset with locked expiring_digests attr_reader :expiring_digests + # + # @!attribute [r] score + # @return [String] score (timestamp) for the lock + attr_reader :score # # Initialize a new Key @@ -49,6 +53,7 @@ def initialize(digest) @primed = suffixed_key("PRIMED") @locked = suffixed_key("LOCKED") @info = suffixed_key("INFO") + @score = suffixed_key("SCORE") @changelog = CHANGELOGS @digests = DIGESTS @expiring_digests = EXPIRING_DIGESTS @@ -86,7 +91,7 @@ def ==(other) # @return [Array] an ordered array with all keys # def to_a - [digest, queued, primed, locked, info, changelog, digests, expiring_digests] + [digest, queued, primed, locked, info, changelog, digests, expiring_digests, score] end private diff --git a/lib/sidekiq_unique_jobs/lock.rb b/lib/sidekiq_unique_jobs/lock.rb index cfd2a6b8d..4a2dab042 100644 --- a/lib/sidekiq_unique_jobs/lock.rb +++ b/lib/sidekiq_unique_jobs/lock.rb @@ -73,6 +73,7 @@ def lock(job_id, lock_info = {}, score = nil) add_digest_to_set(pipeline, lock_info, score) pipeline.zadd(key.changelog, score, changelog_json(job_id, "queue.lua", "Queued")) pipeline.zadd(key.changelog, score, changelog_json(job_id, "lock.lua", "Locked")) + pipeline.set(key.score, now_f) end end end @@ -129,7 +130,7 @@ def del redis do |conn| conn.multi do |pipeline| pipeline.zrem(DIGESTS, key.digest) - pipeline.del(key.digest, key.queued, key.primed, key.locked, key.info) + pipeline.del(key.digest, key.queued, key.primed, key.locked, key.info, key.score) end end end @@ -264,6 +265,19 @@ def changelog @changelog ||= Changelog.new end + # + # The score for the lock + # + # @note Used to improve performance when deleting digests from + # sorted sets, e.g. the schedule and retry sets. + # + # + # @return [Redis::String] a string representation of the key + # + def score + @score ||= Redis::String.new(key.score) + end + # # A nicely formatted string with information about this lock # @@ -280,6 +294,7 @@ def to_s primed_jids: #{primed_jids} locked_jids: #{locked_jids} changelogs: #{changelogs} + score: #{score.value} MESSAGE end diff --git a/lib/sidekiq_unique_jobs/locksmith.rb b/lib/sidekiq_unique_jobs/locksmith.rb index 3f49619c3..7394395c4 100644 --- a/lib/sidekiq_unique_jobs/locksmith.rb +++ b/lib/sidekiq_unique_jobs/locksmith.rb @@ -230,6 +230,8 @@ def enqueue(conn) return unless validity >= 0 || config.pttl.zero? write_lock_info(conn) + write_lock_score(conn) + yield job_id end @@ -331,6 +333,20 @@ def write_lock_info(conn) conn.set(key.info, lock_info) end + # + # Writes lock score to redis. + # The lock score is used to improve performance when iterating large sorted + # sets e.g the schedule and retry sets. + # + # + # @return [void] + # + def write_lock_score(conn) + return unless lock_score? + + conn.set(key.score, lock_score) + end + # # Used to combat redis imprecision with ttl/pttl # @@ -378,6 +394,27 @@ def lock_info ) end + # + # Reads lock score from redis. + # The lock score is used to improve performance when iterating large sorted + # sets e.g the schedule and retry sets. + # + # + # @return [Float] + # + def lock_score + @lock_score ||= item[AT] + end + + # + # Checks if a lock score exists. + # + # @return [true, false] + # + def lock_score? + !lock_score.nil? + end + def redis_version @redis_version ||= SidekiqUniqueJobs.config.redis_version end diff --git a/lib/sidekiq_unique_jobs/lua/delete.lua b/lib/sidekiq_unique_jobs/lua/delete.lua index b4f6a6e64..adfca283d 100644 --- a/lib/sidekiq_unique_jobs/lua/delete.lua +++ b/lib/sidekiq_unique_jobs/lua/delete.lua @@ -1,11 +1,13 @@ -------- BEGIN keys --------- -local digest = KEYS[1] -local queued = KEYS[2] -local primed = KEYS[3] -local locked = KEYS[4] -local info = KEYS[5] -local changelog = KEYS[6] -local digests = KEYS[7] +local digest = KEYS[1] +local queued = KEYS[2] +local primed = KEYS[3] +local locked = KEYS[4] +local info = KEYS[5] +local changelog = KEYS[6] +local digests = KEYS[7] +local expiring_digests = KEYS[8] +local score = KEYS[9] -------- END keys --------- -------- BEGIN lock arguments --------- @@ -37,8 +39,8 @@ local count = 0 log_debug("ZREM", digests, digest) count = count + redis.call("ZREM", digests, digest) -log_debug("UNLINK", digest, queued, primed, locked, info) -count = count + redis.call("UNLINK", digest, queued, primed, locked, info) +log_debug("UNLINK", digest, queued, primed, locked, info, score) +count = count + redis.call("UNLINK", digest, queued, primed, locked, info, score) log("Deleted (" .. count .. ") keys") diff --git a/lib/sidekiq_unique_jobs/lua/lock.lua b/lib/sidekiq_unique_jobs/lua/lock.lua index 4d813e9a4..1f8aa3d0e 100644 --- a/lib/sidekiq_unique_jobs/lua/lock.lua +++ b/lib/sidekiq_unique_jobs/lua/lock.lua @@ -7,6 +7,7 @@ local info = KEYS[5] local changelog = KEYS[6] local digests = KEYS[7] local expiring_digests = KEYS[8] +local score = KEYS[9] -------- END keys --------- @@ -85,6 +86,9 @@ if pttl and pttl > 0 then log_debug("PEXPIRE", info, pttl) redis.call("PEXPIRE", info, pttl) + + log_debug("PEXPIRE", score, pttl) + redis.call("PEXPIRE", score, pttl) end log_debug("PEXPIRE", queued, 1000) diff --git a/lib/sidekiq_unique_jobs/lua/locked.lua b/lib/sidekiq_unique_jobs/lua/locked.lua index 33203f6ea..9b92939e2 100644 --- a/lib/sidekiq_unique_jobs/lua/locked.lua +++ b/lib/sidekiq_unique_jobs/lua/locked.lua @@ -1,11 +1,13 @@ -------- BEGIN keys --------- -local digest = KEYS[1] -local queued = KEYS[2] -local primed = KEYS[3] -local locked = KEYS[4] -local info = KEYS[5] -local changelog = KEYS[6] -local digests = KEYS[7] +local digest = KEYS[1] +local queued = KEYS[2] +local primed = KEYS[3] +local locked = KEYS[4] +local info = KEYS[5] +local changelog = KEYS[6] +local digests = KEYS[7] +local expiring_digests = KEYS[8] +local score = KEYS[9] -------- END keys --------- -------- BEGIN lock arguments --------- diff --git a/lib/sidekiq_unique_jobs/lua/queue.lua b/lib/sidekiq_unique_jobs/lua/queue.lua index 6c4787008..3fef4c426 100644 --- a/lib/sidekiq_unique_jobs/lua/queue.lua +++ b/lib/sidekiq_unique_jobs/lua/queue.lua @@ -6,6 +6,7 @@ local locked = KEYS[4] local info = KEYS[5] local changelog = KEYS[6] local digests = KEYS[7] +local score = KEYS[8] -------- END keys --------- diff --git a/lib/sidekiq_unique_jobs/lua/reap_orphans.lua b/lib/sidekiq_unique_jobs/lua/reap_orphans.lua index b52f680e1..7a67c8564 100644 --- a/lib/sidekiq_unique_jobs/lua/reap_orphans.lua +++ b/lib/sidekiq_unique_jobs/lua/reap_orphans.lua @@ -77,8 +77,9 @@ repeat local run_primed = digest .. ":RUN:PRIMED" local run_locked = digest .. ":RUN:LOCKED" local run_info = digest .. ":RUN:INFO" + local score = digest .. ":SCORE" - redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info) + redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info, score) redis.call("ZREM", digests_set, digest) del_count = del_count + 1 @@ -104,8 +105,9 @@ if del_count < reaper_count then local run_primed = digest .. ":RUN:PRIMED" local run_locked = digest .. ":RUN:LOCKED" local run_info = digest .. ":RUN:INFO" + local score = digest .. ":SCORE" - redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info) + redis.call("UNLINK", digest, queued, primed, locked, info, run_digest, run_queued, run_primed, run_locked, run_info, score) redis.call("ZREM", expiring_digests_set, digest) del_count = del_count + 1 diff --git a/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua b/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua index 78f53cab4..654c1acec 100644 --- a/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua +++ b/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua @@ -1,11 +1,19 @@ local function delete_from_sorted_set(name, digest) - local per = 50 - local total = redis.call("zcard", name) - local index = 0 + local offset = 0 + local per = 50 + local total = redis.call("zcard", name) + local score = redis.call("get", digest .. ":SCORE") local result - while (index < total) do - local items = redis.call("ZRANGE", name, index, index + per -1) + while (offset < total) do + local items + + if score then + items = redis.call("ZRANGE", name, score, "+inf", "BYSCORE", "LIMIT", offset, per) + else + items = redis.call("ZRANGE", name, offset, offset + per -1) + end + for _, item in pairs(items) do if string.find(item, digest) then redis.call("ZREM", name, item) @@ -13,7 +21,9 @@ local function delete_from_sorted_set(name, digest) break end end - index = index + per + + offset = offset + per end + return result end diff --git a/lib/sidekiq_unique_jobs/lua/unlock.lua b/lib/sidekiq_unique_jobs/lua/unlock.lua index 70d44bf1a..36895edbb 100644 --- a/lib/sidekiq_unique_jobs/lua/unlock.lua +++ b/lib/sidekiq_unique_jobs/lua/unlock.lua @@ -1,11 +1,13 @@ -------- BEGIN keys --------- -local digest = KEYS[1] -local queued = KEYS[2] -local primed = KEYS[3] -local locked = KEYS[4] -local info = KEYS[5] -local changelog = KEYS[6] -local digests = KEYS[7] +local digest = KEYS[1] +local queued = KEYS[2] +local primed = KEYS[3] +local locked = KEYS[4] +local info = KEYS[5] +local changelog = KEYS[6] +local digests = KEYS[7] +local expiring_digests = KEYS[8] +local score = KEYS[9] -------- END keys --------- @@ -71,6 +73,9 @@ if lock_type ~= "until_expired" then log_debug("UNLINK", digest, info) redis.call("UNLINK", digest, info) + log_debug("UNLINK", score, info) + redis.call("UNLINK", score, info) + log_debug("HDEL", locked, job_id) redis.call("HDEL", locked, job_id) end diff --git a/spec/sidekiq_unique_jobs/key_spec.rb b/spec/sidekiq_unique_jobs/key_spec.rb index 728195516..db865e23e 100644 --- a/spec/sidekiq_unique_jobs/key_spec.rb +++ b/spec/sidekiq_unique_jobs/key_spec.rb @@ -28,6 +28,7 @@ uniquejobs:changelog uniquejobs:digests uniquejobs:expiring_digests + #{digest_one}:SCORE ], ) end diff --git a/spec/sidekiq_unique_jobs/lock_spec.rb b/spec/sidekiq_unique_jobs/lock_spec.rb index cc9162346..0c6e5902c 100644 --- a/spec/sidekiq_unique_jobs/lock_spec.rb +++ b/spec/sidekiq_unique_jobs/lock_spec.rb @@ -16,6 +16,7 @@ primed_jids: [] locked_jids: [] changelogs: [] + score:\s MESSAGE end @@ -45,7 +46,7 @@ it "creates all expected keys in redis" do create - expect(keys).to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests) + expect(keys).to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests, key.score) expect(create.locked_jids).to include(job_id) end end @@ -70,7 +71,7 @@ it "creates keys and adds job_id to locked hash" do expect { lock }.to change { entity.locked_jids }.to([job_id]) - expect(keys).to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests) + expect(keys).to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests, key.score) end end @@ -82,7 +83,7 @@ it "creates keys and adds job_id to locked hash" do expect { lock }.to change { entity.locked_jids }.to([job_id]) del - expect(keys).not_to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests) + expect(keys).not_to contain_exactly(key.digest, key.locked, key.info, key.changelog, key.digests, key.score) end end