Skip to content

Commit

Permalink
Fix active worker detection by using correct keys (#756) (#799)
Browse files Browse the repository at this point in the history
* Fix active worker detection by using correct keys (#756)

Rename keys from :workers to :work since that's correct name set in Redis

* chore(lint): fix linter issues

* fix(unlock): ensure callback and unlock (#771)

* chore(deps): update gems (solargraph is awesome)

* fix(unlock): ensure unlock and callback runs

* chore(lint): lint'em real good
# Conflicts:
#	.github/workflows/rspec.yml
#	myapp/.tool-versions

* fix: backport the fix for the return value of #deep_transform_keys (#750)

Backport fix the return value of #deep_transform_keys

* Hide lock info debug suggestion on lock page if it's already enabled. (#763)

only show lock info suggestion if value is not already on

* chore(v7): backport fixes from v8

* chore(ci): backport ci changes from v8

---------

Co-authored-by: Dominik Szromik <[email protected]>
Co-authored-by: Egor Romanov <[email protected]>
Co-authored-by: Jeremiah <[email protected]>
  • Loading branch information
4 people authored Jul 14, 2023
1 parent 0253377 commit 0d9a4ea
Show file tree
Hide file tree
Showing 63 changed files with 295 additions and 195 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Lint
name: ci
on:
pull_request:
push:
Expand All @@ -14,7 +14,7 @@ jobs:
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.1
bundler: 2.3.19
bundler: 2.4.12
bundler-cache: true
- run: bin/bundle --jobs=$(nproc) --retry=$(nproc)
- run: bin/rubocop -P
Expand All @@ -29,7 +29,7 @@ jobs:
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.1
bundler: 2.3.19
bundler: 2.4.12
bundler-cache: true
- run: bin/bundle --jobs=$(nproc) --retry=$(nproc)
- run: bin/reek .
25 changes: 17 additions & 8 deletions .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: RSpec
name: ci
on:
pull_request:
push:
Expand All @@ -22,8 +22,8 @@ jobs:
- uses: actions/checkout@v3
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.1
bundler: 2.3.19
ruby-version: 3.2
bundler: 2.4.12
bundler-cache: true

- name: Install Code Climate reporter
Expand All @@ -42,7 +42,7 @@ jobs:
COV=true bin/rspec --require spec_helper --tag ~perf
./cc-test-reporter after-build --coverage-input-type simplecov --exit-code $?
tests:
rspec:
services:
toxiproxy:
image: ghcr.io/shopify/toxiproxy
Expand All @@ -59,14 +59,23 @@ jobs:
strategy:
fail-fast: true
matrix:
ruby: [2.5, 2.6, 2.7, '3.0', 3.1]
ruby: [2.7, '3.0', 3.1, 3.2]
gemfile:
- sidekiq_5.0
- sidekiq_5.1
- sidekiq_5.2
- sidekiq_6.0
- sidekiq_6.1
- sidekiq_6.2
- sidekiq_6.3
- sidekiq_6.4
- sidekiq_6.5

steps:
- uses: actions/checkout@v3
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
bundler: 2.3.19
bundler: 2.4.12
bundler-cache: true
- run: bin/appraisal install --jobs=$(nproc) --retry=$(nproc)
- run: bin/appraisal rspec --require spec_helper --tag ~perf
- run: bin/rspec --require spec_helper --tag ~perf
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ tmp/
/gemfiles/vendor/
/vendor/
/myapp/vendor/bundle/
/.bundle/
/myapp/node_modules/
/myapp/yarn-error.log
3 changes: 3 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
nodejs 20.4.0
yarn 1.22.19
direnv 2.32.2
2 changes: 0 additions & 2 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ end

if respond_to?(:install_if)
install_if -> { RUBY_PLATFORM.include?("darwin") } do
gem "fasterer"
gem "fuubar"
gem "github_changelog_generator"
gem "pry"
gem "redcarpet", "~> 3.4"
gem "rspec-nc"
gem "ruby-prof", ">= 0.17.0", require: false
gem "stackprof", ">= 0.2.9", require: false
gem "test-prof"
Expand Down
2 changes: 1 addition & 1 deletion bin/_guard-core
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ENV["BUNDLE_GEMFILE"] ||= File.expand_path("../../Gemfile",
bundle_binstub = File.expand_path("bundle", __dir__)

if File.file?(bundle_binstub)
if /This file was generated by Bundler/.match?(File.read(bundle_binstub, 300))
if File.read(bundle_binstub, 300).include?("This file was generated by Bundler")
load(bundle_binstub)
else
abort("Your `bin/bundle` was not generated by Bundler, so this binstub cannot run.
Expand Down
2 changes: 1 addition & 1 deletion bin/appraisal
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ENV["BUNDLE_GEMFILE"] ||= File.expand_path("../../Gemfile",
bundle_binstub = File.expand_path("bundle", __dir__)

if File.file?(bundle_binstub)
if /This file was generated by Bundler/.match?(File.read(bundle_binstub, 300))
if File.read(bundle_binstub, 300).include?("This file was generated by Bundler")
load(bundle_binstub)
else
abort("Your `bin/bundle` was not generated by Bundler, so this binstub cannot run.
Expand Down
2 changes: 2 additions & 0 deletions bin/bundle
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ m = Module.new do
end

def bundler_version
# rubocop:disable ThreadSafety/InstanceVariableInClassMethod
@bundler_version ||=
env_var_version || cli_arg_version ||
lockfile_version
# rubocop:enable ThreadSafety/InstanceVariableInClassMethod
end

def bundler_requirement
Expand Down
2 changes: 1 addition & 1 deletion bin/code_climate_reek
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ENV["BUNDLE_GEMFILE"] ||= File.expand_path("../../Gemfile",
bundle_binstub = File.expand_path("bundle", __dir__)

if File.file?(bundle_binstub)
if /This file was generated by Bundler/.match?(File.read(bundle_binstub, 300))
if File.read(bundle_binstub, 300).include?("This file was generated by Bundler")
load(bundle_binstub)
else
abort("Your `bin/bundle` was not generated by Bundler, so this binstub cannot run.
Expand Down
2 changes: 1 addition & 1 deletion bin/rake
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ENV["BUNDLE_GEMFILE"] ||= File.expand_path("../../Gemfile",
bundle_binstub = File.expand_path("bundle", __dir__)

if File.file?(bundle_binstub)
if /This file was generated by Bundler/.match?(File.read(bundle_binstub, 300))
if File.read(bundle_binstub, 300).include?("This file was generated by Bundler")
load(bundle_binstub)
else
abort("Your `bin/bundle` was not generated by Bundler, so this binstub cannot run.
Expand Down
2 changes: 1 addition & 1 deletion bin/reek
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ENV["BUNDLE_GEMFILE"] ||= File.expand_path("../../Gemfile",
bundle_binstub = File.expand_path("bundle", __dir__)

if File.file?(bundle_binstub)
if /This file was generated by Bundler/.match?(File.read(bundle_binstub, 300))
if File.read(bundle_binstub, 300).include?("This file was generated by Bundler")
load(bundle_binstub)
else
abort("Your `bin/bundle` was not generated by Bundler, so this binstub cannot run.
Expand Down
2 changes: 1 addition & 1 deletion bin/rspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ENV["BUNDLE_GEMFILE"] ||= File.expand_path("../../Gemfile",
bundle_binstub = File.expand_path("bundle", __dir__)

if File.file?(bundle_binstub)
if /This file was generated by Bundler/.match?(File.read(bundle_binstub, 300))
if File.read(bundle_binstub, 300).include?("This file was generated by Bundler")
load(bundle_binstub)
else
abort("Your `bin/bundle` was not generated by Bundler, so this binstub cannot run.
Expand Down
2 changes: 1 addition & 1 deletion bin/rubocop
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ENV["BUNDLE_GEMFILE"] ||= File.expand_path("../../Gemfile",
bundle_binstub = File.expand_path("bundle", __dir__)

if File.file?(bundle_binstub)
if /This file was generated by Bundler/.match?(File.read(bundle_binstub, 300))
if File.read(bundle_binstub, 300).include?("This file was generated by Bundler")
load(bundle_binstub)
else
abort("Your `bin/bundle` was not generated by Bundler, so this binstub cannot run.
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/sidekiq_5.0.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
source "https://rubygems.org"

gem "appraisal"
gem "faraday-retry"
gem "gem-release"
gem "github-markup"
gem "rack-test"
gem "rake", "13.0.3"
gem "redis-namespace"
gem "reek", ">= 5.3"
gem "rspec"
gem "rspec-benchmark"
Expand Down
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
require "sidekiq_unique_jobs/logging/middleware_context"
require "sidekiq_unique_jobs/timing"
require "sidekiq_unique_jobs/sidekiq_worker_methods"
require "sidekiq_unique_jobs/lock_type"
require "sidekiq_unique_jobs/connection"
require "sidekiq_unique_jobs/exceptions"
require "sidekiq_unique_jobs/script"
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/batch_delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def del_digest(pipeline, digest)

def keys_for_digest(digest)
[digest, "#{digest}:RUN"].each_with_object([]) do |key, digest_keys|
digest_keys.concat([key])
digest_keys.push(key)
digest_keys.concat(SUFFIXES.map { |suffix| "#{key}:#{suffix}" })
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/core_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def slice!(*keys)
def _deep_transform_keys_in_object(object, &block)
case object
when Hash
object.each_with_object({}) do |(key, value), result|
object.each_with_object(self.class.new) do |(key, value), result|
result[yield(key)] = _deep_transform_keys_in_object(value, &block)
end
when Array
Expand Down
5 changes: 5 additions & 0 deletions lib/sidekiq_unique_jobs/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Job
# @return [Hash] the job hash
def prepare(item)
stringify_on_conflict_hash(item)
add_lock_type(item)
add_lock_timeout(item)
add_lock_ttl(item)
add_digest(item)
Expand Down Expand Up @@ -54,5 +55,9 @@ def add_lock_digest(item)
def add_lock_prefix(item)
item[LOCK_PREFIX] ||= SidekiqUniqueJobs.config.lock_prefix
end

def add_lock_type(item)
item[LOCK] ||= SidekiqUniqueJobs::LockType.call(item)
end
end
end
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/lock/until_executed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def lock(&block)
def execute
executed = locksmith.execute do
yield
ensure
unlock_and_callback
end

Expand Down
3 changes: 1 addition & 2 deletions lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ def execute(&block)
with_logging_context do
executed = locksmith.execute do
yield
callback_safely if locksmith.unlock
ensure
locksmith.unlock
unlock_and_callback
end

unless executed
Expand Down
37 changes: 37 additions & 0 deletions lib/sidekiq_unique_jobs/lock_type.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# Calculates the lock type
#
class LockType
# includes "SidekiqUniqueJobs::SidekiqWorkerMethods"
# @!parse include SidekiqUniqueJobs::SidekiqWorkerMethods
include SidekiqUniqueJobs::SidekiqWorkerMethods

#
# Computes lock type from job arguments, sidekiq_options.
#
# @return [Symbol] the lock type
# @return [NilClass] if no lock type is found.
#
def self.call(item)
new(item).call
end

# @!attribute [r] item
# @return [Hash] the Sidekiq job hash
attr_reader :item

# @param [Hash] item the Sidekiq job hash
# @option item [Symbol, nil] :lock the type of lock to use.
# @option item [String] :class the class of the sidekiq worker
def initialize(item)
@item = item
self.job_class = item[CLASS]
end

def call
item[LOCK] || job_options[LOCK] || default_job_options[LOCK]
end
end
end
16 changes: 13 additions & 3 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ def unlock(conn = nil)
#
def unlock!(conn = nil)
call_script(:unlock, key.to_a, argv, conn) do |unlocked_jid|
reflect(:debug, :unlocked, item, unlocked_jid) if unlocked_jid == job_id
if unlocked_jid == job_id
reflect(:debug, :unlocked, item, unlocked_jid)
reflect(:unlocked, item)
end

unlocked_jid
end
Expand Down Expand Up @@ -248,8 +251,12 @@ def primed_async(conn, wait = nil, &block)
concurrent_timeout = add_drift(timeout)

reflect(:debug, :timeouts, item,
timeouts: { brpoplpush_timeout: brpoplpush_timeout, concurrent_timeout: concurrent_timeout })
timeouts: {
brpoplpush_timeout: brpoplpush_timeout,
concurrent_timeout: concurrent_timeout,
})

# NOTE: When debugging, change .value to .value!
primed_jid = Concurrent::Promises
.future(conn) { |red_con| pop_queued(red_con, timeout) }
.value
Expand Down Expand Up @@ -300,7 +307,10 @@ def pop_queued(conn, wait = 1)
def brpoplpush(conn, wait)
# passing timeout 0 to brpoplpush causes it to block indefinitely
raise InvalidArgument, "wait must be an integer" unless wait.is_a?(Integer)
return conn.brpoplpush(key.queued, key.primed, wait) if conn.class.to_s == "Redis::Namespace"

if defined?(::Redis::Namespace) && conn.instance_of?(::Redis::Namespace)
return conn.brpoplpush(key.queued, key.primed, wait)
end

if VersionCheck.satisfied?(redis_version, ">= 6.2.0") && conn.respond_to?(:blmove)
conn.blmove(key.queued, key.primed, "RIGHT", "LEFT", timeout: wait)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ local function find_digest_in_process_set(digest, threshold)
log_debug("Found number of processes:", #processes, "next cursor:", next_process_cursor)

for _, process in ipairs(processes) do
local workers_key = process .. ":workers"
local workers_key = process .. ":work"
log_debug("searching in process set:", process,
"for digest:", digest,
"cursor:", process_cursor)
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/options_with_fallback.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def lock_class
# The type of lock for this worker
#
#
# @return [Symbol]
# @return [Symbol, NilClass]
#
def lock_type
@lock_type ||= options[LOCK] || item[LOCK]
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/orphans/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def stop
# @return [<type>] <description>
#
def task
@task ||= default_task
@task ||= default_task # rubocop:disable ThreadSafety/InstanceVariableInClassMethod
end

#
Expand Down Expand Up @@ -100,7 +100,7 @@ def default_task
# @return [void]
#
def task=(task)
@task = task
@task = task # rubocop:disable ThreadSafety/InstanceVariableInClassMethod
end

#
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticCo
else
pipeline.exists(key)
end
pipeline.hgetall("#{key}:workers")
pipeline.hgetall("#{key}:work")
end

next unless valid
Expand Down
14 changes: 7 additions & 7 deletions lib/sidekiq_unique_jobs/script/caller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ def call_script(file_name, *args)
# Only used to reduce a little bit of duplication
# @see call_script
def do_call(file_name, conn, keys, argv)
argv = argv.dup.concat([
now_f,
debug_lua,
max_history,
file_name,
redis_version,
])
argv = argv.dup.push(
now_f,
debug_lua,
max_history,
file_name,
redis_version,
)
Script.execute(file_name, conn, keys: keys, argv: argv)
end

Expand Down
Loading

0 comments on commit 0d9a4ea

Please sign in to comment.