Skip to content

Commit

Permalink
Merge pull request #34 from basecamp/better-job-release
Browse files Browse the repository at this point in the history
Link supervisor with forks in the process registry
  • Loading branch information
rosa authored Nov 2, 2023
2 parents 7649fb8 + 0c23eeb commit a49b63c
Show file tree
Hide file tree
Showing 18 changed files with 203 additions and 146 deletions.
17 changes: 6 additions & 11 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ def success?
CLAIM_ATTRIBUTES = %w[ job_id ]

class << self
def claiming(executions, &block)
job_data = Array(executions).collect { |execution| execution.attributes.slice(*CLAIM_ATTRIBUTES) }
def claiming(executions, process_id, &block)
job_data = Array(executions).collect { |execution| { job_id: execution.job_id, process_id: process_id } }

insert_all(job_data)
where(job_id: job_data.map { |data| data["job_id"]} ).tap do |claimed|
where(job_id: job_data.map { |data| data[:job_id]} ).tap do |claimed|
block.call(claimed)
SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
end
Expand All @@ -25,10 +25,9 @@ def release_all
end
end

def perform(process)
claimed_by(process)

def perform
result = execute

if result.success?
finished
else
Expand All @@ -44,12 +43,8 @@ def release
end

private
def claimed_by(process)
update!(process: process)
SolidQueue.logger.info("[SolidQueue] Performing job #{job.id} - #{job.active_job_id}")
end

def execute
SolidQueue.logger.info("[SolidQueue] Performing job #{job.id} - #{job.active_job_id}")
ActiveJob::Base.execute(job.arguments)
Result.new(true, nil)
rescue Exception => e
Expand Down
14 changes: 6 additions & 8 deletions app/models/solid_queue/process.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
class SolidQueue::Process < SolidQueue::Record
include Prunable

if Gem::Version.new(Rails.version) >= Gem::Version.new("7.1")
serialize :metadata, coder: JSON
else
serialize :metadata, JSON
end

belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :forks
has_many :forks, class_name: "SolidQueue::Process", inverse_of: :supervisor, dependent: :destroy
has_many :claimed_executions

store :metadata, accessors: [ :kind, :pid ], coder: JSON

after_destroy -> { claimed_executions.release_all }

def self.register(metadata)
create!(metadata: metadata, last_heartbeat_at: Time.current)
def self.register(**attributes)
create!(attributes.merge(last_heartbeat_at: Time.current))
end

def heartbeat
Expand Down
13 changes: 6 additions & 7 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ class ReadyExecution < Execution
before_create :assume_attributes_from_job

class << self
def claim(queues, limit)
def claim(queues, limit, process_id)
transaction do
candidates = select_candidates(queues, limit)
lock(candidates)
lock(candidates, process_id)
end
end

Expand All @@ -22,18 +22,17 @@ def select_candidates(queues, limit)
queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED")
end

def lock(candidates)
def lock(candidates, process_id)
return [] if candidates.none?

SolidQueue::ClaimedExecution.claiming(candidates) do |claimed|
SolidQueue::ClaimedExecution.claiming(candidates, process_id) do |claimed|
where(job_id: claimed.pluck(:job_id)).delete_all
end
end
end

def claim
def claim(process_id)
transaction do
SolidQueue::ClaimedExecution.claiming(self) do |claimed|
SolidQueue::ClaimedExecution.claiming(self, process_id) do |claimed|
delete if claimed.one?
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
require "solid_queue/pool"
require "solid_queue/queue_parser"
require "solid_queue/runner"
require "solid_queue/runner/process_registration"
require "solid_queue/process_registration"
require "solid_queue/worker"
require "solid_queue/scheduler"
require "solid_queue/supervisor"
Expand Down
6 changes: 3 additions & 3 deletions lib/solid_queue/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ def initialize(size, on_idle: nil)
@mutex = Mutex.new
end

def post(execution, process)
def post(execution)
available_threads.decrement

future = Concurrent::Future.new(args: [ execution, process ], executor: executor) do |thread_execution, thread_process|
future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
wrap_in_app_executor do
thread_execution.perform(thread_process)
thread_execution.perform
ensure
available_threads.increment
mutex.synchronize { on_idle.try(:call) if idle? }
Expand Down
71 changes: 71 additions & 0 deletions lib/solid_queue/process_registration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# frozen_string_literal: true

