Skip to content

Commit

Permalink
misc(worker): Introduce SolidQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-pochet committed Oct 1, 2024
1 parent 5b5afe0 commit 5960dd8
Show file tree
Hide file tree
Showing 12 changed files with 390 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ gem 'puma', '~> 6.4'
gem 'rails', '~> 7.1.3.4'
gem 'redis'
gem 'sidekiq'
gem 'solid_queue'

# Security
gem 'bcrypt'
Expand Down Expand Up @@ -101,6 +102,7 @@ group :development, :test do
gem 'debug', platforms: %i[mri mingw x64_mingw], require: false
gem 'dotenv'
gem 'i18n-tasks', git: 'https://github.com/glebm/i18n-tasks.git'
gem "mission_control-jobs", github: "zavan/mission_control-jobs", branch: "api-only"
gem 'rspec-rails'
gem 'simplecov', require: false
gem 'webmock'
Expand Down
42 changes: 42 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ GIT
graphiql-rails (1.10.1)
railties

GIT
remote: https://github.com/zavan/mission_control-jobs.git
revision: 5feecbe3305652c4153661ea221201847fe8ed27
branch: api-only
specs:
mission_control-jobs (0.3.1)
importmap-rails
irb (~> 1.13)
propshaft
rails (>= 7.1)
stimulus-rails
turbo-rails

GEM
remote: https://rubygems.org/
specs:
Expand Down Expand Up @@ -194,6 +207,8 @@ GEM
dotenv (3.1.2)
drb (2.2.1)
erubi (1.13.0)
et-orbi (1.2.11)
tzinfo
execjs (2.9.1)
factory_bot (6.4.6)
activesupport (>= 5.0.0)
Expand All @@ -213,6 +228,9 @@ GEM
ffi (1.17.0-x86_64-linux-gnu)
fiber-storage (1.0.0)
formatador (1.1.0)
fugit (1.11.1)
et-orbi (~> 1, >= 1.2.11)
raabro (~> 1.4)
globalid (1.2.1)
activesupport (>= 6.1)
gocardless_pro (2.57.0)
Expand Down Expand Up @@ -282,6 +300,10 @@ GEM
httpclient (2.8.3)
i18n (1.14.5)
concurrent-ruby (~> 1.0)
importmap-rails (2.0.1)
actionpack (>= 6.0.0)
activesupport (>= 6.0.0)
railties (>= 6.0.0)
io-console (0.7.2)
irb (1.14.0)
rdoc (>= 4.0.0)
Expand Down Expand Up @@ -593,6 +615,11 @@ GEM
racc
pg (1.5.7)
prism (0.30.0)
propshaft (1.0.0)
actionpack (>= 7.0.0)
activesupport (>= 7.0.0)
rack
railties (>= 7.0.0)
pry (0.14.2)
coderay (~> 1.1)
method_source (~> 1.0)
Expand All @@ -601,6 +628,7 @@ GEM
public_suffix (5.1.1)
puma (6.4.3)
nio4r (~> 2.0)
raabro (1.4.0)
racc (1.8.1)
rack (2.2.9)
rack-cors (1.1.1)
Expand Down Expand Up @@ -797,6 +825,13 @@ GEM
snaky_hash (2.0.1)
hashie
version_gem (~> 1.1, >= 1.1.1)
solid_queue (0.9.0)
activejob (>= 7.1)
activerecord (>= 7.1)
concurrent-ruby (>= 1.3.1)
fugit (~> 1.11.0)
railties (>= 7.1)
thor (~> 1.3.1)
sorbet-runtime (0.5.11535)
sprockets (4.2.1)
concurrent-ruby (~> 1.0)
Expand All @@ -817,6 +852,8 @@ GEM
standard-performance (1.4.0)
lint_roller (~> 1.1)
rubocop-performance (~> 1.21.0)
stimulus-rails (1.3.4)
railties (>= 6.0.0)
stringio (3.1.1)
stripe (6.5.0)
strong_migrations (2.0.0)
Expand All @@ -830,6 +867,9 @@ GEM
timecop (0.9.10)
timeout (0.4.1)
trailblazer-option (0.1.2)
turbo-rails (2.0.10)
actionpack (>= 6.0.0)
railties (>= 6.0.0)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
uber (0.1.0)
Expand Down Expand Up @@ -908,6 +948,7 @@ DEPENDENCIES
karafka-web (~> 0.9.0)
lograge
logstash-event
mission_control-jobs!
money-rails
multipart-post
mutex_m
Expand Down Expand Up @@ -942,6 +983,7 @@ DEPENDENCIES
simplecov
slim
slim-rails
solid_queue
standard
stripe
strong_migrations
Expand Down
6 changes: 6 additions & 0 deletions bin/jobs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env ruby

require_relative "../config/environment"
require "solid_queue/cli"

SolidQueue::Cli.start(ARGV)
2 changes: 1 addition & 1 deletion config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Application < Rails::Application
]

config.api_only = true
config.active_job.queue_adapter = :sidekiq
config.active_job.queue_adapter = :solid_queue # :sidekiq

# Configuration for active record encryption
config.active_record.encryption.hash_digest_class = OpenSSL::Digest::SHA1
Expand Down
25 changes: 24 additions & 1 deletion config/database.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
default: &default
adapter: postgresql

