diff --git a/app/jobs/govbox/process_message_job.rb b/app/jobs/govbox/process_message_job.rb index 24a8ee777..16be44f0d 100644 --- a/app/jobs/govbox/process_message_job.rb +++ b/app/jobs/govbox/process_message_job.rb @@ -4,7 +4,7 @@ module Govbox class ProcessMessageJob < ApplicationJob queue_as :default - retry_on ::Govbox::Message::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY + retry_on ::ApplicationRecord::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY def perform(govbox_message) ActiveRecord::Base.transaction do diff --git a/app/jobs/searchable/reindex_message_thread_job.rb b/app/jobs/searchable/reindex_message_thread_job.rb index 53f985c9f..41f7732f1 100644 --- a/app/jobs/searchable/reindex_message_thread_job.rb +++ b/app/jobs/searchable/reindex_message_thread_job.rb @@ -1,7 +1,21 @@ class Searchable::ReindexMessageThreadJob < ApplicationJob queue_as :default + include GoodJob::ActiveJobExtensions::Concurrency + + good_job_control_concurrency_with( + # Maximum number of unfinished jobs to allow with the concurrency key + # Can be an Integer or Lambda/Proc that is invoked in the context of the job + total_limit: 1, + + key: -> { "Searchable::ReindexMessageThreadJob-#{arguments.first.try(:id)}" } + ) + + discard_on ActiveJob::DeserializationError + def perform(message_thread) + return if message_thread.nil? + ::Searchable::MessageThread.index_record(message_thread) end end diff --git a/app/models/application_record.rb b/app/models/application_record.rb index 10a4cba84..9e44c581f 100644 --- a/app/models/application_record.rb +++ b/app/models/application_record.rb @@ -1,3 +1,15 @@ class ApplicationRecord < ActiveRecord::Base self.abstract_class = true + + class FailedToAcquireLockError < StandardError + end + + def self.with_advisory_lock!(lock_name, options = {}, &block) + result = with_advisory_lock_result(lock_name, options, &block) + if result.lock_was_acquired? + result.result + else + raise FailedToAcquireLockError + end + end end diff --git a/app/models/govbox/message.rb b/app/models/govbox/message.rb index 266a63fac..2833a2f67 100644 --- a/app/models/govbox/message.rb +++ b/app/models/govbox/message.rb @@ -27,7 +27,7 @@ def replyable? end def self.create_message_with_thread!(govbox_message) - message = MessageThread.with_advisory_lock(govbox_message.correlation_id, transaction: true, timeout_seconds: 10) do + message = MessageThread.with_advisory_lock!(govbox_message.correlation_id, transaction: true, timeout_seconds: 10) do folder = Folder.find_or_create_by!( name: "Inbox", box: govbox_message.box @@ -54,14 +54,9 @@ def self.create_message_with_thread!(govbox_message) message end - raise FailedToAcquireLockError unless message - self.create_message_objects(message, govbox_message.payload) end - class FailedToAcquireLockError < StandardError - end - private def self.create_message(govbox_message)