Skip to content

Commit

Permalink
Que migration v8: que_jobs_ext view; add first_run_at column to que_jobs
Browse files Browse the repository at this point in the history
FIX bulk_insert_jobs and insert_jobs don't pass redundant arg for first_run_at (use run_at)

cleanup. specs. documentation

migrations and specs support backwards compatibility

PR change requests.

remove unnecessary db_version check in introspection spec.

Add view and job table enhancements for improved observability
  • Loading branch information
maxschridde1494 committed Oct 24, 2023
1 parent a0e652c commit cd6ec2d
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 12 deletions.
15 changes: 15 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
- [Enqueueing jobs in bulk](#enqueueing-jobs-in-bulk)
- [Expired jobs](#expired-jobs)
- [Finished jobs](#finished-jobs)
- [Que Jobs Ext view](#que-jobs-ext-view)

<!-- /MarkdownTOC -->

Expand Down Expand Up @@ -879,3 +880,17 @@ SET LOCAL que.skip_notify TO true;
DELETE FROM que_jobs WHERE finished_at < (select now() - interval '7 days');
COMMIT;
```

## Que Jobs Ext view

This view extends the functionality of the que job management system by providing an enriched view of que jobs. It combines data from the 'que_jobs' table and the 'pg_locks' table to present a comprehensive overview of que jobs, including their status, associated information, and locking details.

This view is designed to facilitate the monitoring and management of que jobs, allowing you to track job statuses, locking details, and job-related information.

Additional Columns:

- lock_id: Unique identifier for the lock associated with the job.
- que_locker_pid: Process ID (PID) of the que job locker.
- sub_class: The job class extracted from the job arguments.
- updated_at: The most recent timestamp among 'run_at,' 'expired_at,' and 'finished_at.'
- status: The status of the job, which can be 'running,' 'completed,' 'failed,' 'errored,' 'queued,' or 'scheduled.'
13 changes: 8 additions & 5 deletions lib/que/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Job
SQL[:insert_job] =
%{
INSERT INTO public.que_jobs
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version)
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version, first_run_at)
VALUES
(
coalesce($1, 'default')::text,
Expand All @@ -22,7 +22,8 @@ class Job
coalesce($5, '[]')::jsonb,
coalesce($6, '{}')::jsonb,
coalesce($7, '{}')::jsonb,
#{Que.job_schema_version}
#{Que.job_schema_version},
coalesce($3, now())::timestamptz
)
RETURNING *
}
Expand All @@ -33,7 +34,7 @@ class Job
SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb)
)
INSERT INTO public.que_jobs
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version)
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version, first_run_at)
SELECT
coalesce($1, 'default')::text,
coalesce($2, 100)::smallint,
Expand All @@ -42,7 +43,8 @@ class Job
args_and_kwargs.args,
args_and_kwargs.kwargs,
coalesce($6, '{}')::jsonb,
#{Que.job_schema_version}
#{Que.job_schema_version},
coalesce($3, now())::timestamptz
FROM args_and_kwargs
RETURNING *
}
Expand Down Expand Up @@ -75,7 +77,8 @@ class << self
:maximum_retry_count,
:queue,
:priority,
:run_at
:run_at,
:first_run_at

def enqueue(*args)
args, kwargs = Que.split_out_ruby2_keywords(args)
Expand Down
2 changes: 1 addition & 1 deletion lib/que/migrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Que
module Migrations
# In order to ship a schema change, add the relevant up and down sql files
# to the migrations directory, and bump the version here.
CURRENT_VERSION = 7
CURRENT_VERSION = 8

class << self
def migrate!(version:)
Expand Down
4 changes: 4 additions & 0 deletions lib/que/migrations/8/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP VIEW IF EXISTS public.que_jobs_ext;

ALTER TABLE que_jobs
DROP COLUMN first_run_at;
61 changes: 61 additions & 0 deletions lib/que/migrations/8/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-- Add 'first_run_at' column to 'que_jobs'. The 'first_run_at' column will store the timestamp with time zone (timestamptz)
-- of the initial scheduled execution time for que jobs. The column default is set to now, but realistically this will always
-- be defaulted to the initial run_at value. This enhancement helps re-trace the execution of the job when reviewing failures.

ALTER TABLE que_jobs
ADD COLUMN first_run_at timestamptz NOT NULL DEFAULT now();


-- This view extends the functionality of the que job management system by providing an enriched view of que jobs.
-- It combines data from the 'que_jobs' table and the 'pg_locks' table to present a comprehensive overview of que jobs,
-- including their status, associated information, and locking details. The view is designed to facilitate the monitoring
-- and management of que jobs, allowing you to track job statuses, locking details, and job-related information.

-- Columns:
-- - lock_id: Unique identifier for the lock associated with the job.
-- - que_locker_pid: Process ID (PID) of the que job locker.
-- - sub_class: The job class extracted from the job arguments.
-- - updated_at: The most recent timestamp among 'run_at,' 'expired_at,' and 'finished_at.'
-- - status: The status of the job, which can be 'running,' 'completed,' 'failed,' 'errored,' 'queued,' or 'scheduled.'

CREATE OR REPLACE VIEW public.que_jobs_ext
AS
SELECT
locks.id AS lock_id,
locks.pid as que_locker_pid,
(que_jobs.args -> 0) ->> 'job_class'::text AS sub_class,
greatest(run_at, expired_at, finished_at) as updated_at,

case
when locks.id is not null then 'running'
when finished_at is not null then 'completed'
when expired_at is not null then 'failed'
when error_count > 0 then 'errored'
when run_at < now() then 'queued'
else 'scheduled'
end as status,

-- que_jobs.*:
que_jobs.id,
que_jobs.priority,
que_jobs.run_at,
que_jobs.first_run_at,
que_jobs.job_class,
que_jobs.error_count,
que_jobs.last_error_message,
que_jobs.queue,
que_jobs.last_error_backtrace,
que_jobs.finished_at,
que_jobs.expired_at,
que_jobs.args,
que_jobs.data,
que_jobs.kwargs,
que_jobs.job_schema_version

FROM que_jobs
LEFT JOIN (
SELECT
(classid::bigint << 32) + objid::bigint AS id
, pid
FROM pg_locks
WHERE pg_locks.locktype = 'advisory'::text) locks USING (id);
2 changes: 1 addition & 1 deletion lib/que/utils/queue_management.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def clear!
# Very old migrations may use Que.create! and Que.drop!, which just
# created and dropped the initial version of the jobs table.
def create!; migrate!(version: 1); end
def drop!; migrate!(version: 0); end
def drop!; migrate!(version: 0); end
end
end
end
1 change: 1 addition & 0 deletions spec/que/job.bulk_enqueue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def assert_enqueue(
assert_equal expected_job_class.to_s, job[:job_class]
assert_equal expected_args[i], job[:args]
assert_equal expected_kwargs[i], job[:kwargs]
assert_equal job[:run_at], job[:first_run_at]
end

jobs_dataset.delete
Expand Down
1 change: 1 addition & 0 deletions spec/que/job.enqueue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def assert_enqueue(
assert_equal expected_job_class.to_s, job[:job_class]
assert_equal expected_args, job[:args]
assert_equal expected_job_schema_version, job[:job_schema_version]
assert_equal job[:first_run_at], job[:run_at]

jobs_dataset.delete
end
Expand Down
8 changes: 4 additions & 4 deletions spec/que/utils/introspection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@
locker.stop!

state = states.first
assert_equal \
%i(priority run_at id job_class error_count last_error_message queue
last_error_backtrace finished_at expired_at args data job_schema_version kwargs ruby_hostname ruby_pid),
state.keys
expected_keys = %i(priority run_at id job_class error_count last_error_message queue
last_error_backtrace finished_at expired_at args data job_schema_version kwargs ruby_hostname ruby_pid first_run_at)

assert_equal expected_keys.sort, state.keys.sort

assert_equal 2, state[:priority]
assert_in_delta state[:run_at], Time.now, QueSpec::TIME_SKEW
Expand Down
199 changes: 199 additions & 0 deletions spec/que/views/que_jobs_ext_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
# frozen_string_literal: true

require 'spec_helper'

describe "Que Jobs Ext View", skip: true do

class TestJob < Que::Job
include Que::JobMethods

def default_resolve_action
# prevents default deletion of complete jobs for testing purposes
finish
end
end

class TestFailedJob < TestJob
def run
raise Que::Error, 'Test Error'
end
end

describe 'job.enqueue' do
it "should mirror enqueued job" do
assert_equal 0, jobs_dataset.count
assert_equal 0, jobs_ext_dataset.count

TestJob.enqueue(
1,
'two',
string: "string",
integer: 5,
array: [1, "two", {three: 3}],
hash: {one: 1, two: 'two', three: [3]},
job_options: {
priority: 4,
queue: 'special_queue_name',
run_at: Time.now
}
)

assert_equal 1, jobs_dataset.count
assert_equal 1, jobs_ext_dataset.count

job = jobs_dataset.first
ext_job = jobs_ext_dataset.first
assert_equal ext_job[:queue], job[:queue]
assert_equal ext_job[:priority], job[:priority]
assert_equal ext_job[:run_at], job[:run_at]
assert_equal ext_job[:first_run_at], job[:first_run_at]
assert_equal ext_job[:job_class], job[:job_class]
assert_equal ext_job[:args], job[:args]
assert_equal ext_job[:job_schema_version], job[:job_schema_version]

jobs_dataset.delete

assert_equal 0, jobs_dataset.count
assert_equal 0, jobs_ext_dataset.count
end

it "should include additional lock data" do
locker_settings.clear
locker_settings[:listen] = false
locker_settings[:poll_interval] = 0.02
locker

TestJob.enqueue

sleep_until { locked_ids.count.positive? && locked_ids.first == jobs_ext_dataset.first[:lock_id] }

locker.stop!

jobs_dataset.delete
end

it "should add additional updated_at" do
TestJob.enqueue

ext_job = jobs_ext_dataset.first

assert_equal ext_job[:run_at], ext_job[:updated_at]

locker

sleep_until_equal(1) { finished_jobs_dataset.count }

locker.stop!

ext_job = jobs_ext_dataset.first

assert_equal ext_job[:finished_at], ext_job[:updated_at]

jobs_dataset.delete
end

describe "should include additional status" do

let(:notified_errors) { [] }

it "should set status to scheduled when run_at is in the future" do
TestJob.enqueue(job_options: { run_at: Time.now + 1 })

assert_equal jobs_ext_dataset.first[:status], 'scheduled'

jobs_dataset.delete
end

it "should set status to queued when run_at is in the past and the job is not currently running, completed, failed or errored" do
TestJob.enqueue(job_options: { run_at: Time.now - 1 })

assert_equal jobs_ext_dataset.first[:status], 'queued'

jobs_dataset.delete
end

it "should set status to running when the job has a lock associated with it" do
locker_settings.clear
locker_settings[:listen] = false
locker_settings[:poll_interval] = 0.02
locker

TestJob.enqueue

sleep_until { locked_ids.count.positive? && locked_ids.first == jobs_ext_dataset.first[:lock_id] && jobs_ext_dataset.first[:status] == 'running' }

locker.stop!

jobs_dataset.delete
end

it "should set status to complete when finished_at is present" do
TestJob.enqueue

locker

sleep_until_equal(1) { DB[:que_lockers].count }

sleep_until { finished_jobs_dataset.count.positive? }

locker.stop!

assert_equal jobs_ext_dataset.first[:status], 'completed'

jobs_dataset.delete
end

it "should set status to errored when error_count is positive and expired_at is not present" do
Que.error_notifier = proc { |e| notified_errors << e }

TestFailedJob.class_eval do
self.maximum_retry_count = 100 # prevent from entering failed state on first error
end

locker

sleep_until_equal(1) { DB[:que_lockers].count }

TestFailedJob.enqueue

sleep_until { errored_jobs_dataset.where(expired_at: nil).count.positive? }

locker.stop!

ext_job = jobs_ext_dataset.first

assert_equal ext_job[:status], 'errored'
assert_equal notified_errors.count, 1
assert_equal notified_errors.first.message, 'Test Error'


jobs_dataset.delete
end

it "should set status to failed when expired_at is present" do
TestFailedJob.class_eval do
self.maximum_retry_count = 0
end

Que.error_notifier = proc { |e| notified_errors << e }

locker

sleep_until_equal(1) { DB[:que_lockers].count }

TestFailedJob.enqueue

sleep_until { expired_jobs_dataset.count.positive? }

locker.stop!

assert_equal jobs_ext_dataset.first[:status], 'failed'
assert_equal notified_errors.count, 1
assert_equal notified_errors.first.message, 'Test Error'


jobs_dataset.delete
end
end
end
end
Loading

0 comments on commit cd6ec2d

Please sign in to comment.