-
Notifications
You must be signed in to change notification settings - Fork 190
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add view and job table enhancements for improved observability
* Que migration v8: que_jobs_ext view; add first_run_at column to que_jobs
- Loading branch information
1 parent
e9715e5
commit 95b4ed6
Showing
11 changed files
with
312 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
DROP VIEW IF EXISTS public.que_jobs_ext; | ||
|
||
ALTER TABLE que_jobs | ||
DROP COLUMN first_run_at; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
-- Add 'first_run_at' column to 'que_jobs'. The 'first_run_at' column will store the timestamp with time zone (timestamptz) | ||
-- of the initial scheduled execution time for que jobs. The column default is set to now, but realistically this will always | ||
-- be defaulted to the initial run_at value. This enhancement helps re-trace the execution of the job when reviewing failures. | ||
|
||
ALTER TABLE que_jobs | ||
ADD COLUMN first_run_at timestamptz NOT NULL DEFAULT now(); | ||
|
||
|
||
-- This view extends the functionality of the que job management system by providing an enriched view of que jobs. | ||
-- It combines data from the 'que_jobs' table and the 'pg_locks' table to present a comprehensive overview of que jobs, | ||
-- including their status, associated information, and locking details. The view is designed to facilitate the monitoring | ||
-- and management of que jobs, allowing you to track job statuses, locking details, and job-related information. | ||
|
||
-- Columns: | ||
-- - lock_id: Unique identifier for the lock associated with the job. | ||
-- - que_locker_pid: Process ID (PID) of the que job locker. | ||
-- - sub_class: The job class extracted from the job arguments. | ||
-- - updated_at: The most recent timestamp among 'run_at,' 'expired_at,' and 'finished_at.' | ||
-- - status: The status of the job, which can be 'running,' 'completed,' 'failed,' 'errored,' 'queued,' or 'scheduled.' | ||
|
||
CREATE OR REPLACE VIEW public.que_jobs_ext | ||
AS | ||
SELECT | ||
locks.id AS lock_id, | ||
locks.pid as que_locker_pid, | ||
(que_jobs.args -> 0) ->> 'job_class'::text AS sub_class, | ||
greatest(run_at, expired_at, finished_at) as updated_at, | ||
|
||
case | ||
when locks.id is not null then 'running' | ||
when finished_at is not null then 'completed' | ||
when expired_at is not null then 'failed' | ||
when error_count > 0 then 'errored' | ||
when run_at < now() then 'queued' | ||
else 'scheduled' | ||
end as status, | ||
|
||
-- que_jobs.*: | ||
que_jobs.id, | ||
que_jobs.priority, | ||
que_jobs.run_at, | ||
que_jobs.first_run_at, | ||
que_jobs.job_class, | ||
que_jobs.error_count, | ||
que_jobs.last_error_message, | ||
que_jobs.queue, | ||
que_jobs.last_error_backtrace, | ||
que_jobs.finished_at, | ||
que_jobs.expired_at, | ||
que_jobs.args, | ||
que_jobs.data, | ||
que_jobs.kwargs, | ||
que_jobs.job_schema_version | ||
|
||
FROM que_jobs | ||
LEFT JOIN ( | ||
SELECT | ||
(classid::bigint << 32) + objid::bigint AS id | ||
, pid | ||
FROM pg_locks | ||
WHERE pg_locks.locktype = 'advisory'::text) locks USING (id); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
# frozen_string_literal: true | ||
|
||
require 'spec_helper' | ||
|
||
describe "Que Jobs Ext View", skip: true do | ||
|
||
class TestJob < Que::Job | ||
include Que::JobMethods | ||
|
||
def default_resolve_action | ||
# prevents default deletion of complete jobs for testing purposes | ||
finish | ||
end | ||
end | ||
|
||
class TestFailedJob < TestJob | ||
def run | ||
raise Que::Error, 'Test Error' | ||
end | ||
end | ||
|
||
describe 'job.enqueue' do | ||
it "should mirror enqueued job" do | ||
assert_equal 0, jobs_dataset.count | ||
assert_equal 0, jobs_ext_dataset.count | ||
|
||
TestJob.enqueue( | ||
1, | ||
'two', | ||
string: "string", | ||
integer: 5, | ||
array: [1, "two", {three: 3}], | ||
hash: {one: 1, two: 'two', three: [3]}, | ||
job_options: { | ||
priority: 4, | ||
queue: 'special_queue_name', | ||
run_at: Time.now | ||
} | ||
) | ||
|
||
assert_equal 1, jobs_dataset.count | ||
assert_equal 1, jobs_ext_dataset.count | ||
|
||
job = jobs_dataset.first | ||
ext_job = jobs_ext_dataset.first | ||
assert_equal ext_job[:queue], job[:queue] | ||
assert_equal ext_job[:priority], job[:priority] | ||
assert_equal ext_job[:run_at], job[:run_at] | ||
assert_equal ext_job[:first_run_at], job[:first_run_at] | ||
assert_equal ext_job[:job_class], job[:job_class] | ||
assert_equal ext_job[:args], job[:args] | ||
assert_equal ext_job[:job_schema_version], job[:job_schema_version] | ||
|
||
jobs_dataset.delete | ||
|
||
assert_equal 0, jobs_dataset.count | ||
assert_equal 0, jobs_ext_dataset.count | ||
end | ||
|
||
it "should include additional lock data" do | ||
locker_settings.clear | ||
locker_settings[:listen] = false | ||
locker_settings[:poll_interval] = 0.02 | ||
locker | ||
|
||
TestJob.enqueue | ||
|
||
sleep_until { locked_ids.count.positive? && locked_ids.first == jobs_ext_dataset.first[:lock_id] } | ||
|
||
locker.stop! | ||
|
||
jobs_dataset.delete | ||
end | ||
|
||
it "should add additional updated_at" do | ||
TestJob.enqueue | ||
|
||
ext_job = jobs_ext_dataset.first | ||
|
||
assert_equal ext_job[:run_at], ext_job[:updated_at] | ||
|
||
locker | ||
|
||
sleep_until_equal(1) { finished_jobs_dataset.count } | ||
|
||
locker.stop! | ||
|
||
ext_job = jobs_ext_dataset.first | ||
|
||
assert_equal ext_job[:finished_at], ext_job[:updated_at] | ||
|
||
jobs_dataset.delete | ||
end | ||
|
||
describe "should include additional status" do | ||
|
||
let(:notified_errors) { [] } | ||
|
||
it "should set status to scheduled when run_at is in the future" do | ||
TestJob.enqueue(job_options: { run_at: Time.now + 1 }) | ||
|
||
assert_equal jobs_ext_dataset.first[:status], 'scheduled' | ||
|
||
jobs_dataset.delete | ||
end | ||
|
||
it "should set status to queued when run_at is in the past and the job is not currently running, completed, failed or errored" do | ||
TestJob.enqueue(job_options: { run_at: Time.now - 1 }) | ||
|
||
assert_equal jobs_ext_dataset.first[:status], 'queued' | ||
|
||
jobs_dataset.delete | ||
end | ||
|
||
it "should set status to running when the job has a lock associated with it" do | ||
locker_settings.clear | ||
locker_settings[:listen] = false | ||
locker_settings[:poll_interval] = 0.02 | ||
locker | ||
|
||
TestJob.enqueue | ||
|
||
sleep_until { locked_ids.count.positive? && locked_ids.first == jobs_ext_dataset.first[:lock_id] && jobs_ext_dataset.first[:status] == 'running' } | ||
|
||
locker.stop! | ||
|
||
jobs_dataset.delete | ||
end | ||
|
||
it "should set status to complete when finished_at is present" do | ||
TestJob.enqueue | ||
|
||
locker | ||
|
||
sleep_until_equal(1) { DB[:que_lockers].count } | ||
|
||
sleep_until { finished_jobs_dataset.count.positive? } | ||
|
||
locker.stop! | ||
|
||
assert_equal jobs_ext_dataset.first[:status], 'completed' | ||
|
||
jobs_dataset.delete | ||
end | ||
|
||
it "should set status to errored when error_count is positive and expired_at is not present" do | ||
Que.error_notifier = proc { |e| notified_errors << e } | ||
|
||
TestFailedJob.class_eval do | ||
self.maximum_retry_count = 100 # prevent from entering failed state on first error | ||
end | ||
|
||
locker | ||
|
||
sleep_until_equal(1) { DB[:que_lockers].count } | ||
|
||
TestFailedJob.enqueue | ||
|
||
sleep_until { errored_jobs_dataset.where(expired_at: nil).count.positive? } | ||
|
||
locker.stop! | ||
|
||
ext_job = jobs_ext_dataset.first | ||
|
||
assert_equal ext_job[:status], 'errored' | ||
assert_equal notified_errors.count, 1 | ||
assert_equal notified_errors.first.message, 'Test Error' | ||
|
||
|
||
jobs_dataset.delete | ||
end | ||
|
||
it "should set status to failed when expired_at is present" do | ||
TestFailedJob.class_eval do | ||
self.maximum_retry_count = 0 | ||
end | ||
|
||
Que.error_notifier = proc { |e| notified_errors << e } | ||
|
||
locker | ||
|
||
sleep_until_equal(1) { DB[:que_lockers].count } | ||
|
||
TestFailedJob.enqueue | ||
|
||
sleep_until { expired_jobs_dataset.count.positive? } | ||
|
||
locker.stop! | ||
|
||
assert_equal jobs_ext_dataset.first[:status], 'failed' | ||
assert_equal notified_errors.count, 1 | ||
assert_equal notified_errors.first.message, 'Test Error' | ||
|
||
|
||
jobs_dataset.delete | ||
end | ||
end | ||
end | ||
end |
Oops, something went wrong.