diff --git a/lib/sidekiq_unique_jobs/locksmith.rb b/lib/sidekiq_unique_jobs/locksmith.rb index 3f49619c3..9a8096e6b 100644 --- a/lib/sidekiq_unique_jobs/locksmith.rb +++ b/lib/sidekiq_unique_jobs/locksmith.rb @@ -82,7 +82,7 @@ def delete # Deletes the lock regardless of if it has a pttl set # def delete! - call_script(:delete, key.to_a, [job_id, config.pttl, config.type, config.limit]).to_i.positive? + call_script(:delete, key.to_a, argv).to_i.positive? end # @@ -362,7 +362,11 @@ def taken?(conn) end def argv - [job_id, config.pttl, config.type, config.limit] + [job_id, config.pttl, config.type, config.limit, lock_score] + end + + def lock_score + item[AT].to_s end def lock_info @@ -375,6 +379,7 @@ def lock_info TYPE => config.type, LOCK_ARGS => item[LOCK_ARGS], TIME => now_f, + AT => lock_score, ) end diff --git a/lib/sidekiq_unique_jobs/lua/delete.lua b/lib/sidekiq_unique_jobs/lua/delete.lua index b4f6a6e64..1b325a385 100644 --- a/lib/sidekiq_unique_jobs/lua/delete.lua +++ b/lib/sidekiq_unique_jobs/lua/delete.lua @@ -13,14 +13,15 @@ local job_id = ARGV[1] local pttl = tonumber(ARGV[2]) local lock_type = ARGV[3] local limit = tonumber(ARGV[4]) +local score = ARGV[5] -------- END lock arguments ----------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[5]) -local debug_lua = tostring(ARGV[6]) == "1" -local max_history = tonumber(ARGV[7]) -local script_name = tostring(ARGV[8]) .. ".lua" -local redisversion = tostring(ARGV[9]) +local current_time = tonumber(ARGV[6]) +local debug_lua = tostring(ARGV[7]) == "1" +local max_history = tonumber(ARGV[8]) +local script_name = tostring(ARGV[9]) .. ".lua" +local redisversion = tostring(ARGV[10]) --------- END injected arguments --------- -------- BEGIN local functions -------- diff --git a/lib/sidekiq_unique_jobs/lua/lock.lua b/lib/sidekiq_unique_jobs/lua/lock.lua index 4d813e9a4..bb59abbf7 100644 --- a/lib/sidekiq_unique_jobs/lua/lock.lua +++ b/lib/sidekiq_unique_jobs/lua/lock.lua @@ -15,15 +15,16 @@ local job_id = ARGV[1] local pttl = tonumber(ARGV[2]) local lock_type = ARGV[3] local limit = tonumber(ARGV[4]) +local score = ARGV[5] -------- END lock arguments ----------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[5]) -local debug_lua = tostring(ARGV[6]) == "1" -local max_history = tonumber(ARGV[7]) -local script_name = tostring(ARGV[8]) .. ".lua" -local redisversion = ARGV[9] +local current_time = tonumber(ARGV[6]) +local debug_lua = tostring(ARGV[7]) == "1" +local max_history = tonumber(ARGV[8]) +local script_name = tostring(ARGV[9]) .. ".lua" +local redisversion = ARGV[10] --------- END injected arguments --------- @@ -62,8 +63,13 @@ if lock_type == "until_expired" and pttl and pttl > 0 then log_debug("ZADD", expiring_digests, current_time + pttl, digest) redis.call("ZADD", expiring_digests, current_time + pttl, digest) else - log_debug("ZADD", digests, current_time, digest) - redis.call("ZADD", digests, current_time, digest) + if #score == 0 then + log_debug("ZADD", digests, current_time, digest) + redis.call("ZADD", digests, current_time, digest) + else + log_debug("ZADD", digests, score, digest) + redis.call("ZADD", digests, score, digest) + end end log_debug("HSET", locked, job_id, current_time) diff --git a/lib/sidekiq_unique_jobs/lua/queue.lua b/lib/sidekiq_unique_jobs/lua/queue.lua index 6c4787008..e0579c93d 100644 --- a/lib/sidekiq_unique_jobs/lua/queue.lua +++ b/lib/sidekiq_unique_jobs/lua/queue.lua @@ -14,14 +14,15 @@ local job_id = ARGV[1] -- The job_id that was previously primed local pttl = tonumber(ARGV[2]) local lock_type = ARGV[3] local limit = tonumber(ARGV[4]) +local score = ARGV[5] -------- END lock arguments ----------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[5]) -local debug_lua = tostring(ARGV[6]) == "1" -local max_history = tonumber(ARGV[7]) -local script_name = tostring(ARGV[8]) .. ".lua" +local current_time = tonumber(ARGV[6]) +local debug_lua = tostring(ARGV[7]) == "1" +local max_history = tonumber(ARGV[8]) +local script_name = tostring(ARGV[9]) .. ".lua" --------- END injected arguments --------- diff --git a/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua b/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua index 78f53cab4..bde751bbf 100644 --- a/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua +++ b/lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua @@ -1,19 +1,32 @@ local function delete_from_sorted_set(name, digest) - local per = 50 - local total = redis.call("zcard", name) - local index = 0 - local result + local score = redis.call("ZSCORE", "uniquejobs:digests", digest) + local total = redis.call("ZCARD", name) + local per = 50 + local offset = 0 + + while (offset < total) do + local items + + if score then + items = redis.call("ZRANGE", name, score, "+inf", "BYSCORE", "LIMIT", offset, per) + else + items = redis.call("ZRANGE", name, offset, offset + per -1) + end + + if #items == 0 then + break + end - while (index < total) do - local items = redis.call("ZRANGE", name, index, index + per -1) for _, item in pairs(items) do if string.find(item, digest) then redis.call("ZREM", name, item) - result = item - break + + return item end end - index = index + per + + offset = offset + per end - return result + + return nil end diff --git a/lib/sidekiq_unique_jobs/lua/unlock.lua b/lib/sidekiq_unique_jobs/lua/unlock.lua index 70d44bf1a..811768b72 100644 --- a/lib/sidekiq_unique_jobs/lua/unlock.lua +++ b/lib/sidekiq_unique_jobs/lua/unlock.lua @@ -14,15 +14,16 @@ local job_id = ARGV[1] local pttl = tonumber(ARGV[2]) local lock_type = ARGV[3] local limit = tonumber(ARGV[4]) +local score = ARGV[5] -------- END lock arguments ----------- -------- BEGIN injected arguments -------- -local current_time = tonumber(ARGV[5]) -local debug_lua = tostring(ARGV[6]) == "1" -local max_history = tonumber(ARGV[7]) -local script_name = tostring(ARGV[8]) .. ".lua" -local redisversion = ARGV[9] +local current_time = tonumber(ARGV[6]) +local debug_lua = tostring(ARGV[7]) == "1" +local max_history = tonumber(ARGV[8]) +local script_name = tostring(ARGV[9]) .. ".lua" +local redisversion = ARGV[10] --------- END injected arguments --------- diff --git a/spec/sidekiq_unique_jobs/lua/delete_spec.rb b/spec/sidekiq_unique_jobs/lua/delete_spec.rb index cb35f2610..f9043afa2 100644 --- a/spec/sidekiq_unique_jobs/lua/delete_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/delete_spec.rb @@ -9,6 +9,7 @@ lock_ttl, lock_type, lock_limit, + lock_score, ] end let(:job_id) { "jobid" } @@ -22,6 +23,8 @@ let(:lock_ttl) { nil } let(:locked_jid) { job_id } let(:lock_limit) { 1 } + let(:now_f) { SidekiqUniqueJobs.now_f } + let(:lock_score) { now_f.to_s } context "when queued" do before do diff --git a/spec/sidekiq_unique_jobs/lua/lock_spec.rb b/spec/sidekiq_unique_jobs/lua/lock_spec.rb index 672bbd89b..821f553cb 100644 --- a/spec/sidekiq_unique_jobs/lua/lock_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/lock_spec.rb @@ -3,8 +3,8 @@ RSpec.describe "lock.lua" do subject(:lock) { call_script(:lock, key.to_a, argv_one) } - let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit] } - let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit] } + let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit, lock_score] } + let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit, lock_score] } let(:job_id_one) { "job_id_one" } let(:job_id_two) { "job_id_two" } let(:lock_type) { :until_executed } @@ -19,6 +19,7 @@ let(:locked_jid) { job_id_one } let(:now_f) { SidekiqUniqueJobs.now_f } let(:lock_limit) { 1 } + let(:lock_score) { now_f.to_s } shared_context "with a primed key", :with_primed_key do before do diff --git a/spec/sidekiq_unique_jobs/lua/queue_spec.rb b/spec/sidekiq_unique_jobs/lua/queue_spec.rb index 9e31debc2..8c1bb0cc4 100644 --- a/spec/sidekiq_unique_jobs/lua/queue_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/queue_spec.rb @@ -10,6 +10,7 @@ lock_pttl, lock_type, lock_limit, + lock_score, ] end let(:digest) { "uniquejobs:digest" } @@ -21,6 +22,7 @@ let(:locked_jid) { job_id } let(:lock_limit) { 1 } let(:now_f) { SidekiqUniqueJobs.now_f } + let(:lock_score) { now_f.to_s } before do flush_redis @@ -54,7 +56,7 @@ context "when queued by another job_id" do before do - call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit]) + call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score]) end context "with lock_limit 1" do @@ -94,7 +96,7 @@ context "when queued by same job_id" do before do - call_script(:queue, key.to_a, [job_id_one, lock_pttl, lock_type, lock_limit]) + call_script(:queue, key.to_a, [job_id_one, lock_pttl, lock_type, lock_limit, lock_score]) end it "stores the right keys in redis" do @@ -113,9 +115,9 @@ context "when primed by another job_id" do before do - call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit]) + call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score]) rpoplpush(key.queued, key.primed) - call_script(:lock, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit]) + call_script(:lock, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score]) end context "with lock_limit 1" do diff --git a/spec/sidekiq_unique_jobs/lua/unlock_spec.rb b/spec/sidekiq_unique_jobs/lua/unlock_spec.rb index 9e82e98eb..1b5696b33 100644 --- a/spec/sidekiq_unique_jobs/lua/unlock_spec.rb +++ b/spec/sidekiq_unique_jobs/lua/unlock_spec.rb @@ -3,8 +3,8 @@ RSpec.describe "unlock.lua" do subject(:unlock) { call_script(:unlock, key.to_a, argv_one) } - let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit] } - let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit] } + let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit, lock_score] } + let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit, lock_score] } let(:job_id_one) { "job_id_one" } let(:job_id_two) { "job_id_two" } let(:lock_type) { :until_executed } @@ -17,6 +17,7 @@ let(:lock_ttl) { nil } let(:locked_jid) { job_id_one } let(:lock_limit) { 1 } + let(:lock_score) { now_f.to_s } shared_context "with a lock", :with_a_lock do before do