queue: &queue
<<: *default
migrations_paths: db/queue_migrate

development:
primary:
<<: *default
Expand All @@ -9,7 +13,7 @@ development:
password: changeme
database: lago
port: 5432
schema_search_path: 'public'
schema_search_path: "public"
events:
<<: *default
host: db
Expand All @@ -27,6 +31,13 @@ development:
migrations_paths: db/clickhouse_migrate
debug: true
database_tasks: <% if ENV['LAGO_CLICKHOUSE_MIGRATIONS_ENABLED'].present? %> true <% else %> false <% end %>
queue:
<<: *queue
host: db
username: lago
password: changeme
database: lago_queue
port: 5432

test:
primary:
Expand Down Expand Up @@ -69,6 +80,12 @@ staging:
migrations_paths: db/clickhouse_migrate
debug: false
database_tasks: <% if ENV['LAGO_CLICKHOUSE_MIGRATIONS_ENABLED'].present? %> true <% else %> false <% end %>
queue:
<<: *default
url: <%= ENV['DATABASE_URL'] %>
prepared_statements: <%= ENV.fetch('DATABASE_PREPARED_STATEMENTS', true) %>
schema_search_path: <%= ENV.fetch('POSTGRES_SCHEMA', 'public') %>
database: app_production_queue

production:
primary:
Expand Down Expand Up @@ -96,3 +113,9 @@ production:
migrations_paths: db/clickhouse_migrate
debug: false
database_tasks: <% if ENV['LAGO_CLICKHOUSE_MIGRATIONS_ENABLED'].present? %> true <% else %> false <% end %>
queue:
<<: *default
url: <%= ENV['DATABASE_URL'] %>
prepared_statements: <%= ENV.fetch('DATABASE_PREPARED_STATEMENTS', true) %>
schema_search_path: <%= ENV.fetch('POSTGRES_SCHEMA', 'public') %>
database: app_production_queue
21 changes: 21 additions & 0 deletions config/queue.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
default: &default
dispatchers:
- polling_interval: 1
batch_size: 500
workers:
- queues: "*"
threads: 3
processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
polling_interval: 0.1

development:
<<: *default

test:
<<: *default

staging:
<<: *default

production:
<<: *default
9 changes: 9 additions & 0 deletions config/recurring.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# periodic_cleanup:
# class: CleanSoftDeletedRecordsJob
# queue: background
# args: [ 1000, { batch_size: 500 } ]
# schedule: every hour
# periodic_command:
# command: "SoftDeletedRecord.due.delete_all"
# priority: 2
# schedule: at 5am every day
2 changes: 2 additions & 0 deletions config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
mount Karafka::Web::App, at: '/karafka' if ENV['KARAFKA_WEB']
mount GraphiQL::Rails::Engine, at: '/graphiql', graphql_path: '/graphql' if Rails.env.development?

mount MissionControl::Jobs::Engine, at: "/jobs"

post '/graphql', to: 'graphql#execute'

# Health Check status
Expand Down
135 changes: 135 additions & 0 deletions db/queue_migrate/20240924093358_create_structure.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# frozen_string_literal: true

class CreateStructure < ActiveRecord::Migration[7.1]
def change
safety_assured do
create_table "solid_queue_blocked_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.string "concurrency_key", null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release"
t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance"
t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
end

create_table "solid_queue_claimed_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.bigint "process_id"
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
end

create_table "solid_queue_failed_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.text "error"
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true
end

create_table "solid_queue_jobs", force: :cascade do |t|
t.string "queue_name", null: false
t.string "class_name", null: false
t.text "arguments"
t.integer "priority", default: 0, null: false
t.string "active_job_id"
t.datetime "scheduled_at"
t.datetime "finished_at"
t.string "concurrency_key"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id"
t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name"
t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at"
t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering"
t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting"
end

create_table "solid_queue_pauses", force: :cascade do |t|
t.string "queue_name", null: false
t.datetime "created_at", null: false
t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true
end

create_table "solid_queue_processes", force: :cascade do |t|
t.string "kind", null: false
t.datetime "last_heartbeat_at", null: false
t.bigint "supervisor_id"
t.integer "pid", null: false
t.string "hostname"
t.text "metadata"
t.datetime "created_at", null: false
t.string "name", null: false
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
end

create_table "solid_queue_ready_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true
t.index ["priority", "job_id"], name: "index_solid_queue_poll_all"
t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue"
end

create_table "solid_queue_recurring_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "task_key", null: false
t.datetime "run_at", null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true
t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
end

create_table "solid_queue_recurring_tasks", force: :cascade do |t|
t.string "key", null: false
t.string "schedule", null: false
t.string "command", limit: 2048
t.string "class_name"
t.text "arguments"
t.string "queue_name"
t.integer "priority", default: 0
t.boolean "static", default: true, null: false
t.text "description"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true
t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static"
end

create_table "solid_queue_scheduled_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "scheduled_at", null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true
t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all"
end

create_table "solid_queue_semaphores", force: :cascade do |t|
t.string "key", null: false
t.integer "value", default: 1, null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at"
t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value"
t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true
end

add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
end
end
end
Loading

0 comments on commit 5960dd8

Please sign in to comment.