From 4a989d2f662dd11cf7c527625be54bdbed47fbc0 Mon Sep 17 00:00:00 2001 From: Vincent Pochet Date: Mon, 23 Sep 2024 15:42:17 +0200 Subject: [PATCH] misc(worker): Introduce SolidQueue --- Gemfile | 2 + Gemfile.lock | 42 +++++ bin/jobs | 6 + config/application.rb | 2 +- config/database.yml | 28 +++- config/queue.yml | 21 +++ config/recurring.yml | 9 ++ config/routes.rb | 2 + .../20240924093358_create_structure.rb | 133 ++++++++++++++++ db/queue_schema.rb | 144 ++++++++++++++++++ scripts/start.worker.dev.sh | 4 +- scripts/start.worker.sh | 2 +- 12 files changed, 391 insertions(+), 4 deletions(-) create mode 100755 bin/jobs create mode 100644 config/queue.yml create mode 100644 config/recurring.yml create mode 100644 db/queue_migrate/20240924093358_create_structure.rb create mode 100644 db/queue_schema.rb mode change 100755 => 100644 scripts/start.worker.sh diff --git a/Gemfile b/Gemfile index 2734c7b2d1d5..fda9d8d053e7 100644 --- a/Gemfile +++ b/Gemfile @@ -15,6 +15,7 @@ gem 'puma', '~> 6.4' gem 'rails', '~> 7.1.3.4' gem 'redis' gem 'sidekiq' +gem 'solid_queue' # Security gem 'bcrypt' @@ -101,6 +102,7 @@ group :development, :test do gem 'debug', platforms: %i[mri mingw x64_mingw], require: false gem 'dotenv' gem 'i18n-tasks', git: 'https://github.com/glebm/i18n-tasks.git' + gem "mission_control-jobs", github: "zavan/mission_control-jobs", branch: "api-only" gem 'rspec-rails' gem 'simplecov', require: false gem 'webmock' diff --git a/Gemfile.lock b/Gemfile.lock index badcf3a25c3f..f8c42bb35f04 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -29,6 +29,19 @@ GIT graphiql-rails (1.10.1) railties +GIT + remote: https://github.com/zavan/mission_control-jobs.git + revision: 5feecbe3305652c4153661ea221201847fe8ed27 + branch: api-only + specs: + mission_control-jobs (0.3.1) + importmap-rails + irb (~> 1.13) + propshaft + rails (>= 7.1) + stimulus-rails + turbo-rails + GEM remote: https://rubygems.org/ specs: @@ -194,6 +207,8 @@ GEM dotenv (3.1.2) drb (2.2.1) erubi (1.13.0) + et-orbi (1.2.11) + tzinfo execjs (2.9.1) factory_bot (6.4.6) activesupport (>= 5.0.0) @@ -213,6 +228,9 @@ GEM ffi (1.17.0-x86_64-linux-gnu) fiber-storage (1.0.0) formatador (1.1.0) + fugit (1.11.1) + et-orbi (~> 1, >= 1.2.11) + raabro (~> 1.4) globalid (1.2.1) activesupport (>= 6.1) gocardless_pro (2.57.0) @@ -282,6 +300,10 @@ GEM httpclient (2.8.3) i18n (1.14.5) concurrent-ruby (~> 1.0) + importmap-rails (2.0.1) + actionpack (>= 6.0.0) + activesupport (>= 6.0.0) + railties (>= 6.0.0) io-console (0.7.2) irb (1.14.0) rdoc (>= 4.0.0) @@ -593,6 +615,11 @@ GEM racc pg (1.5.7) prism (0.30.0) + propshaft (1.0.0) + actionpack (>= 7.0.0) + activesupport (>= 7.0.0) + rack + railties (>= 7.0.0) pry (0.14.2) coderay (~> 1.1) method_source (~> 1.0) @@ -601,6 +628,7 @@ GEM public_suffix (5.1.1) puma (6.4.3) nio4r (~> 2.0) + raabro (1.4.0) racc (1.8.1) rack (2.2.9) rack-cors (1.1.1) @@ -797,6 +825,13 @@ GEM snaky_hash (2.0.1) hashie version_gem (~> 1.1, >= 1.1.1) + solid_queue (0.9.0) + activejob (>= 7.1) + activerecord (>= 7.1) + concurrent-ruby (>= 1.3.1) + fugit (~> 1.11.0) + railties (>= 7.1) + thor (~> 1.3.1) sorbet-runtime (0.5.11535) sprockets (4.2.1) concurrent-ruby (~> 1.0) @@ -817,6 +852,8 @@ GEM standard-performance (1.4.0) lint_roller (~> 1.1) rubocop-performance (~> 1.21.0) + stimulus-rails (1.3.4) + railties (>= 6.0.0) stringio (3.1.1) stripe (6.5.0) strong_migrations (2.0.0) @@ -830,6 +867,9 @@ GEM timecop (0.9.10) timeout (0.4.1) trailblazer-option (0.1.2) + turbo-rails (2.0.10) + actionpack (>= 6.0.0) + railties (>= 6.0.0) tzinfo (2.0.6) concurrent-ruby (~> 1.0) uber (0.1.0) @@ -908,6 +948,7 @@ DEPENDENCIES karafka-web (~> 0.9.0) lograge logstash-event + mission_control-jobs! money-rails multipart-post mutex_m @@ -942,6 +983,7 @@ DEPENDENCIES simplecov slim slim-rails + solid_queue standard stripe strong_migrations diff --git a/bin/jobs b/bin/jobs new file mode 100755 index 000000000000..dcf59f309ae5 --- /dev/null +++ b/bin/jobs @@ -0,0 +1,6 @@ +#!/usr/bin/env ruby + +require_relative "../config/environment" +require "solid_queue/cli" + +SolidQueue::Cli.start(ARGV) diff --git a/config/application.rb b/config/application.rb index 85cfa375e970..c7dafb090ae7 100644 --- a/config/application.rb +++ b/config/application.rb @@ -23,7 +23,7 @@ class Application < Rails::Application ] config.api_only = true - config.active_job.queue_adapter = :sidekiq + config.active_job.queue_adapter = :solid_queue # :sidekiq # Configuration for active record encryption config.active_record.encryption.hash_digest_class = OpenSSL::Digest::SHA1 diff --git a/config/database.yml b/config/database.yml index 6d280080ee13..aeacf3ef21c4 100644 --- a/config/database.yml +++ b/config/database.yml @@ -1,6 +1,10 @@ default: &default adapter: postgresql +queue: &queue + <<: *default + migrations_paths: db/queue_migrate + development: primary: <<: *default @@ -9,7 +13,7 @@ development: password: changeme database: lago port: 5432 - schema_search_path: 'public' + schema_search_path: "public" events: <<: *default host: db @@ -27,6 +31,13 @@ development: migrations_paths: db/clickhouse_migrate debug: true database_tasks: <% if ENV['LAGO_CLICKHOUSE_MIGRATIONS_ENABLED'].present? %> true <% else %> false <% end %> + queue: + <<: *queue + host: db + username: lago + password: changeme + database: lago_queue + port: 5432 test: primary: @@ -48,6 +59,9 @@ test: debug: true database_tasks: <% if ENV['LAGO_CLICKHOUSE_MIGRATIONS_ENABLED'].present? %> true <% else %> false <% end %> schema_dump: <% if ENV['LAGO_DISABLE_SCHEMA_DUMP'].present? %> false <% else %> clickhouse_schema.rb <% end %> + queue: + <<: *queue + database: lago_development_queue staging: primary: @@ -69,6 +83,12 @@ staging: migrations_paths: db/clickhouse_migrate debug: false database_tasks: <% if ENV['LAGO_CLICKHOUSE_MIGRATIONS_ENABLED'].present? %> true <% else %> false <% end %> + queue: + <<: *default + url: <%= ENV['DATABASE_URL'] %> + prepared_statements: <%= ENV.fetch('DATABASE_PREPARED_STATEMENTS', true) %> + schema_search_path: <%= ENV.fetch('POSTGRES_SCHEMA', 'public') %> + database: app_production_queue production: primary: @@ -96,3 +116,9 @@ production: migrations_paths: db/clickhouse_migrate debug: false database_tasks: <% if ENV['LAGO_CLICKHOUSE_MIGRATIONS_ENABLED'].present? %> true <% else %> false <% end %> + queue: + <<: *default + url: <%= ENV['DATABASE_URL'] %> + prepared_statements: <%= ENV.fetch('DATABASE_PREPARED_STATEMENTS', true) %> + schema_search_path: <%= ENV.fetch('POSTGRES_SCHEMA', 'public') %> + database: app_production_queue diff --git a/config/queue.yml b/config/queue.yml new file mode 100644 index 000000000000..917317c1bfe5 --- /dev/null +++ b/config/queue.yml @@ -0,0 +1,21 @@ +default: &default + dispatchers: + - polling_interval: 1 + batch_size: 500 + workers: + - queues: "*" + threads: 3 + processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %> + polling_interval: 0.1 + +development: + <<: *default + +test: + <<: *default + +staging: + <<: *default + +production: + <<: *default diff --git a/config/recurring.yml b/config/recurring.yml new file mode 100644 index 000000000000..2f919f0dbe5b --- /dev/null +++ b/config/recurring.yml @@ -0,0 +1,9 @@ +# periodic_cleanup: +# class: CleanSoftDeletedRecordsJob +# queue: background +# args: [ 1000, { batch_size: 500 } ] +# schedule: every hour +# periodic_command: +# command: "SoftDeletedRecord.due.delete_all" +# priority: 2 +# schedule: at 5am every day diff --git a/config/routes.rb b/config/routes.rb index 934bbb5415a7..00fcadde042f 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -5,6 +5,8 @@ mount Karafka::Web::App, at: '/karafka' if ENV['KARAFKA_WEB'] mount GraphiQL::Rails::Engine, at: '/graphiql', graphql_path: '/graphql' if Rails.env.development? + mount MissionControl::Jobs::Engine, at: "/jobs" + post '/graphql', to: 'graphql#execute' # Health Check status diff --git a/db/queue_migrate/20240924093358_create_structure.rb b/db/queue_migrate/20240924093358_create_structure.rb new file mode 100644 index 000000000000..4aaada1bd074 --- /dev/null +++ b/db/queue_migrate/20240924093358_create_structure.rb @@ -0,0 +1,133 @@ +class CreateStructure < ActiveRecord::Migration[7.1] + def change + safety_assured do + create_table "solid_queue_blocked_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.string "concurrency_key", null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" + t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + + create_table "solid_queue_claimed_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.bigint "process_id" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true + t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + end + + create_table "solid_queue_failed_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.text "error" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true + end + + create_table "solid_queue_jobs", force: :cascade do |t| + t.string "queue_name", null: false + t.string "class_name", null: false + t.text "arguments" + t.integer "priority", default: 0, null: false + t.string "active_job_id" + t.datetime "scheduled_at" + t.datetime "finished_at" + t.string "concurrency_key" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" + t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" + t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" + t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" + end + + create_table "solid_queue_pauses", force: :cascade do |t| + t.string "queue_name", null: false + t.datetime "created_at", null: false + t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true + end + + create_table "solid_queue_processes", force: :cascade do |t| + t.string "kind", null: false + t.datetime "last_heartbeat_at", null: false + t.bigint "supervisor_id" + t.integer "pid", null: false + t.string "hostname" + t.text "metadata" + t.datetime "created_at", null: false + t.string "name", null: false + t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" + t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" + end + + create_table "solid_queue_ready_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true + t.index ["priority", "job_id"], name: "index_solid_queue_poll_all" + t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" + end + + create_table "solid_queue_recurring_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "task_key", null: false + t.datetime "run_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true + t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true + end + + create_table "solid_queue_recurring_tasks", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "command", limit: 2048 + t.string "class_name" + t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 + t.boolean "static", default: true, null: false + t.text "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" + end + + create_table "solid_queue_scheduled_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "scheduled_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true + t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all" + end + + create_table "solid_queue_semaphores", force: :cascade do |t| + t.string "key", null: false + t.integer "value", default: 1, null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" + t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true + end + + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + end + end +end diff --git a/db/queue_schema.rb b/db/queue_schema.rb new file mode 100644 index 000000000000..ac1f87d78914 --- /dev/null +++ b/db/queue_schema.rb @@ -0,0 +1,144 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[7.1].define(version: 2024_09_24_093358) do + # These are extensions that must be enabled in order to support this database + enable_extension "plpgsql" + + create_table "solid_queue_blocked_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.string "concurrency_key", null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" + t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + + create_table "solid_queue_claimed_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.bigint "process_id" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true + t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + end + + create_table "solid_queue_failed_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.text "error" + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true + end + + create_table "solid_queue_jobs", force: :cascade do |t| + t.string "queue_name", null: false + t.string "class_name", null: false + t.text "arguments" + t.integer "priority", default: 0, null: false + t.string "active_job_id" + t.datetime "scheduled_at" + t.datetime "finished_at" + t.string "concurrency_key" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" + t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" + t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" + t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" + end + + create_table "solid_queue_pauses", force: :cascade do |t| + t.string "queue_name", null: false + t.datetime "created_at", null: false + t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true + end + + create_table "solid_queue_processes", force: :cascade do |t| + t.string "kind", null: false + t.datetime "last_heartbeat_at", null: false + t.bigint "supervisor_id" + t.integer "pid", null: false + t.string "hostname" + t.text "metadata" + t.datetime "created_at", null: false + t.string "name", null: false + t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" + t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" + end + + create_table "solid_queue_ready_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true + t.index ["priority", "job_id"], name: "index_solid_queue_poll_all" + t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" + end + + create_table "solid_queue_recurring_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "task_key", null: false + t.datetime "run_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true + t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true + end + + create_table "solid_queue_recurring_tasks", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "command", limit: 2048 + t.string "class_name" + t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 + t.boolean "static", default: true, null: false + t.text "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" + end + + create_table "solid_queue_scheduled_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "scheduled_at", null: false + t.datetime "created_at", null: false + t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true + t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all" + end + + create_table "solid_queue_semaphores", force: :cascade do |t| + t.string "key", null: false + t.integer "value", default: 1, null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" + t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true + end + + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade +end diff --git a/scripts/start.worker.dev.sh b/scripts/start.worker.dev.sh index 982c2f28a4bd..ae6e691a469c 100755 --- a/scripts/start.worker.dev.sh +++ b/scripts/start.worker.dev.sh @@ -1,4 +1,6 @@ #!/bin/bash bundle install -bundle exec sidekiq -C config/sidekiq.yml +#bundle exec sidekiq -C config/sidekiq.yml + +bundle exec rake solid_queue:start diff --git a/scripts/start.worker.sh b/scripts/start.worker.sh old mode 100755 new mode 100644 index b7c2653000ad..ec2c32af51b5 --- a/scripts/start.worker.sh +++ b/scripts/start.worker.sh @@ -1,3 +1,3 @@ #!/bin/bash -bundle exec sidekiq -C config/sidekiq.yml \ No newline at end of file +bundle exec sidekiq -C config/sidekiq.yml