Namespace async loaders and clean up errors
moveson committed Dec 2, 2024
1 parent 58671d6 commit 53ee75b
Showing 5 changed files with 170 additions and 158 deletions.
16 changes: 10 additions & 6 deletions lib/etl/async_importer.rb
@@ -40,28 +40,32 @@ def set_etl_strategies
     when :event_course_splits
       self.extract_strategy = Extractors::CsvFileStrategy
       self.transform_strategy = Transformers::EventCourseSplitsStrategy
-      self.load_strategy = Loaders::AsyncInsertStrategy
+      self.load_strategy = Loaders::Async::InsertStrategy
     when :event_group_entrants
       self.extract_strategy = Extractors::CsvFileStrategy
       self.transform_strategy = Transformers::EventGroupEntrantsStrategy
-      self.load_strategy = Loaders::AsyncInsertStrategy
+      self.load_strategy = Loaders::Async::InsertStrategy
     when :event_entrants_with_military_times
       self.extract_strategy = Extractors::CsvFileStrategy
       self.transform_strategy = Transformers::Async::EffortsWithTimesStrategy
-      self.load_strategy = Loaders::AsyncInsertStrategy
+      self.load_strategy = Loaders::Async::InsertStrategy
       self.custom_options = { time_format: :military }
     when :hardrock_historical_facts
       self.extract_strategy = Extractors::CsvFileStrategy
       self.transform_strategy = Transformers::Async::HardrockHistoricalFactsStrategy
-      self.load_strategy = Loaders::AsyncInsertStrategy
+      self.load_strategy = Loaders::Async::InsertStrategy
     when :ultrasignup_historical_facts
       self.extract_strategy = Extractors::CsvFileStrategy
       self.transform_strategy = Transformers::Async::UltrasignupHistoricalFactsStrategy
-      self.load_strategy = Loaders::AsyncInsertStrategy
+      self.load_strategy = Loaders::Async::InsertStrategy
+    when :ultrasignup_historical_facts_compare_order_ids
+      self.extract_strategy = Extractors::CsvFileStrategy
+      self.transform_strategy = Transformers::Async::NullStrategy
+      self.load_strategy = Loaders::Async::UltrasignupOrderIDCompareStrategy
     when :lottery_entrants
       self.extract_strategy = Extractors::CsvFileStrategy
       self.transform_strategy = Transformers::LotteryEntrantsStrategy
-      self.load_strategy = Loaders::AsyncInsertStrategy
+      self.load_strategy = Loaders::Async::InsertStrategy
     else
       errors << format_not_recognized_error(format)
     end
106 changes: 58 additions & 48 deletions lib/etl/errors.rb
@@ -3,138 +3,148 @@
 module ETL
   module Errors
     def bad_url_error(url, error)
-      {title: "Bad URL", detail: {messages: ["#{url} reported an error: #{error}"]}}
+      { title: "Bad URL", detail: { messages: ["#{url} reported an error: #{error}"] } }
     end

     def data_not_present_error
-      {title: "Data not present", detail: {messages: ["No data was provided"]}}
+      { title: "Data not present", detail: { messages: ["No data was provided"] } }
     end

     def file_not_found_error(file_path)
-      {title: "File not found", detail: {messages: ["File #{file_path} could not be read"]}}
+      { title: "File not found", detail: { messages: ["File #{file_path} could not be read"] } }
     end

     def file_too_large_error(file)
-      {title: "File too large", detail: {messages: ["File #{file} is #{file.size / 1.kilobyte} KB, but maximum file size is 500 KB"]}}
+      { title: "File too large", detail: { messages: ["File #{file} is #{file.size / 1.kilobyte} KB, but maximum file size is 500 KB"] } }
     end

     def file_type_incorrect_error(file)
-      {title: "File type incorrect", detail: {messages: ["File #{file} must have a .csv extension and must be a CSV file"]}}
+      { title: "File type incorrect", detail: { messages: ["File #{file} must have a .csv extension and must be a CSV file"] } }
     end

     def format_not_recognized_error(format)
-      {title: "Format not recognized", detail: {messages: ["data_format #{format} is not recognized"]}}
+      { title: "Format not recognized", detail: { messages: ["data_format #{format} is not recognized"] } }
     end

     def invalid_file_error(file)
-      {title: "Invalid file", detail: {messages: ["#{file} is not a valid file"]}}
+      { title: "Invalid file", detail: { messages: ["#{file} is not a valid file"] } }
     end

     def invalid_json_error(string)
-      {title: "Invalid JSON", detail: {messages: ["#{string} is not valid JSON"]}}
+      { title: "Invalid JSON", detail: { messages: ["#{string} is not valid JSON"] } }
     end

     def invalid_proto_record_error(proto_record, row_index)
-      {title: "Invalid proto record", detail: {row_index: row_index, messages: ["Invalid proto record: #{proto_record}"]}}
+      { title: "Invalid proto record", detail: { row_index: row_index, messages: ["Invalid proto record: #{proto_record}"] } }
     end

     def jsonapi_error_object(record)
-      {title: "#{record.class} could not be saved",
-       detail: {attributes: record.attributes.compact.transform_keys { |key| key.camelize(:lower) },
-                messages: record.errors.full_messages}}
+      { title: "#{record.class} could not be saved",
+        detail: { attributes: record.attributes.compact.transform_keys { |key| key.camelize(:lower) },
+                  messages: record.errors.full_messages } }
     end

     def missing_current_user_error
-      {title: "Current user id is missing",
-       detail: {messages: ["This import requires that a current_user_id be provided"]}}
+      { title: "Current user id is missing",
+        detail: { messages: ["This import requires that a current_user_id be provided"] } }
     end

     def missing_data_error(raw_data)
-      {title: "Invalid data",
-       detail: {messages: ["The provided file #{raw_data} has a problem with the ['data'] key or its values"]}}
+      { title: "Invalid data",
+        detail: { messages: ["The provided file #{raw_data} has a problem with the ['data'] key or its values"] } }
     end

     def missing_event_error
-      {title: "Event is missing",
-       detail: {messages: ["This import requires that an event be provided"]}}
+      { title: "Event is missing",
+        detail: { messages: ["This import requires that an event be provided"] } }
     end

     def missing_fields_error(raw_data)
-      {title: "Invalid fields",
-       detail: {messages: ["The provided file #{raw_data} has a problem with the ['list'] key " +
-                           "or the ['list']['fields'] key or its values"]}}
+      { title: "Invalid fields",
+        detail: { messages: ["The provided file #{raw_data} has a problem with the ['list'] key " +
+                             "or the ['list']['fields'] key or its values"] } }
     end

     def missing_key_error(*keys)
-      {title: "Key is missing",
-       detail: {messages: ["This import requires a column titled '#{keys.join(' or ')}' in order to proceed"]}}
+      { title: "Key is missing",
+        detail: { messages: ["This import requires a column titled '#{keys.join(' or ')}' in order to proceed"] } }
     end

     def missing_parent_error(type = nil)
       type ||= "record"
-      {title: "Parent is missing",
-       detail: {messages: ["This import requires that a parent #{type} be provided"]}}
+      { title: "Parent is missing",
+        detail: { messages: ["This import requires that a parent #{type} be provided"] } }
     end

     def missing_records_error
-      {title: "No records were provided",
-       detail: {messages: ["No records were provided for this import"]}}
+      { title: "No records were provided",
+        detail: { messages: ["No records were provided for this import"] } }
     end

     def missing_split_error
-      {title: "Split is missing",
-       detail: {messages: ["This import requires that a split be provided"]}}
+      { title: "Split is missing",
+        detail: { messages: ["This import requires that a split be provided"] } }
     end

     def missing_start_key_error
-      {title: "Start key is missing",
-       detail: {messages: ['This import requires a column titled "start" or "start offset" to indicate at what point split times begin']}}
+      { title: "Start key is missing",
+        detail: { messages: ['This import requires a column titled "start" or "start offset" to indicate at what point split times begin'] } }
     end

     def missing_table_error
-      {title: "Table is missing",
-       detail: {messages: ["A required table was not found in the provided source data"]}}
+      { title: "Table is missing",
+        detail: { messages: ["A required table was not found in the provided source data"] } }
     end

+    def orders_missing_error(ids)
+      { title: "Orders are missing",
+        detail: { messages: ["Orders exist in Ultrasignup but are missing in OST: #{ids}"] } }
+    end
+
+    def orders_outdated_error(ids)
+      { title: "Orders are outdated",
+        detail: { messages: ["Orders exist in OST but have been removed from Ultrasignup: #{ids}"] } }
+    end
+
     def resource_not_found_error(resource_class, provided_resource_name, row_index)
       humanized_resource_class = resource_class.to_s.underscore.humanize
       message = provided_resource_name.present? ? "#{humanized_resource_class} could not be found: #{provided_resource_name}" : "#{humanized_resource_class} was not provided"
-      {title: "#{humanized_resource_class} not found", detail: {row_index: row_index, messages: [message]}}
+      { title: "#{humanized_resource_class} not found", detail: { row_index: row_index, messages: [message] } }
     end

     def record_not_saved_error(error, row_index)
-      {title: "Record could not be saved",
-       detail: {row_index: row_index, messages: ["The record could not be saved: #{error}"]}}
+      { title: "Record could not be saved",
+        detail: { row_index: row_index, messages: ["The record could not be saved: #{error}"] } }
     end

     def resource_error_object(record, row_index)
-      {title: "#{record.class} #{record} could not be saved",
-       detail: {row_index: row_index, attributes: record.attributes.compact, messages: record.errors.full_messages}}
+      { title: "#{record.class} #{record} could not be saved",
+        detail: { row_index: row_index, attributes: record.attributes.compact, messages: record.errors.full_messages } }
     end

     def smarter_csv_error(exception)
-      {title: "CSV error",
-       detail: {messages: [exception.message]}}
+      { title: "CSV error",
+        detail: { messages: [exception.message] } }
     end

     def source_not_recognized_error(source)
-      {title: "Source not recognized", detail: {messages: ["Importer does not recognize the source: #{source}"]}}
+      { title: "Source not recognized", detail: { messages: ["Importer does not recognize the source: #{source}"] } }
     end

     def split_mismatch_error(event, time_points_size, time_keys_size)
-      {title: "Split mismatch error",
-       detail: {messages: ["#{event} expects #{time_points_size} time points (including the start split) " +
-                           "but the provided data contemplates #{time_keys_size} time points."]}}
+      { title: "Split mismatch error",
+        detail: { messages: ["#{event} expects #{time_points_size} time points (including the start split) " +
+                             "but the provided data contemplates #{time_keys_size} time points."] } }
     end

     def transform_failed_error(error_text, row_index)
-      {title: "Transform failed error",
-       detail: {row_index: row_index, messages: ["Transform failed: #{error_text}"]}}
+      { title: "Transform failed error",
+        detail: { row_index: row_index, messages: ["Transform failed: #{error_text}"] } }
     end

     def value_not_permitted_error(option, permitted_values, provided_value)
-      {title: "Argument value is not permitted",
-       detail: {messages: ["Values for #{option} must be within #{permitted_values.to_sentence} but #{provided_value} was provided"]}}
+      { title: "Argument value is not permitted",
+        detail: { messages: ["Values for #{option} must be within #{permitted_values.to_sentence} but #{provided_value} was provided"] } }
     end
   end
 end
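
Every helper above returns the same plain-hash shape — { title:, detail: { messages: [...] } } — which is what lets importers accumulate errors uniformly. An illustrative call (MyImporter is hypothetical; the module is mixed in the same way the loaders do with include ETL::Errors):

  class MyImporter
    include ETL::Errors
  end

  MyImporter.new.format_not_recognized_error(:fancy_csv)
  # => { title: "Format not recognized",
  #      detail: { messages: ["data_format fancy_csv is not recognized"] } }
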
101 changes: 101 additions & 0 deletions lib/etl/loaders/async/insert_strategy.rb
@@ -0,0 +1,101 @@
# frozen_string_literal: true

module ETL::Loaders::Async
  # If no unique_key is provided, this is a plain insert loader that will
  # keep track of errors if validations are violated at the model or database
  # level.
  #
  # If a unique_key is provided, records having the same unique key as an existing
  # database record will be ignored.
  class InsertStrategy
    include ETL::Errors

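    # Number of rows to process between progress touches on the import job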
    CHUNK_SIZE = 20

    attr_reader :errors

    # @param [Array<ProtoRecord>] proto_records
    # @param [Hash] options
    # @return [nil]
    def self.load_records(proto_records, options)
      new(proto_records, options).load_records
    end

    # @param [Array<ProtoRecord>] proto_records
    # @param [Hash] options
    def initialize(proto_records, options)
      @proto_records = proto_records
      @options = options
      @import_job = options[:import_job]
      @unique_key = options[:unique_key]
      @errors = []
    end

    # @return [nil]
    def load_records
      custom_load
    end

    private

    attr_reader :proto_records, :options, :import_job, :unique_key

    # @return [nil]
    def custom_load
      proto_records.each.with_index(1) do |proto_record, row_index|
        if proto_record.record_class.blank?
          errors << invalid_proto_record_error(proto_record, row_index)
          next
        end

        record = build_record(proto_record)

        if unique_key.present?
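          # Collect this record's values for the unique-key attributes; if a
          # matching row already exists, count the record as ignored and skip it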
          unique_attributes = unique_key.map { |attr| [attr, record.send(attr)] }.to_h

          if record.class.exists?(unique_attributes)
            import_job.increment!(:ignored_count)
            next
          end
        end

        if record.save
          import_job.increment!(:succeeded_count)
        else
          import_job.increment!(:failed_count)
          errors << resource_error_object(record, row_index)
        end

      rescue ActiveRecord::ActiveRecordError => e
        import_job.increment!(:failed_count)
        errors << record_not_saved_error(e, row_index)
      ensure
        import_job.set_elapsed_time!
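        # Refresh the job's updated_at only every CHUNK_SIZE rows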
        import_job.touch if row_index % CHUNK_SIZE == 0
      end

      nil
    end

    # @param [ProtoRecord] proto_record
    # @return [::ApplicationRecord]
    def build_record(proto_record)
      model_class = proto_record.record_class
      attributes = proto_record.to_h
      record = model_class.new(attributes)
      assign_child_records(proto_record, record)
      record
    end

    # @param [ProtoRecord] proto_record
    # @param [::ApplicationRecord] record
    # @return [Array<ProtoRecord>]
    def assign_child_records(proto_record, record)
      proto_record.children.each do |child_proto|
        child_relationship = child_proto.record_type.to_s.pluralize
        child_record = record.send(child_relationship).new
        child_record.assign_attributes(child_proto.to_h)
      end
    end
  end
end
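
A minimal usage sketch of the new loader (hypothetical setup — proto_records and import_job come from earlier pipeline stages, and the unique_key shown is illustrative):

  # proto_records: Array<ProtoRecord> produced by a transform strategy
  # import_job:    the import job record whose counters this loader increments
  ETL::Loaders::Async::InsertStrategy.load_records(
    proto_records,
    import_job: import_job,
    unique_key: [:first_name, :last_name] # optional; matching existing records are ignored
  )

Validation and database failures are collected into the instance's errors array (as ETL::Errors hashes) rather than raised.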