Skip to content
This repository has been archived by the owner on Jul 14, 2024. It is now read-only.

Commit

Permalink
fix(pipeline_jobs_stopping): Complete running jobs that have finished…
Browse files Browse the repository at this point in the history
… when viewing the pipeline jobs page
  • Loading branch information
richardmatthewsdev committed Jul 14, 2024
1 parent 19be696 commit 91e35f5
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
14 changes: 14 additions & 0 deletions app/controllers/pipeline_jobs_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
class PipelineJobsController < ApplicationController
before_action :find_pipeline
before_action :find_pipeline_job, only: %i[show cancel]
before_action :complete_finished_jobs

def index
@pipeline_jobs = paginate_and_filter_jobs(@pipeline.pipeline_jobs)
Expand Down Expand Up @@ -36,6 +37,19 @@ def cancel
end

private

# we have noticed an issue where jobs are not being appropriately marked as completed during the worker lifecycle
# this is being caused by latency between the concurrent sidekiq process and the database
# future plan is to introduce a watching process as part of the harvest and move logic about completions and scheduling enrichments there
def complete_finished_jobs
running_reports = @pipeline.pipeline_jobs.flat_map(&:harvest_reports).select { |report| report.status == 'running' }

running_reports.each do |report|
report.update(transformation_status: 'completed') if report.transformation_workers_completed?
report.update(load_status: 'completed') if report.load_workers_completed?
report.update(delete_status: 'completed') if report.delete_workers_completed?
end
end

def find_pipeline
@pipeline = Pipeline.find(params[:pipeline_id])
Expand Down
13 changes: 13 additions & 0 deletions spec/requests/pipeline_jobs_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
let(:pipeline) { create(:pipeline) }
let(:destination) { create(:destination) }
let(:pipeline_job) { create(:pipeline_job, pipeline:, destination:) }
let(:harvest_definition) { create(:harvest_definition, pipeline:) }
let(:harvest_job) { create(:harvest_job, harvest_definition:, pipeline_job:) }

let!(:harvest_report) { create(:harvest_report, pipeline_job:, harvest_job:, extraction_status: 'completed', transformation_status: 'running', transformation_workers_queued: 1, transformation_workers_completed: 1, load_status: 'running', load_workers_queued: 1, load_workers_completed: 1, delete_status: 'running', delete_workers_queued: 0, delete_workers_completed: 0) }

before do
sign_in(user)
Expand All @@ -18,6 +22,15 @@

expect(response).to have_http_status :ok
end

it 'completes running jobs that have finished' do
expect(harvest_report.status).to eq 'running'
get pipeline_pipeline_jobs_path(pipeline)

harvest_report.reload

expect(harvest_report.status).to eq 'completed'
end
end

describe 'POST /create' do
Expand Down

0 comments on commit 91e35f5

Please sign in to comment.