Expose job latency metric via ActiveSupport Notifications job middleware #366

Draft
wants to merge 4 commits into
base: master
26 changes: 26 additions & 0 deletions docs/README.md
@@ -798,6 +798,32 @@ Que.job_middleware.push(
)
```

#### Existing Middleware

Que ships with middleware that exposes job metrics using ActiveSupport notifications. To subscribe to them, you can implement the following:

```ruby
::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, labels|
# do something with notification.
end
```

`started` and `finished` are numeric readings, in seconds, from a monotonic clock, so they
can be used for timing calculations without being affected by changes to the system clock.

`labels` is a hash containing the following keys:

* `job_class` - the class of the job.
* `queue` - the queue this job was queued into.
* `priority` - the priority of this job.
* `latency` - how long, in seconds, the job waited in the queue after its `run_at` time.
* `error` - `true` if the job recorded an error, `false` otherwise.
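
As a rough sketch of what a subscriber body might do with these arguments, the helper below derives the job's run time from the two monotonic readings and formats the labels as a log line. The `format_que_job_metric` name, the log line layout, and the sample values are all hypothetical; the sketch uses plain Ruby so it can be tried without ActiveSupport loaded.

```ruby
# Hypothetical helper: not part of Que or ActiveSupport.
# Takes the same four arguments the subscriber block receives.
def format_que_job_metric(message, started, finished, labels)
  # Both readings come from the same monotonic clock, so their difference
  # is the elapsed run time in seconds.
  duration = finished - started

  format(
    "%s job_class=%s queue=%s priority=%d latency=%.3fs duration=%.3fs",
    message,
    labels[:job_class],
    labels[:queue],
    labels[:priority],
    labels[:latency].to_f,
    duration
  )
end

# Made-up sample values, for illustration only.
sample_labels = { job_class: "Foo", queue: "foo_queue", priority: 100, latency: 1.5 }
line = format_que_job_metric("que_job.worked", 10.0, 10.25, sample_labels)
puts line
```

Inside the subscriber block, this could be called as `format_que_job_metric(message, started, finished, labels)`.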

To use this middleware, register it with Que:

```ruby
Que.job_middleware.push(Que::ActiveSupport::JobMiddleware)
```

### Defining Middleware For SQL statements

SQL middleware wraps queries that Que executes, or which you might decide to execute via `Que.execute()`. For example, you can hook this into NewRelic or a similar service to instrument how long SQL queries take.
26 changes: 26 additions & 0 deletions lib/que/active_support/job_middleware.rb
@@ -0,0 +1,26 @@
# frozen_string_literal: true

module Que
  module ActiveSupport
    module JobMiddleware
      def self.call(job)
        labels = {
          job_class: job.que_attrs[:job_class],
          priority: job.que_attrs[:priority],
          queue: job.que_attrs[:queue],
          latency: job.que_attrs[:latency],
        }

        started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        yield
      ensure
        ::ActiveSupport::Notifications.publish(
          "que_job.worked",
          started,
          Process.clock_gettime(Process::CLOCK_MONOTONIC),
          labels.merge(error: job.que_error.present?),
        )
      end
    end
  end
end
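
The `ensure` clause in the middleware above is what lets the notification fire whether or not the wrapped block raises. A minimal sketch of that pattern in plain Ruby follows; the `timed` helper and the `EVENTS` array standing in for a notification publisher are hypothetical and not part of Que or this PR.

```ruby
# Hypothetical stand-in for a notification publisher: collects events in memory.
EVENTS = []

# Hypothetical helper illustrating the timing pattern: take a monotonic
# reading, yield to the wrapped work, and record the event in `ensure`
# so it is recorded even if the block raises.
def timed(name)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
ensure
  finished = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  EVENTS << { name: name, started: started, finished: finished }
end

timed("ok") { :done }

begin
  timed("failing") { raise "boom" }
rescue RuntimeError
  # The error still propagates to the caller after the event is recorded.
end

EVENTS.each do |event|
  puts "#{event[:name]}: #{event[:finished] - event[:started]}s"
end
```

Because the event is recorded in `ensure`, the failing call shows up in `EVENTS` alongside the successful one, which mirrors how the middleware reports both outcomes.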
9 changes: 9 additions & 0 deletions lib/que/locker.rb
@@ -7,6 +7,10 @@
require 'set'

module Que
class << self
attr_accessor :locker
end

Listener::MESSAGE_FORMATS[:job_available] =
{
queue: String,
@@ -71,6 +75,11 @@ def initialize(
Que.assert Array, worker_priorities
worker_priorities.each { |p| Que.assert([Integer, NilClass], p) }

# We assign this globally because we only ever expect one locker to be
# created per worker process. This can be used by middleware or external
# code to access the locker during runtime.
Que.locker = self

# We use a JobBuffer to track jobs and pass them to workers, and a
# ResultQueue to receive messages from workers.
@job_buffer = JobBuffer.new(
2 changes: 2 additions & 0 deletions lib/que/poller.rb
@@ -63,6 +63,7 @@ class Poller
SELECT
(j).*,
l.locked,
extract(epoch from (now() - (j).run_at)) as latency,
l.remaining_priorities
FROM (
SELECT j
@@ -81,6 +82,7 @@ class Poller
SELECT
(j).*,
l.locked,
extract(epoch from (now() - (j).run_at)) as latency,
l.remaining_priorities
FROM (
SELECT
53 changes: 53 additions & 0 deletions spec/que/active_support/job_middleware_spec.rb
@@ -0,0 +1,53 @@
# frozen_string_literal: true

require 'spec_helper'

if defined?(::ActiveSupport)
  require 'que/active_support/job_middleware'

  describe Que::ActiveSupport::JobMiddleware do
    let(:job) { Que::Job.new(**labels) }

    let(:labels) do
      {
        job_class: "Foo",
        priority: 100,
        queue: "foo_queue",
        latency: 100,
      }
    end

    it "records metrics when job succeeds" do
      called = false
      subscriber = ::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, metric_labels|
        assert_equal "que_job.worked", message
        assert started != nil
        assert finished != nil
        assert_equal labels.merge(error: false), metric_labels
        called = true
      end

      Que::ActiveSupport::JobMiddleware.call(job) { }

      assert_equal true, called

      ::ActiveSupport::Notifications.unsubscribe(subscriber)
    end

    it "records metrics when job fails" do
      called = false
      subscriber = ::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, metric_labels|
        assert_equal "que_job.worked", message
        assert started != nil
        assert finished != nil
        assert_equal labels.merge(error: true), metric_labels
        called = true
      end

      Que::ActiveSupport::JobMiddleware.call(job) { job.que_error = "error" }

      assert_equal true, called
      ::ActiveSupport::Notifications.unsubscribe(subscriber)
    end
  end
end
1 change: 1 addition & 0 deletions spec/que/poller_spec.rb
@@ -53,6 +53,7 @@ def poll(
metajobs.each do |metajob|
# Make sure we pull in run_at timestamps in iso8601 format.
assert_match(Que::TIME_REGEX, metajob.job[:run_at])
assert metajob.job[:latency].to_f > 0
end

returned_job_ids = metajobs.map(&:id)
1 change: 1 addition & 0 deletions spec/spec_helper.rb
@@ -8,6 +8,7 @@
# in some spec runs.
if ENV['USE_RAILS'] == 'true'
require 'active_record'
require 'active_support/notifications'
require 'active_job'

ActiveJob::Base.queue_adapter = :que