module SolidQueue
module ProcessRegistration
extend ActiveSupport::Concern

included do
include ActiveSupport::Callbacks
define_callbacks :start, :run, :shutdown

set_callback :start, :before, :register
set_callback :start, :before, :start_heartbeat

set_callback :run, :after, -> { stop unless registered? }

set_callback :shutdown, :before, :stop_heartbeat
set_callback :shutdown, :after, :deregister

attr_reader :supervisor
end

def inspect
metadata.inspect
end
alias to_s inspect

def supervised_by(process)
@supervisor = process
end

private
attr_accessor :process

def register
@process = SolidQueue::Process.register(supervisor: supervisor, metadata: metadata)
end

def deregister
process.deregister
end

def registered?
process.persisted?
end

def start_heartbeat
@heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) { heartbeat }
@heartbeat_task.execute
end

def stop_heartbeat
@heartbeat_task.shutdown
end

def heartbeat
process.heartbeat
end

def hostname
@hostname ||= Socket.gethostname
end

def process_pid
@pid ||= ::Process.pid
end

def metadata
{ kind: self.class.name.demodulize, hostname: hostname, pid: process_pid, supervisor_pid: supervisor&.pid }
end
end
end
15 changes: 1 addition & 14 deletions lib/solid_queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@ module Runner
extend ActiveSupport::Concern

included do
include ActiveSupport::Callbacks
define_callbacks :start, :run, :shutdown

include AppExecutor, Procline
include ProcessRegistration, Interruptible

attr_accessor :supervisor_pid
end

def start(mode: :supervised)
Expand Down Expand Up @@ -95,7 +90,7 @@ def finished?
end

def supervisor_went_away?
supervised? && supervisor_pid != ::Process.ppid
supervised? && supervisor&.pid != ::Process.ppid
end

def supervised?
Expand All @@ -109,13 +104,5 @@ def all_work_completed?
def running_inline?
mode.inline?
end

def hostname
@hostname ||= Socket.gethostname
end

def process_pid
@pid ||= ::Process.pid
end
end
end
52 changes: 0 additions & 52 deletions lib/solid_queue/runner/process_registration.rb

This file was deleted.

30 changes: 17 additions & 13 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module SolidQueue
class Supervisor
include AppExecutor, Signals, Procline
include AppExecutor, ProcessRegistration, Signals, Procline

class << self
def start(mode: :work, load_configuration_from: nil)
Expand All @@ -19,26 +19,30 @@ def initialize(runners)
end

def start
procline "starting"
run_callbacks(:start) do
procline "starting"

sync_std_streams
setup_pidfile
register_signal_handlers
start_process_prune
sync_std_streams
setup_pidfile
register_signal_handlers
start_process_prune

start_runners
start_runners

procline "started"
procline "started"

supervise
supervise
end
rescue GracefulShutdownRequested
graceful_shutdown
rescue ImmediateShutdownRequested
immediate_shutdown
ensure
stop_process_prune
restore_default_signal_handlers
delete_pidfile
run_callbacks(:shutdown) do
stop_process_prune
restore_default_signal_handlers
delete_pidfile
end
end

private
Expand Down Expand Up @@ -114,7 +118,7 @@ def prune_dead_processes
end

def start_runner(runner)
runner.supervisor_pid = ::Process.pid
runner.supervised_by process

pid = fork do
runner.start
Expand Down
4 changes: 2 additions & 2 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ def initialize(**options)

private
def run
claimed_executions = SolidQueue::ReadyExecution.claim(queues, pool.idle_threads)
claimed_executions = SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id)

if claimed_executions.size > 0
procline "performing #{claimed_executions.count} jobs in #{queues}"

claimed_executions.each do |execution|
pool.post(execution, process)
pool.post(execution)
end
else
procline "waiting for jobs in #{queues}"
Expand Down
4 changes: 3 additions & 1 deletion test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2023_10_30_164933) do
ActiveRecord::Schema[7.1].define(version: 2023_11_02_184135) do
create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name"
t.string "status"
Expand Down Expand Up @@ -59,7 +59,9 @@
t.text "metadata"
t.datetime "created_at", null: false
t.datetime "last_heartbeat_at", null: false
t.bigint "supervisor_id"
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
end

create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
Expand Down
Loading

0 comments on commit a49b63c

Please sign in to comment.