Skip to content

Commit

Permalink
Move scheduling logic to the mlpop lua command
Browse files Browse the repository at this point in the history
Resque finds jobs by iterating on a list of queues, for each queue it
tries to lpop a job until successful.

This patch moves that logic to the server by implementing mlpop,
that approach reduces the required round-trips especially for workers
watching a large set of queues.

The API is not altered, we keep the same signature while accepting
variadic arguments.
  • Loading branch information
ctrochalakis committed Feb 14, 2021
1 parent ac60015 commit 35421dd
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 18 deletions.
24 changes: 18 additions & 6 deletions lib/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,23 @@ def push(queue, item)
data_store.push_to_queue(queue,encode(item))
end

# Pops a job off a queue. Queue name should be a string.
# Pops a job off a set of queues. Queue names should be a string.
#
# Returns a Ruby object.
def pop(queue)
decode(data_store.pop_from_queue(queue))
def pop(*queues)
queue, decoded_job = pop_with_queue(*queues)

decoded_job
end

# Pops a job off a set of queues. Queue names should be strings.
#
# Returns an array of [queue, Ruby object] or nil.
def pop_with_queue(*queues)
queue, job = data_store.pop_from_queue(*queues)
return if queue.nil?

[queue, decode(job)]
end

# Returns an integer representing the size of a queue.
Expand Down Expand Up @@ -435,11 +447,11 @@ def queue_from_class(klass)

# This method will return a `Resque::Job` object or a non-true value
# depending on whether a job can be obtained. You should pass it the
# precise name of a queue: case matters.
# precise queue names: case matters.
#
# This method is considered part of the `stable` API.
def reserve(queue)
Job.reserve(queue)
def reserve(*queues)
Job.reserve(*queues)
end

# Validates if the given klass could be a valid Resque job
Expand Down
61 changes: 58 additions & 3 deletions lib/resque/data_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,45 @@ def push_to_queue(queue,encoded_item)
end
end

# Pop whatever is on queue
def pop_from_queue(queue)
@redis.lpop(redis_key_for_queue(queue))
# MLPOP key [key ..]
#
# LPOP the first available element by probing each key
# in the specified order.
#
# Moving the scheduling logic to the server reduces
# the required round-trips especially for workers with
# many queues in their watchlists.
#
# Returns a array of [key-index, element value] or nil
# if no element was found.
#
# Note that we choose to return the key index instead
# of the key itself: Redis::Namespace converts the keys
# passed to the script (adding a prefix), and we would
# have to reverse map the return values. We avoid that
# by returning the index instead.
def mlpop(*queues, keys:)
@redis.evalsha(mlpop_sha, keys: keys)
rescue Redis::CommandError => e
# If the script is not found (e.g. when the server
# is restarted) reload it.
if e.message.include?("NOSCRIPT")
mlpop_sha!
retry
end

raise e
end

def pop_from_queue(*queues)
keys = queues.map { |q| redis_key_for_queue(q) }
key_idx, payload = mlpop(*queues, keys: keys)
return nil if key_idx.nil?

[queues[Integer(key_idx)], payload]
end


# Get the number of items in the queue
def queue_size(queue)
@redis.llen(redis_key_for_queue(queue)).to_i
Expand Down Expand Up @@ -176,6 +210,27 @@ def list_range(key, start = 0, count = 1)

private

# Return the mlpop script sha hash by loading the script to Redis
def mlpop_sha
@mlpop_sha ||= @redis.script(:load, <<-eos
for i, q in ipairs(KEYS) do
local qpop = redis.call("lpop", q)
if qpop then
-- return the zero-index position
return {i-1, qpop}
end
end
return false
eos
)
end

def mlpop_sha!
@mlpop_sha = nil
mlpop_sha
end


def redis_key_for_queue(queue)
"queue:#{queue}"
end
Expand Down
8 changes: 5 additions & 3 deletions lib/resque/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,12 @@ def self.destroy(queue, klass, *args)
destroyed
end

# Given a string queue name, returns an instance of Resque::Job
# Given a list of queue names, returns an instance of Resque::Job
# if any jobs are available. If not, returns nil.
def self.reserve(queue)
return unless payload = Resque.pop(queue)
def self.reserve(*queues)
queue, payload = Resque.pop_with_queue(*queues)
return unless queue

new(queue, payload)
end

Expand Down
10 changes: 4 additions & 6 deletions lib/resque/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,10 @@ def perform(job)
# Attempts to grab a job off one of the provided queues. Returns
# nil if no job can be found.
def reserve
queues.each do |queue|
log_with_severity :debug, "Checking #{queue}"
if job = Resque.reserve(queue)
log_with_severity :debug, "Found job on #{queue}"
return job
end
log_with_severity :debug, "Checking #{queues}"
if job = Resque.reserve(*queues)
log_with_severity :debug, "Found job on #{job.queue}"
return job
end

nil
Expand Down
19 changes: 19 additions & 0 deletions test/resque_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,25 @@
assert_equal '/tmp', job.args[1]
end

it "can grab jobs off multiple queues" do
Resque::Job.create(:jobs1, 'some-job', 20, '/tmp')
Resque::Job.create(:jobs2, 'some-job', 40, '/')

job = Resque.reserve(:jobs1, :jobs2)

assert_kind_of Resque::Job, job
assert_equal SomeJob, job.payload_class
assert_equal 20, job.args[0]
assert_equal '/tmp', job.args[1]

job = Resque.reserve(:jobs, :hobbies)

assert_kind_of Resque::Job, job
assert_equal SomeJob, job.payload_class
assert_equal 40, job.args[0]
assert_equal '/', job.args[1]
end

it "can re-queue jobs" do
Resque::Job.create(:jobs, 'some-job', 20, '/tmp')

Expand Down

0 comments on commit 35421dd

Please sign in to comment.