fix: ensure a new lock isn't conflicting with itself #830

Merged · 3 commits · Feb 7, 2024
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/batch_delete.rb
@@ -9,7 +9,7 @@ module SidekiqUniqueJobs
class BatchDelete
#
# @return [Integer] the default batch size
BATCH_SIZE = 100
BATCH_SIZE = 500

#
# @return [Array<String>] Supported key suffixes
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/constants.rb
@@ -15,6 +15,7 @@ module SidekiqUniqueJobs
DEAD_VERSION = "uniquejobs:dead"
DIGESTS = "uniquejobs:digests"
EXPIRING_DIGESTS = "uniquejobs:expiring_digests"
ORPHANED_DIGESTS = "uniquejobs:orphaned_digests"
ERRORS = "errors"
JID = "jid"
LIMIT = "limit"
23 changes: 13 additions & 10 deletions lib/sidekiq_unique_jobs/lock.rb
@@ -33,9 +33,9 @@ class Lock # rubocop:disable Metrics/ClassLength
#
# @return [Lock] a newly created lock that has been locked
#
def self.create(digest, job_id, lock_info = {})
lock = new(digest, time: Timing.now_f)
lock.lock(job_id, lock_info)
def self.create(digest, job_id, lock_info: {}, time: Timing.now_f, score: nil)
lock = new(digest, time: time)
lock.lock(job_id, lock_info, score)
lock
end

@@ -63,15 +63,16 @@ def initialize(key, time: nil)
#
# @return [void]
#
def lock(job_id, lock_info = {})
def lock(job_id, lock_info = {}, score = nil)
score ||= now_f
redis do |conn|
conn.multi do |pipeline|
pipeline.set(key.digest, job_id)
pipeline.hset(key.locked, job_id, now_f)
info.set(lock_info, pipeline)
add_digest_to_set(pipeline, lock_info)
pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "queue.lua", "Queued"))
pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "lock.lua", "Locked"))
add_digest_to_set(pipeline, lock_info, score)
pipeline.zadd(key.changelog, score, changelog_json(job_id, "queue.lua", "Queued"))
pipeline.zadd(key.changelog, score, changelog_json(job_id, "lock.lua", "Locked"))
end
end
end
@@ -333,12 +334,14 @@ def changelog_json(job_id, script, message)
#
# @return [nil]
#
def add_digest_to_set(pipeline, lock_info)
def add_digest_to_set(pipeline, lock_info, score = nil)
score ||= now_f
digest_string = key.digest

if lock_info["lock"] == :until_expired
pipeline.zadd(key.expiring_digests, now_f + lock_info["ttl"], digest_string)
pipeline.zadd(key.expiring_digests, score + lock_info["ttl"], digest_string)
else
pipeline.zadd(key.digests, now_f, digest_string)
pipeline.zadd(key.digests, score, digest_string)
end
end
end
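
For orientation, here is a minimal sketch of how the reworked `Lock.create` signature is called after this change. Only the keyword arguments come from the diff; the `lock_info` values and the backdated score below are illustrative:

```ruby
# Minimal usage sketch of the new keyword-argument signature (illustrative values).
require "sidekiq_unique_jobs"

digest = "uniquejobs:digest"
job_id = "job_id"

# Default: the lock and its changelog entries are scored with Timing.now_f.
SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: { "lock" => :until_executed })

# Explicit score, e.g. for specs that need the digest to look old enough for the reaper.
old_score = (Time.now - 3600).to_f
SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: {}, score: old_score)
```
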
3 changes: 2 additions & 1 deletion lib/sidekiq_unique_jobs/orphans/reaper.rb
@@ -22,11 +22,12 @@ class Reaper
#
# @return [Hash<Symbol, SidekiqUniqueJobs::Orphans::Reaper>] the current implementation of reapers
REAPERS = {
lua: SidekiqUniqueJobs::Orphans::LuaReaper,
lua: SidekiqUniqueJobs::Orphans::RubyReaper,
ruby: SidekiqUniqueJobs::Orphans::RubyReaper,
none: SidekiqUniqueJobs::Orphans::NullReaper,
nil => SidekiqUniqueJobs::Orphans::NullReaper,
false => SidekiqUniqueJobs::Orphans::NullReaper,
true => SidekiqUniqueJobs::Orphans::RubyReaper,
}.freeze

#
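
As a quick sketch of how the REAPERS table is consulted: the configure block below follows the gem's documented options, but treat the exact settings as an assumption rather than part of this PR:

```ruby
# Hedged configuration sketch; option names follow the gem's README.
SidekiqUniqueJobs.configure do |config|
  config.reaper         = :lua   # after this change, :lua also resolves to the Ruby reaper
  config.reaper_count   = 1000
  config.reaper_timeout = 150
end

SidekiqUniqueJobs::Orphans::Reaper::REAPERS[SidekiqUniqueJobs.config.reaper]
# => SidekiqUniqueJobs::Orphans::RubyReaper
```
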
33 changes: 26 additions & 7 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
@@ -13,6 +13,9 @@ module Orphans
class RubyReaper < Reaper
include SidekiqUniqueJobs::Timing

#
# @return [Integer] a best guess of Sidekiq::Launcher::BEAT_PAUSE
SIDEKIQ_BEAT_PAUSE = 10
#
# @return [String] the suffix for :RUN locks
RUN_SUFFIX = ":RUN"
@@ -74,25 +77,41 @@ def call

BatchDelete.call(expired_digests, conn)
BatchDelete.call(orphans, conn)

# orphans.each_slice(500) do |chunk|
# conn.pipelined do |pipeline|
# chunk.each do |digest|
# next if belongs_to_job?(digest)

# pipeline.zadd(ORPHANED_DIGESTS, now_f, digest)
# end
# end
# end
end

def expired_digests
max_score = (start_time - reaper_timeout).to_f

conn.zrange(EXPIRING_DIGESTS, 0, max_score, "byscore")
end

def orphaned_digests
conn.zrange(ORPHANED_DIGESTS, 0, max_score, "byscore")
end

def max_score
(start_time - reaper_timeout - SIDEKIQ_BEAT_PAUSE).to_f
end

#
# Find orphaned digests
#
#
# @return [Array<String>] an array of orphaned digests
#
def orphans # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
page = 0
per = reaper_count * 2
orphans = []
results = conn.zrange(digests.key, page * per, (page + 1) * per)
page = 0
per = reaper_count * 2
results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per)

while results.size.positive?
results.each do |digest|
Expand All @@ -107,7 +126,7 @@ def orphans # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
break if orphans.size >= reaper_count

page += 1
results = conn.zrange(digests.key, page * per, (page + 1) * per)
results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per)
end

orphans
@@ -218,7 +237,7 @@ def match?(key_one, key_two)
end

def considered_active?(time_f)
(Time.now - reaper_timeout).to_f < time_f
max_score < time_f
end

#
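
To make the new cutoff concrete, a small illustrative example of the arithmetic behind `max_score` and `considered_active?` (the 150-second reaper_timeout is just an example value):

```ruby
# Illustrative only: how the reaper's cutoff is derived after this change.
start_time     = Time.now
reaper_timeout = 150   # seconds; comes from SidekiqUniqueJobs.config.reaper_timeout
beat_pause     = 10    # SIDEKIQ_BEAT_PAUSE, a best guess of Sidekiq::Launcher::BEAT_PAUSE

max_score = (start_time - reaper_timeout - beat_pause).to_f

# Digests scored at or below max_score are old enough to inspect for reaping;
# anything newer is still considered active.
considered_active = ->(time_f) { max_score < time_f }

considered_active.call(Time.now.to_f)          # => true  (fresh lock, left alone)
considered_active.call((Time.now - 600).to_f)  # => false (old enough to be a candidate)
```
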
8 changes: 8 additions & 0 deletions lib/sidekiq_unique_jobs/redis/sorted_set.rb
@@ -48,6 +48,14 @@ def add(values)
end
end

def byscore(min, max, offset: nil, count: nil)
redis do |conn|
return conn.zrange(key, min, max, "byscore") unless offset && count

conn.zrange(key, min, max, "byscore", "limit", offset, count)
end
end

#
# Return the zrank of the member
#
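
The new SortedSet#byscore helper wraps Redis' `ZRANGE ... BYSCORE` with an optional `LIMIT`; a hedged usage sketch (key name and scores are illustrative):

```ruby
# Usage sketch for the new byscore helper (illustrative key and scores).
digests   = SidekiqUniqueJobs::Redis::SortedSet.new("uniquejobs:digests")
max_score = (Time.now - 160).to_f

# Without pagination => ZRANGE uniquejobs:digests 0 <max_score> BYSCORE
digests.byscore(0, max_score)

# With pagination, as the reaper pages through candidates
# => ZRANGE uniquejobs:digests 0 <max_score> BYSCORE LIMIT <offset> <count>
page = 0
per  = 2_000
digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per)
```
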
4 changes: 2 additions & 2 deletions spec/sidekiq_unique_jobs/lock_spec.rb
@@ -41,7 +41,7 @@
its(:to_s) { is_expected.to eq(expected_string) }

describe ".create" do
subject(:create) { described_class.create(key, job_id, lock_info) }
subject(:create) { described_class.create(key, job_id, lock_info: lock_info) }

it "creates all expected keys in redis" do
create
@@ -77,7 +77,7 @@
describe "#del" do
subject(:del) { lock.del }

let(:lock) { described_class.create(key, job_id, info) }
let(:lock) { described_class.create(key, job_id, lock_info: info) }

it "creates keys and adds job_id to locked hash" do
expect { lock }.to change { entity.locked_jids }.to([job_id])
2 changes: 1 addition & 1 deletion spec/sidekiq_unique_jobs/lua/reap_orphans_spec.rb
@@ -19,7 +19,7 @@
end
let(:argv) { [100, threshold] }
let(:digest) { "uniquejobs:digest" }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info) }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:created_at) { (Time.now - 1000).to_f }
13 changes: 11 additions & 2 deletions spec/sidekiq_unique_jobs/orphans/reaper_spec.rb
@@ -5,8 +5,17 @@
let(:digest) { "uniquejobs:digest" }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info, score: score) }
let(:raw_item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => digest } }

let(:score) do
(
Time.now -
SidekiqUniqueJobs.config.reaper_timeout -
SidekiqUniqueJobs::Orphans::RubyReaper::SIDEKIQ_BEAT_PAUSE -
100
).to_f
end
let(:lock_info) do
{
"job_id" => job_id,
@@ -152,7 +161,7 @@
end

context "when digest has :RUN suffix" do
let(:lock) { SidekiqUniqueJobs::Lock.create("#{digest}:RUN", job_id, lock_info) }
let(:lock) { SidekiqUniqueJobs::Lock.create("#{digest}:RUN", job_id, lock_info: lock_info) }

context "that matches current digest" do # rubocop:disable RSpec/NestedGroups
let(:created_at) { (Time.now - (reaper_timeout + 100)).to_f }
11 changes: 10 additions & 1 deletion spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb
@@ -5,7 +5,7 @@
let(:digest) { "uniquejobs:digest" }
let(:job_id) { "job_id" }
let(:item) { raw_item }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info) }
let(:lock) { SidekiqUniqueJobs::Lock.create(digest, job_id, lock_info: lock_info, score: score) }
let(:raw_item) { { "class" => MyUniqueJob, "args" => [], "jid" => job_id, "lock_digest" => digest } }
let(:lock_info) do
{
@@ -20,6 +20,15 @@
}
end

let(:score) do
(
Time.now -
SidekiqUniqueJobs.config.reaper_timeout -
SidekiqUniqueJobs::Orphans::RubyReaper::SIDEKIQ_BEAT_PAUSE -
100
).to_f
end

before do
SidekiqUniqueJobs.disable!
lock