Skip to content

Commit

Permalink
move requeue from worker into optional behavior on RunsExtractors and (
Browse files Browse the repository at this point in the history
…#692)

RunsReducers
  • Loading branch information
amyrebecca authored Apr 8, 2019
1 parent cf7905d commit 4c0ca58
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 36 deletions.
16 changes: 15 additions & 1 deletion app/models/runs_extractors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def initialize(extractors)
@extractors = extractors
end

def extract(classification)
def extract(classification, and_reduce: false)
return [] unless extractors&.present?

tries ||= 2
Expand Down Expand Up @@ -57,6 +57,20 @@ def extract(classification)
end
end

if and_reduce
extracts = extracts.select { |extract| extract != Extractor::NoData }

if extracts.present?
ids = extracts.map(&:id)
ReduceWorker.perform_async(classification.workflow_id, "Workflow", classification.subject_id, classification.user_id, ids)

project = Project.find_by_id(workflow.project_id)
if project && project.has_reducers?
ReduceWorker.perform_async(classification.project_id, "Project", classification.subject_id, classification.user_id, ids)
end
end
end

extracts
rescue ActiveRecord::RecordNotUnique, PG::UniqueViolation
sleep 2
Expand Down
6 changes: 5 additions & 1 deletion app/models/runs_reducers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def initialize(reducible, reducers)
@reducers = reducers
end

def reduce(subject_id, user_id, extract_ids=[])
def reduce(subject_id, user_id, extract_ids=[], and_check_rules: false)
return [] unless reducers&.present?
retries ||= 2

Expand Down Expand Up @@ -49,6 +49,10 @@ def reduce(subject_id, user_id, extract_ids=[])
end
end

if reducible.is_a?(Workflow) && and_check_rules
CheckRulesWorker.perform_async(reducible.id, reducible.class, subject_id, user_id) unless new_reductions.blank?
end

new_reductions
rescue ActiveRecord::StaleObjectError
retry unless (retries-=1).zero?
Expand Down
13 changes: 2 additions & 11 deletions app/workers/extract_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,10 @@ def perform(classification_id)
workflow = classification.workflow
return if workflow.paused?

extracts = workflow.extractors_runner.extract(classification)

extracts = workflow.extractors_runner.extract(classification, and_reduce: true)
classification.destroy

if extracts.present?
ids = extracts.map(&:id)
ReduceWorker.perform_async(classification.workflow_id, "Workflow", classification.subject_id, classification.user_id, ids)

project = Project.find_by_id(workflow.project_id)
if project && project.has_reducers?
ReduceWorker.perform_async(classification.project_id, "Project", classification.subject_id, classification.user_id, ids)
end
end
extracts
rescue ActiveRecord::RecordNotFound => e
if Extract.where(classification_id: classification_id).any?
# This will sometimes happen in the following sequence of events:
Expand Down
7 changes: 1 addition & 6 deletions app/workers/reduce_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ def self.unique_args(args)
def perform(reducible_id, reducible_class, subject_id, user_id, extract_ids = [])
reducible = reducible_class.constantize.find(reducible_id)
return if reducible.paused?

reductions = reducible.reducers_runner.reduce(subject_id, user_id, extract_ids)

if reducible.is_a?(Workflow)
CheckRulesWorker.perform_async(reducible_id, reducible_class, subject_id, user_id) unless reductions.blank?
end
reducible.reducers_runner.reduce(subject_id, user_id, extract_ids, and_check_rules: true)
end

def self.test_uniq(testing)
Expand Down
18 changes: 1 addition & 17 deletions spec/workers/reduce_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,9 @@
let(:runner) { reducible.reducers_runner }

it "calls #reduce on the correct pipeline" do
expect_any_instance_of(RunsReducers).to receive(:reduce).once.with(subject.id, nil, [])
expect_any_instance_of(RunsReducers).to receive(:reduce).once.with(subject.id, nil, [], {and_check_rules: true})
described_class.new.perform(reducible.id, reducible.class.to_s, subject.id, nil)
end

it "calls CheckRulesWorker when reducing for a workflow" do
workflow = create :workflow
allow_any_instance_of(RunsReducers).to receive(:reduce).and_return('not blank')
expect do
described_class.new.perform(workflow.id, 'Workflow', nil, nil, nil)
end.to change(CheckRulesWorker.jobs, :size).by(1)
end

it "will not try to run CheckRulesWorker when reducing for a project" do
project = create :project
allow_any_instance_of(RunsReducers).to receive(:reduce).and_return('not blank')
expect do
described_class.new.perform(project.id, 'Project', nil, nil, nil)
end.not_to change(CheckRulesWorker.jobs, :size)
end
end

describe '#unique_args' do
Expand Down

0 comments on commit 4c0ca58

Please sign in to comment.