-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lock the rows when taking the advisory lock with ActiveRecordWithLock adapter #110
Lock the rows when taking the advisory lock with ActiveRecordWithLock adapter #110
Conversation
8bfbac0
to
8628157
Compare
8628157
to
c93e650
Compare
result = Que.execute(:find_job_to_lock, [queue, cursor]) | ||
result = Que.transaction do | ||
observe(nil, FindJobSecondsTotal, queue: queue) do | ||
result = Que.execute(:find_job_to_lock, [queue, cursor]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's obvious why we do this, given that there is already an advisory lock. Let's explain in a comment why this improves performance when there are a large number of workers contending for work, and why this doesn't ensure race safety (hence we still need the advisory lock).
else | ||
config.filter_run_excluding :active_record_with_lock | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it can be unintuitive that only some specs run. Given that this is to test adapter spec functionality, could we just add an RSpec for that class?
observe(FindJobHitTotal, nil, { queue: queue, job_hit: job_locked }) | ||
return result if job_locked | ||
end | ||
break if result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: a combination of returns and breaks can be a bit hard to follow. Is it possible to avoid this break such that all result cases are handled by early returns?
@@ -4,7 +4,7 @@ class LockDatabaseRecord < ActiveRecord::Base | |||
establish_connection( | |||
adapter: "postgresql", | |||
host: ENV.fetch("LOCK_PGHOST", "localhost"), | |||
user: ENV.fetch("LOCK_PGUSER", "postgres"), | |||
user: ENV.fetch("LOCK_PGUSER", "ubuntu"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this ubuntu ? Is this something specific to CI specs?
cursor = result.first["job_id"] | ||
break if pg_try_advisory_lock?(cursor) | ||
cursor = result.first["job_id"] | ||
job_locked = pg_try_advisory_lock?(cursor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to include the SQL query here in the metric for the time it takes to find a job ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit + non blocking for this PR: also, if you unify the metric labels you might be able to use Hesiod.register_duration_counters
and Hesiod.observe
as convenience wrappers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
time to find a job is in line 51. Locking shouldn't take much time. Not required to track as of now.
There is already a metric within Locker to track this
@@ -28,3 +28,11 @@ def active_record_with_lock_adapter_connection | |||
lock_connection_pool: LockDatabaseRecord.connection_pool, | |||
) | |||
end | |||
|
|||
RSpec.configure do |config| | |||
if ENV["ADAPTER"] == "ActiveRecordWithLock" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reason for gating this test? Also, if we're going to gate it do we need to document it/specify it in CI ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments on the control flow. I have a meta-point on the overall Que philosophy, since we're now breaking one of the assertions in the README:
Workers don't block each other when trying to lock jobs, as often occurs with "SELECT FOR UPDATE"-style locking.
If we're now breaking this assertion for these jobs we should make this clear and justify why we're trading this off for the performance hit.
lib/que/sql.rb
Outdated
@@ -184,7 +184,7 @@ module Que | |||
AND retryable = true | |||
AND job_id >= $2 | |||
ORDER BY priority, run_at, job_id | |||
LIMIT 1 | |||
for update skip locked LIMIT 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for update skip locked LIMIT 1 | |
FOR UPDATE SKIP LOCKED LIMIT 1 |
def lock_job_with_lock_database(queue, cursor) | ||
result = [] | ||
loop do | ||
result = Que.execute(:find_job_to_lock, [queue, cursor]) | ||
result = Que.transaction do | ||
observe(nil, FindJobSecondsTotal, queue: queue) do | ||
result = Que.execute(:find_job_to_lock, [queue, cursor]) | ||
end | ||
|
||
break if result.empty? | ||
return result if result.empty? | ||
|
||
cursor = result.first["job_id"] | ||
break if pg_try_advisory_lock?(cursor) | ||
cursor = result.first["job_id"] | ||
job_locked = pg_try_advisory_lock?(cursor) | ||
|
||
observe(FindJobHitTotal, nil, { queue: queue, job_hit: job_locked }) | ||
return result if job_locked | ||
end | ||
break if result | ||
end | ||
|
||
result | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found this control flow a bit confusing with the return
s from within the transaction. Since you've got an interaction between the proc, the loop, and the method, it doesn't feel very rubyish since using returns from within a proc could be interpreted as intended to be a next
. A possibly more Rubyish way writing this would be:
def lock_job_with_lock_database(queue, cursor)
loop do
locked_job = Que.transaction do
job_to_lock = observe(nil, FindJobSecondsTotal, queue: queue) do
Que.execute(:find_job_to_lock, [queue, cursor])
end
raise NoLockableJobs if job_to_lock.empty?
cursor = result.first["job_id"]
job_locked = pg_try_advisory_lock?(cursor)
observe(FindJobHitTotal, nil, { queue: queue, job_hit: job_locked })
job_locked
end
return locked_job if locked_job
end
rescue NoLockableJobs
[]
end
It's then very clear that the case where there are no lockable jobs is an early exit, and the rest of the flow loops to find a lockable job in the queue while jobs exist.
spec/integration/integration_spec.rb
Outdated
@@ -119,6 +119,20 @@ def wait_for_jobs_to_be_worked(timeout: 10) | |||
expect(User.count).to eq(3) | |||
expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie]) | |||
end | |||
|
|||
it "increments the metrics", :active_record_with_lock do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than mocking the metrics a nicer way to check this would be to run the workers then just check the metrics are in the expected state after the fact.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
spec/integration/integration_spec.rb
Outdated
expect(QueJob.count).to eq(3) | ||
|
||
with_workers(5) { wait_for_jobs_to_be_worked } | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given we've introduced a more complex control flow it would be good to have some non-metric tests for the locking testing the early exit and loop functionality, and asserting we can't enter into an infinite loop when there are no jobs.
1b0e1d9
to
7d136d8
Compare
5c49449
to
4398099
Compare
end | ||
|
||
let(:lock_connection_pool) do | ||
return LockDatabaseRecord.connection_pool if ENV["ADAPTER"] == "ActiveRecordWithLock" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now have separate tests for the two adapters, I think we should use shared specs to define "Que behaviour" for all adapters, so that we only run those per adapter rather than all the specs - this feels a bit clearer than controlling this through env vars.
Non-blocking, can be a clean-up later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is required only for this Adapter I think adding a filter would be better. It gets bit tricky because of the 2 database here.
Rest all the specs are run against specific adapter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have updated this to exclude all the files in adapter spec directory except for the particular adapter spec
370bc7a
to
d9f4322
Compare
d9f4322
to
ee42d61
Compare
For ActiveRecordWithLock adapter, we want to lock the rows when acquiring the advisory locks on the table to avoid all the workers trying to lock the same job.
Added metrics to observe the latency to find the job and job hit rate.