Skip to content

Commit

Permalink
Merge pull request #61 from solver-it-sro/fix-reindex-job
Browse files Browse the repository at this point in the history
GO-205 GO-206 GO-207 Fix reindex job
  • Loading branch information
mirrec authored Sep 26, 2023
2 parents 95d1ccb + f5618ca commit 6100cdd
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 7 deletions.
2 changes: 1 addition & 1 deletion app/jobs/govbox/process_message_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Govbox
class ProcessMessageJob < ApplicationJob
queue_as :default

retry_on ::Govbox::Message::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY
retry_on ::ApplicationRecord::FailedToAcquireLockError, wait: :exponentially_longer, attempts: Float::INFINITY

def perform(govbox_message)
ActiveRecord::Base.transaction do
Expand Down
14 changes: 14 additions & 0 deletions app/jobs/searchable/reindex_message_thread_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
class Searchable::ReindexMessageThreadJob < ApplicationJob
queue_as :default

include GoodJob::ActiveJobExtensions::Concurrency

good_job_control_concurrency_with(
# Maximum number of unfinished jobs to allow with the concurrency key
# Can be an Integer or Lambda/Proc that is invoked in the context of the job
total_limit: 1,

key: -> { "Searchable::ReindexMessageThreadJob-#{arguments.first.try(:id)}" }
)

discard_on ActiveJob::DeserializationError

def perform(message_thread)
return if message_thread.nil?

::Searchable::MessageThread.index_record(message_thread)
end
end
12 changes: 12 additions & 0 deletions app/models/application_record.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true

class FailedToAcquireLockError < StandardError
end

def self.with_advisory_lock!(lock_name, options = {}, &block)
result = with_advisory_lock_result(lock_name, options, &block)
if result.lock_was_acquired?
result.result
else
raise FailedToAcquireLockError
end
end
end
7 changes: 1 addition & 6 deletions app/models/govbox/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def replyable?
end

def self.create_message_with_thread!(govbox_message)
message = MessageThread.with_advisory_lock(govbox_message.correlation_id, transaction: true, timeout_seconds: 10) do
message = MessageThread.with_advisory_lock!(govbox_message.correlation_id, transaction: true, timeout_seconds: 10) do
folder = Folder.find_or_create_by!(
name: "Inbox",
box: govbox_message.box
Expand All @@ -54,14 +54,9 @@ def self.create_message_with_thread!(govbox_message)
message
end

raise FailedToAcquireLockError unless message

self.create_message_objects(message, govbox_message.payload)
end

class FailedToAcquireLockError < StandardError
end

private

def self.create_message(govbox_message)
Expand Down

0 comments on commit 6100cdd

Please sign in to comment.