From fef0f062d17d54e2ad3529296a18c0fd6716f8da Mon Sep 17 00:00:00 2001 From: Miroslav Hettes Date: Fri, 22 Sep 2023 17:05:48 +0200 Subject: [PATCH 1/7] Introduce a with_advisory_lock! method --- app/jobs/govbox/process_message_job.rb | 2 +- app/models/application_record.rb | 12 ++++++++++++ app/models/govbox/message.rb | 5 ----- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/app/jobs/govbox/process_message_job.rb b/app/jobs/govbox/process_message_job.rb index 24a8ee777..16be44f0d 100644 --- a/app/jobs/govbox/process_message_job.rb +++ b/app/jobs/govbox/process_message_job.rb @@ -4,7 +4,7 @@ module Govbox class ProcessMessageJob < ApplicationJob queue_as :default - retry_on ::Govbox::Message::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY + retry_on ::ApplicationRecord::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY def perform(govbox_message) ActiveRecord::Base.transaction do diff --git a/app/models/application_record.rb b/app/models/application_record.rb index 10a4cba84..9e44c581f 100644 --- a/app/models/application_record.rb +++ b/app/models/application_record.rb @@ -1,3 +1,15 @@ class ApplicationRecord < ActiveRecord::Base self.abstract_class = true + + class FailedToAcquireLockError < StandardError + end + + def self.with_advisory_lock!(lock_name, options = {}, &block) + result = with_advisory_lock_result(lock_name, options, &block) + if result.lock_was_acquired? + result.result + else + raise FailedToAcquireLockError + end + end end diff --git a/app/models/govbox/message.rb b/app/models/govbox/message.rb index 266a63fac..e1829c1e6 100644 --- a/app/models/govbox/message.rb +++ b/app/models/govbox/message.rb @@ -54,14 +54,9 @@ def self.create_message_with_thread!(govbox_message) message end - raise FailedToAcquireLockError unless message - self.create_message_objects(message, govbox_message.payload) end - class FailedToAcquireLockError < StandardError - end - private def self.create_message(govbox_message) From 17893d1ea8349f1002795de6b92dcec8c66efacd Mon Sep 17 00:00:00 2001 From: Miroslav Hettes Date: Fri, 22 Sep 2023 17:07:38 +0200 Subject: [PATCH 2/7] Index message thread with advisory_lock --- app/jobs/searchable/reindex_message_thread_job.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/app/jobs/searchable/reindex_message_thread_job.rb b/app/jobs/searchable/reindex_message_thread_job.rb index 53f985c9f..32c952a2e 100644 --- a/app/jobs/searchable/reindex_message_thread_job.rb +++ b/app/jobs/searchable/reindex_message_thread_job.rb @@ -1,7 +1,13 @@ class Searchable::ReindexMessageThreadJob < ApplicationJob queue_as :default + retry_on ::ApplicationRecord::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY + def perform(message_thread) - ::Searchable::MessageThread.index_record(message_thread) + ::Searchable::MessageThread.transaction do + ::Searchable::MessageThread.with_advisory_lock!("mt_#{message_thread.id}", transaction: true, timeout_seconds: 10) do + ::Searchable::MessageThread.index_record(message_thread) + end + end end end From 2bf7ac658ef74ee8bfe2b359695d4864e092f2e6 Mon Sep 17 00:00:00 2001 From: Miroslav Hettes Date: Fri, 22 Sep 2023 17:29:56 +0200 Subject: [PATCH 3/7] Limit total number of unfinished jobs for reindexing --- app/jobs/searchable/reindex_message_thread_job.rb | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/app/jobs/searchable/reindex_message_thread_job.rb b/app/jobs/searchable/reindex_message_thread_job.rb index 32c952a2e..c925f9b34 100644 --- a/app/jobs/searchable/reindex_message_thread_job.rb +++ b/app/jobs/searchable/reindex_message_thread_job.rb @@ -1,6 +1,16 @@ class Searchable::ReindexMessageThreadJob < ApplicationJob queue_as :default + include GoodJob::ActiveJobExtensions::Concurrency + + good_job_control_concurrency_with( + # Maximum number of unfinished jobs to allow with the concurrency key + # Can be an Integer or Lambda/Proc that is invoked in the context of the job + total_limit: 1, + + key: -> { "Searchable::ReindexMessageThreadJob-#{arguments.first.id}" } + ) + retry_on ::ApplicationRecord::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY def perform(message_thread) From 4f5dde982a93493c33108376d1503e9938557c59 Mon Sep 17 00:00:00 2001 From: Miroslav Hettes Date: Fri, 22 Sep 2023 17:44:27 +0200 Subject: [PATCH 4/7] Discard on deserialization error --- app/jobs/searchable/reindex_message_thread_job.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/jobs/searchable/reindex_message_thread_job.rb b/app/jobs/searchable/reindex_message_thread_job.rb index c925f9b34..8d3fb73ee 100644 --- a/app/jobs/searchable/reindex_message_thread_job.rb +++ b/app/jobs/searchable/reindex_message_thread_job.rb @@ -8,10 +8,11 @@ class Searchable::ReindexMessageThreadJob < ApplicationJob # Can be an Integer or Lambda/Proc that is invoked in the context of the job total_limit: 1, - key: -> { "Searchable::ReindexMessageThreadJob-#{arguments.first.id}" } + key: -> { "Searchable::ReindexMessageThreadJob-#{arguments.first.try(:id)}" } ) retry_on ::ApplicationRecord::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY + discard_on ActiveJob::DeserializationError def perform(message_thread) ::Searchable::MessageThread.transaction do From e0020f68fe5e1f35bb36d4a80be2edad7a3a6b73 Mon Sep 17 00:00:00 2001 From: Miroslav Hettes Date: Mon, 25 Sep 2023 10:41:54 +0200 Subject: [PATCH 5/7] Skip reindexing if message thread is nil --- app/jobs/searchable/reindex_message_thread_job.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/jobs/searchable/reindex_message_thread_job.rb b/app/jobs/searchable/reindex_message_thread_job.rb index 8d3fb73ee..5ff72caee 100644 --- a/app/jobs/searchable/reindex_message_thread_job.rb +++ b/app/jobs/searchable/reindex_message_thread_job.rb @@ -15,6 +15,8 @@ class Searchable::ReindexMessageThreadJob < ApplicationJob discard_on ActiveJob::DeserializationError def perform(message_thread) + return if message_thread.nil? + ::Searchable::MessageThread.transaction do ::Searchable::MessageThread.with_advisory_lock!("mt_#{message_thread.id}", transaction: true, timeout_seconds: 10) do ::Searchable::MessageThread.index_record(message_thread) From eff4b95bb9b440a21fb90501d26aebc8a126ec51 Mon Sep 17 00:00:00 2001 From: Miroslav Hettes Date: Mon, 25 Sep 2023 11:03:58 +0200 Subject: [PATCH 6/7] Remove advisory lock from job with concurrency control --- app/jobs/searchable/reindex_message_thread_job.rb | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/app/jobs/searchable/reindex_message_thread_job.rb b/app/jobs/searchable/reindex_message_thread_job.rb index 5ff72caee..41f7732f1 100644 --- a/app/jobs/searchable/reindex_message_thread_job.rb +++ b/app/jobs/searchable/reindex_message_thread_job.rb @@ -11,16 +11,11 @@ class Searchable::ReindexMessageThreadJob < ApplicationJob key: -> { "Searchable::ReindexMessageThreadJob-#{arguments.first.try(:id)}" } ) - retry_on ::ApplicationRecord::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY discard_on ActiveJob::DeserializationError def perform(message_thread) return if message_thread.nil? - ::Searchable::MessageThread.transaction do - ::Searchable::MessageThread.with_advisory_lock!("mt_#{message_thread.id}", transaction: true, timeout_seconds: 10) do - ::Searchable::MessageThread.index_record(message_thread) - end - end + ::Searchable::MessageThread.index_record(message_thread) end end From 829ef9b80c22e27094f1fa038c1357fb5f2dbfb9 Mon Sep 17 00:00:00 2001 From: Miroslav Hettes Date: Mon, 25 Sep 2023 11:17:00 +0200 Subject: [PATCH 7/7] Use correct method --- app/models/govbox/message.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/govbox/message.rb b/app/models/govbox/message.rb index e1829c1e6..2833a2f67 100644 --- a/app/models/govbox/message.rb +++ b/app/models/govbox/message.rb @@ -27,7 +27,7 @@ def replyable? end def self.create_message_with_thread!(govbox_message) - message = MessageThread.with_advisory_lock(govbox_message.correlation_id, transaction: true, timeout_seconds: 10) do + message = MessageThread.with_advisory_lock!(govbox_message.correlation_id, transaction: true, timeout_seconds: 10) do folder = Folder.find_or_create_by!( name: "Inbox", box: govbox_message.box