From 95b4ed6a0c083319be7bfdf64d521e4fffca2fcc Mon Sep 17 00:00:00 2001 From: Max Jacob Schridde Date: Tue, 24 Oct 2023 10:42:23 -0700 Subject: [PATCH] Add view and job table enhancements for improved observability * Que migration v8: que_jobs_ext view; add first_run_at column to que_jobs --- docs/README.md | 15 ++ lib/que/job.rb | 13 +- lib/que/migrations.rb | 2 +- lib/que/migrations/8/down.sql | 4 + lib/que/migrations/8/up.sql | 61 ++++++++ lib/que/utils/queue_management.rb | 2 +- spec/que/job.bulk_enqueue_spec.rb | 1 + spec/que/job.enqueue_spec.rb | 1 + spec/que/utils/introspection_spec.rb | 8 +- spec/que/views/que_jobs_ext_spec.rb | 199 +++++++++++++++++++++++++++ spec/spec_helper.rb | 18 ++- 11 files changed, 312 insertions(+), 12 deletions(-) create mode 100644 lib/que/migrations/8/down.sql create mode 100644 lib/que/migrations/8/up.sql create mode 100644 spec/que/views/que_jobs_ext_spec.rb diff --git a/docs/README.md b/docs/README.md index 4b5e1f46..0c4468cd 100644 --- a/docs/README.md +++ b/docs/README.md @@ -56,6 +56,7 @@ - [Enqueueing jobs in bulk](#enqueueing-jobs-in-bulk) - [Expired jobs](#expired-jobs) - [Finished jobs](#finished-jobs) +- [Que Jobs Ext view](#que-jobs-ext-view) @@ -879,3 +880,17 @@ SET LOCAL que.skip_notify TO true; DELETE FROM que_jobs WHERE finished_at < (select now() - interval '7 days'); COMMIT; ``` + +## Que Jobs Ext view + +This view extends the functionality of the que job management system by providing an enriched view of que jobs. It combines data from the 'que_jobs' table and the 'pg_locks' table to present a comprehensive overview of que jobs, including their status, associated information, and locking details. + +This view is designed to facilitate the monitoring and management of que jobs, allowing you to track job statuses, locking details, and job-related information. + +Additional Columns: + +- lock_id: Unique identifier for the lock associated with the job. +- que_locker_pid: Process ID (PID) of the que job locker. 
+- sub_class: The 'job_class' value extracted from the first element of the job's 'args' array. +- updated_at: The most recent timestamp among 'run_at', 'expired_at', and 'finished_at'. +- status: The status of the job, which can be 'running', 'completed', 'failed', 'errored', 'queued', or 'scheduled'. \ No newline at end of file diff --git a/lib/que/job.rb b/lib/que/job.rb index 6093b162..319096d3 100644 --- a/lib/que/job.rb +++ b/lib/que/job.rb @@ -12,7 +12,7 @@ class Job SQL[:insert_job] = %{ INSERT INTO public.que_jobs - (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version) + (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version, first_run_at) VALUES ( coalesce($1, 'default')::text, @@ -22,7 +22,8 @@ class Job coalesce($5, '[]')::jsonb, coalesce($6, '{}')::jsonb, coalesce($7, '{}')::jsonb, - #{Que.job_schema_version} + #{Que.job_schema_version}, + coalesce($3, now())::timestamptz ) RETURNING * } @@ -33,7 +34,7 @@ class Job SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb) ) INSERT INTO public.que_jobs - (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version) + (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version, first_run_at) SELECT coalesce($1, 'default')::text, coalesce($2, 100)::smallint, @@ -42,7 +43,8 @@ class Job args_and_kwargs.args, args_and_kwargs.kwargs, coalesce($6, '{}')::jsonb, - #{Que.job_schema_version} + #{Que.job_schema_version}, + coalesce($3, now())::timestamptz FROM args_and_kwargs RETURNING * } @@ -75,7 +77,8 @@ class << self :maximum_retry_count, :queue, :priority, - :run_at + :run_at, + :first_run_at def enqueue(*args) args, kwargs = Que.split_out_ruby2_keywords(args) diff --git a/lib/que/migrations.rb b/lib/que/migrations.rb index 255ae6da..967a0291 100644 --- a/lib/que/migrations.rb +++ b/lib/que/migrations.rb @@ -4,7 +4,7 @@ module Que module Migrations # In order to ship a schema change, add the relevant up and down sql 
files # to the migrations directory, and bump the version here. - CURRENT_VERSION = 7 + CURRENT_VERSION = 8 class << self def migrate!(version:) diff --git a/lib/que/migrations/8/down.sql b/lib/que/migrations/8/down.sql new file mode 100644 index 00000000..8f6acbb5 --- /dev/null +++ b/lib/que/migrations/8/down.sql @@ -0,0 +1,4 @@ +DROP VIEW IF EXISTS public.que_jobs_ext; + +ALTER TABLE que_jobs + DROP COLUMN first_run_at; diff --git a/lib/que/migrations/8/up.sql b/lib/que/migrations/8/up.sql new file mode 100644 index 00000000..d4feb91d --- /dev/null +++ b/lib/que/migrations/8/up.sql @@ -0,0 +1,61 @@ +-- Add 'first_run_at' column to 'que_jobs'. The 'first_run_at' column stores the timestamp with time zone (timestamptz) +-- of the initial scheduled execution time for que jobs. The column default is now(), but in practice the enqueue SQL always +-- sets it to the job's initial run_at value. This enhancement helps re-trace the execution of the job when reviewing failures. + +ALTER TABLE que_jobs + ADD COLUMN first_run_at timestamptz NOT NULL DEFAULT now(); + + +-- This view extends the functionality of the que job management system by providing an enriched view of que jobs. +-- It combines data from the 'que_jobs' table and the 'pg_locks' table to present a comprehensive overview of que jobs, +-- including their status, associated information, and locking details. The view is designed to facilitate the monitoring +-- and management of que jobs, allowing you to track job statuses, locking details, and job-related information. + +-- Columns: +-- - lock_id: Unique identifier for the lock associated with the job. +-- - que_locker_pid: Process ID (PID) of the que job locker. +-- - sub_class: The 'job_class' value extracted from the first element of the job's 'args' array. +-- - updated_at: The most recent timestamp among 'run_at', 'expired_at', and 'finished_at'. +-- - status: The status of the job, which can be 'running', 'completed', 'failed', 'errored', 'queued', or 'scheduled'. 
+ +CREATE OR REPLACE VIEW public.que_jobs_ext +AS +SELECT + locks.id AS lock_id, + locks.pid as que_locker_pid, + (que_jobs.args -> 0) ->> 'job_class'::text AS sub_class, + greatest(run_at, expired_at, finished_at) as updated_at, + + case + when locks.id is not null then 'running' + when finished_at is not null then 'completed' + when expired_at is not null then 'failed' + when error_count > 0 then 'errored' + when run_at < now() then 'queued' + else 'scheduled' + end as status, + + -- que_jobs.*: + que_jobs.id, + que_jobs.priority, + que_jobs.run_at, + que_jobs.first_run_at, + que_jobs.job_class, + que_jobs.error_count, + que_jobs.last_error_message, + que_jobs.queue, + que_jobs.last_error_backtrace, + que_jobs.finished_at, + que_jobs.expired_at, + que_jobs.args, + que_jobs.data, + que_jobs.kwargs, + que_jobs.job_schema_version + + FROM que_jobs + LEFT JOIN ( + SELECT + (classid::bigint << 32) + objid::bigint AS id + , pid + FROM pg_locks + WHERE pg_locks.locktype = 'advisory'::text) locks USING (id); \ No newline at end of file diff --git a/lib/que/utils/queue_management.rb b/lib/que/utils/queue_management.rb index 61789495..9d25d90e 100644 --- a/lib/que/utils/queue_management.rb +++ b/lib/que/utils/queue_management.rb @@ -12,7 +12,7 @@ def clear! # Very old migrations may use Que.create! and Que.drop!, which just # created and dropped the initial version of the jobs table. 
def create!; migrate!(version: 1); end - def drop!; migrate!(version: 0); end + def drop!; migrate!(version: 0); end end end end diff --git a/spec/que/job.bulk_enqueue_spec.rb b/spec/que/job.bulk_enqueue_spec.rb index 4a1b85b8..3b0dfe88 100644 --- a/spec/que/job.bulk_enqueue_spec.rb +++ b/spec/que/job.bulk_enqueue_spec.rb @@ -44,6 +44,7 @@ def assert_enqueue( assert_equal expected_job_class.to_s, job[:job_class] assert_equal expected_args[i], job[:args] assert_equal expected_kwargs[i], job[:kwargs] + assert_equal job[:run_at], job[:first_run_at] end jobs_dataset.delete diff --git a/spec/que/job.enqueue_spec.rb b/spec/que/job.enqueue_spec.rb index bdb87817..71fc987a 100644 --- a/spec/que/job.enqueue_spec.rb +++ b/spec/que/job.enqueue_spec.rb @@ -39,6 +39,7 @@ def assert_enqueue( assert_equal expected_job_class.to_s, job[:job_class] assert_equal expected_args, job[:args] assert_equal expected_job_schema_version, job[:job_schema_version] + assert_equal job[:first_run_at], job[:run_at] jobs_dataset.delete end diff --git a/spec/que/utils/introspection_spec.rb b/spec/que/utils/introspection_spec.rb index c3711fa1..1d378696 100644 --- a/spec/que/utils/introspection_spec.rb +++ b/spec/que/utils/introspection_spec.rb @@ -65,10 +65,10 @@ locker.stop! 
state = states.first - assert_equal \ - %i(priority run_at id job_class error_count last_error_message queue - last_error_backtrace finished_at expired_at args data job_schema_version kwargs ruby_hostname ruby_pid), - state.keys + expected_keys = %i(priority run_at id job_class error_count last_error_message queue + last_error_backtrace finished_at expired_at args data job_schema_version kwargs ruby_hostname ruby_pid first_run_at) + + assert_equal expected_keys.sort, state.keys.sort assert_equal 2, state[:priority] assert_in_delta state[:run_at], Time.now, QueSpec::TIME_SKEW diff --git a/spec/que/views/que_jobs_ext_spec.rb b/spec/que/views/que_jobs_ext_spec.rb new file mode 100644 index 00000000..a90bf989 --- /dev/null +++ b/spec/que/views/que_jobs_ext_spec.rb @@ -0,0 +1,199 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe "Que Jobs Ext View", skip: true do + + class TestJob < Que::Job + include Que::JobMethods + + def default_resolve_action + # prevents default deletion of complete jobs for testing purposes + finish + end + end + + class TestFailedJob < TestJob + def run + raise Que::Error, 'Test Error' + end + end + + describe 'job.enqueue' do + it "should mirror enqueued job" do + assert_equal 0, jobs_dataset.count + assert_equal 0, jobs_ext_dataset.count + + TestJob.enqueue( + 1, + 'two', + string: "string", + integer: 5, + array: [1, "two", {three: 3}], + hash: {one: 1, two: 'two', three: [3]}, + job_options: { + priority: 4, + queue: 'special_queue_name', + run_at: Time.now + } + ) + + assert_equal 1, jobs_dataset.count + assert_equal 1, jobs_ext_dataset.count + + job = jobs_dataset.first + ext_job = jobs_ext_dataset.first + assert_equal ext_job[:queue], job[:queue] + assert_equal ext_job[:priority], job[:priority] + assert_equal ext_job[:run_at], job[:run_at] + assert_equal ext_job[:first_run_at], job[:first_run_at] + assert_equal ext_job[:job_class], job[:job_class] + assert_equal ext_job[:args], job[:args] + assert_equal 
ext_job[:job_schema_version], job[:job_schema_version] + + jobs_dataset.delete + + assert_equal 0, jobs_dataset.count + assert_equal 0, jobs_ext_dataset.count + end + + it "should include additional lock data" do + locker_settings.clear + locker_settings[:listen] = false + locker_settings[:poll_interval] = 0.02 + locker + + TestJob.enqueue + + sleep_until { locked_ids.count.positive? && locked_ids.first == jobs_ext_dataset.first[:lock_id] } + + locker.stop! + + jobs_dataset.delete + end + + it "should add additional updated_at" do + TestJob.enqueue + + ext_job = jobs_ext_dataset.first + + assert_equal ext_job[:run_at], ext_job[:updated_at] + + locker + + sleep_until_equal(1) { finished_jobs_dataset.count } + + locker.stop! + + ext_job = jobs_ext_dataset.first + + assert_equal ext_job[:finished_at], ext_job[:updated_at] + + jobs_dataset.delete + end + + describe "should include additional status" do + + let(:notified_errors) { [] } + + it "should set status to scheduled when run_at is in the future" do + TestJob.enqueue(job_options: { run_at: Time.now + 1 }) + + assert_equal jobs_ext_dataset.first[:status], 'scheduled' + + jobs_dataset.delete + end + + it "should set status to queued when run_at is in the past and the job is not currently running, completed, failed or errored" do + TestJob.enqueue(job_options: { run_at: Time.now - 1 }) + + assert_equal jobs_ext_dataset.first[:status], 'queued' + + jobs_dataset.delete + end + + it "should set status to running when the job has a lock associated with it" do + locker_settings.clear + locker_settings[:listen] = false + locker_settings[:poll_interval] = 0.02 + locker + + TestJob.enqueue + + sleep_until { locked_ids.count.positive? && locked_ids.first == jobs_ext_dataset.first[:lock_id] && jobs_ext_dataset.first[:status] == 'running' } + + locker.stop! 
+ + jobs_dataset.delete + end + + it "should set status to complete when finished_at is present" do + TestJob.enqueue + + locker + + sleep_until_equal(1) { DB[:que_lockers].count } + + sleep_until { finished_jobs_dataset.count.positive? } + + locker.stop! + + assert_equal jobs_ext_dataset.first[:status], 'completed' + + jobs_dataset.delete + end + + it "should set status to errored when error_count is positive and expired_at is not present" do + Que.error_notifier = proc { |e| notified_errors << e } + + TestFailedJob.class_eval do + self.maximum_retry_count = 100 # prevent from entering failed state on first error + end + + locker + + sleep_until_equal(1) { DB[:que_lockers].count } + + TestFailedJob.enqueue + + sleep_until { errored_jobs_dataset.where(expired_at: nil).count.positive? } + + locker.stop! + + ext_job = jobs_ext_dataset.first + + assert_equal ext_job[:status], 'errored' + assert_equal notified_errors.count, 1 + assert_equal notified_errors.first.message, 'Test Error' + + + jobs_dataset.delete + end + + it "should set status to failed when expired_at is present" do + TestFailedJob.class_eval do + self.maximum_retry_count = 0 + end + + Que.error_notifier = proc { |e| notified_errors << e } + + locker + + sleep_until_equal(1) { DB[:que_lockers].count } + + TestFailedJob.enqueue + + sleep_until { expired_jobs_dataset.count.positive? } + + locker.stop! 
+ + assert_equal jobs_ext_dataset.first[:status], 'failed' + assert_equal notified_errors.count, 1 + assert_equal notified_errors.first.message, 'Test Error' + + + jobs_dataset.delete + end + end + end +end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index ac34bb5c..212f03e5 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -227,10 +227,18 @@ def jobs_dataset DB[:que_jobs] end + def jobs_ext_dataset + DB[:que_jobs_ext] + end + def active_jobs_dataset jobs_dataset.where(finished_at: nil, expired_at: nil) end + def errored_jobs_dataset + jobs_dataset.exclude(error_count: 0) + end + def expired_jobs_dataset jobs_dataset.exclude(expired_at: nil) end @@ -267,8 +275,16 @@ def internal_messages(event: nil) messages end + def pg_advisory_locks + DB[:pg_locks].where(locktype: 'advisory') + end + def locked_ids - DB[:pg_locks].where(locktype: 'advisory').select_order_map(Sequel.lit("(classid::bigint << 32) + objid::bigint")) + pg_advisory_locks.select_order_map(Sequel.lit("(classid::bigint << 32) + objid::bigint")) + end + + def locked_pids + pg_advisory_locks.select_order_map(:pid) end def current_spec_location