From 3f473f791d67c1c25bf6296055fbc2b3c5828dea Mon Sep 17 00:00:00 2001 From: Zeke Gabrielse Date: Fri, 16 Feb 2024 16:03:49 -0600 Subject: [PATCH] add performance test for replace conflict strategy --- .../unique_job_on_conflict_replace_spec.rb | 55 +++++++++++++++++++ spec/support/sidekiq_unique_jobs/testing.rb | 8 +-- .../workers/unique_job_on_conflict_replace.rb | 14 +++++ .../unique_job_on_conflict_replace_spec.rb | 18 ++++++ 4 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 spec/performance/unique_job_on_conflict_replace_spec.rb create mode 100644 spec/support/workers/unique_job_on_conflict_replace.rb create mode 100644 spec/workers/unique_job_on_conflict_replace_spec.rb diff --git a/spec/performance/unique_job_on_conflict_replace_spec.rb b/spec/performance/unique_job_on_conflict_replace_spec.rb new file mode 100644 index 000000000..e7f570819 --- /dev/null +++ b/spec/performance/unique_job_on_conflict_replace_spec.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +RSpec.describe UniqueJobOnConflictReplace, :perf do + let(:lock_prefix) { described_class.sidekiq_options.fetch("lock_prefix") { SidekiqUniqueJobs.config.lock_prefix } } + let(:lock_timeout) { described_class.sidekiq_options.fetch("lock_timeout") { SidekiqUniqueJobs.config.lock_timeout } } + let(:lock_ttl) { described_class.sidekiq_options.fetch("lock_ttl") { SidekiqUniqueJobs.config.lock_ttl } } + let(:queue) { described_class.sidekiq_options["queue"] } + let(:on_conflict) { described_class.sidekiq_options["on_conflict"] } + let(:lock) { described_class.sidekiq_options["lock"] } + + before do + flushdb + end + + context "when schedule queue is large" do + it "locks and replaces quickly" do + (0..100_000).each_slice(1_000) do |nums| + redis do |conn| + conn.pipelined do |pipeline| + nums.each do |num| + created_at = Time.now.to_f + scheduled_at = created_at + rand(3_600..2_592_000) + + payload = { + "retry" => true, + "queue" => queue, + "lock" => lock, + "on_conflict" => on_conflict, + "class" => described_class.name, + "args" => [num, { "type" => "extremely unique" }], + "jid" => SecureRandom.hex(12), + "created_at" => created_at, + "lock_timeout" => lock_timeout, + "lock_ttl" => lock_ttl, + "lock_prefix" => lock_prefix, + "lock_args" => [num, { "type" => "extremely unique" }], + "lock_digest" => "#{lock_prefix}:#{SecureRandom.hex}", + } + + pipeline.zadd("schedule", scheduled_at, payload.to_json) + end + end + end + end + + # queueing it once at the end of the queue should succeed + expect(described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })).not_to be_nil + + # queueing it again should be performant + expect do + described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" }) + end.to perform_under(10).ms + end + end +end diff --git a/spec/support/sidekiq_unique_jobs/testing.rb b/spec/support/sidekiq_unique_jobs/testing.rb index 765521bec..8169aa9b3 100644 --- a/spec/support/sidekiq_unique_jobs/testing.rb +++ b/spec/support/sidekiq_unique_jobs/testing.rb @@ -59,12 +59,12 @@ def debug(*args) redis { |conn| conn.debug(*args) } end - def flushall(options = nil) - redis { |conn| conn.flushall(options) } + def flushall(...) + redis { |conn| conn.flushall(...) } end - def flushdb(options = nil) - redis { |conn| conn.flushdb(options) } + def flushdb(...) + redis { |conn| conn.flushdb(...) } end def info(_cmd = nil) diff --git a/spec/support/workers/unique_job_on_conflict_replace.rb b/spec/support/workers/unique_job_on_conflict_replace.rb new file mode 100644 index 000000000..64c6a5631 --- /dev/null +++ b/spec/support/workers/unique_job_on_conflict_replace.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# :nocov: + +class UniqueJobOnConflictReplace + include Sidekiq::Worker + sidekiq_options lock: :until_executing, + queue: :customqueue, + on_conflict: :replace + + def perform(one, two) + [one, two] + end +end diff --git a/spec/workers/unique_job_on_conflict_replace_spec.rb b/spec/workers/unique_job_on_conflict_replace_spec.rb new file mode 100644 index 000000000..6571ef440 --- /dev/null +++ b/spec/workers/unique_job_on_conflict_replace_spec.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +RSpec.describe UniqueJobOnConflictReplace do + it_behaves_like "sidekiq with options" do + let(:options) do + { + "lock" => :until_executing, + "on_conflict" => :replace, + "queue" => :customqueue, + "retry" => true, + } + end + end + + it_behaves_like "a performing worker" do + let(:args) { ["hundred", { "type" => "extremely unique", "id" => 44 }] } + end +end