diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 7dbd41b..21c68ab 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -3,18 +3,19 @@ module Que module Adapters class ActiveRecordWithLock < Que::Adapters::ActiveRecord - class NoLockableJobs < StandardError; end - FindJobSecondsTotal = Prometheus::Client::Counter.new( - :que_find_job_seconds_total, - docstring: "Seconds spent finding a job", - labels: %i[queue], - ) + METRICS = [ + FindJobSecondsTotal = Prometheus::Client::Counter.new( + :que_find_job_seconds_total, + docstring: "Seconds spent finding a job", + labels: %i[queue], + ), - FindJobHitTotal = Prometheus::Client::Counter.new( - :que_find_job_total, - docstring: "total number of job hit and misses when acquiring a lock", - labels: %i[queue job_hit], - ) + FindJobHitTotal = Prometheus::Client::Counter.new( + :que_find_job_hit_total, + docstring: "total number of job hit and misses when acquiring a lock", + labels: %i[queue job_hit], + ), + ].freeze def initialize(job_connection_pool:, lock_connection_pool:) @job_connection_pool = job_connection_pool @@ -50,7 +51,7 @@ def execute(command, params = []) def lock_job_with_lock_database(queue, cursor) loop do observe(duration_metric: FindJobSecondsTotal, labels: { queue: queue }) do - locked_job = Que.transaction do + Que.transaction do job_to_lock = Que.execute(:find_job_to_lock, [queue, cursor]) return job_to_lock if job_to_lock.empty? @@ -61,7 +62,6 @@ def lock_job_with_lock_database(queue, cursor) observe(count_metric: FindJobHitTotal, labels: { queue: queue, job_hit: job_locked }) return job_to_lock if job_locked end - return locked_job if locked_job end end end diff --git a/lib/que/middleware/worker_collector.rb b/lib/que/middleware/worker_collector.rb index 9d498b9..c90fb70 100644 --- a/lib/que/middleware/worker_collector.rb +++ b/lib/que/middleware/worker_collector.rb @@ -18,6 +18,7 @@ def initialize(app, options = {}) register(*WorkerGroup::METRICS) register(*Worker::METRICS) register(*Locker::METRICS) + register(*Adapters::ActiveRecordWithLock::METRICS) end def call(env) diff --git a/spec/lib/que/adapters/active_record_with_lock_spec.rb b/spec/lib/que/adapters/active_record_with_lock_spec.rb index 73607e9..f2c96f8 100644 --- a/spec/lib/que/adapters/active_record_with_lock_spec.rb +++ b/spec/lib/que/adapters/active_record_with_lock_spec.rb @@ -33,7 +33,9 @@ end it "sets correct metric values" do + expect(QueJob.count).to eq(10) with_workers(5) { wait_for_jobs_to_be_worked } + expect(QueJob.count).to eq(0) expect(described_class::FindJobHitTotal.values[{ :queue => "default", :job_hit => "true" }]).to eq(10.0) end end