From 53ee75be298c7cb0df579d28e68d94a645c136da Mon Sep 17 00:00:00 2001 From: moveson Date: Mon, 2 Dec 2024 10:51:10 -0700 Subject: [PATCH] Namespace async loaders and clean up errors --- lib/etl/async_importer.rb | 16 ++- lib/etl/errors.rb | 106 ++++++++++-------- lib/etl/loaders/async/insert_strategy.rb | 101 +++++++++++++++++ lib/etl/loaders/async_insert_strategy.rb | 103 ----------------- .../insert_strategy_spec.rb} | 2 +- 5 files changed, 170 insertions(+), 158 deletions(-) create mode 100644 lib/etl/loaders/async/insert_strategy.rb delete mode 100644 lib/etl/loaders/async_insert_strategy.rb rename spec/lib/etl/loaders/{async_insert_strategy_spec.rb => async/insert_strategy_spec.rb} (99%) diff --git a/lib/etl/async_importer.rb b/lib/etl/async_importer.rb index 9115646ac..e91df8be5 100644 --- a/lib/etl/async_importer.rb +++ b/lib/etl/async_importer.rb @@ -40,28 +40,32 @@ def set_etl_strategies when :event_course_splits self.extract_strategy = Extractors::CsvFileStrategy self.transform_strategy = Transformers::EventCourseSplitsStrategy - self.load_strategy = Loaders::AsyncInsertStrategy + self.load_strategy = Loaders::Async::InsertStrategy when :event_group_entrants self.extract_strategy = Extractors::CsvFileStrategy self.transform_strategy = Transformers::EventGroupEntrantsStrategy - self.load_strategy = Loaders::AsyncInsertStrategy + self.load_strategy = Loaders::Async::InsertStrategy when :event_entrants_with_military_times self.extract_strategy = Extractors::CsvFileStrategy self.transform_strategy = Transformers::Async::EffortsWithTimesStrategy - self.load_strategy = Loaders::AsyncInsertStrategy + self.load_strategy = Loaders::Async::InsertStrategy self.custom_options = { time_format: :military } when :hardrock_historical_facts self.extract_strategy = Extractors::CsvFileStrategy self.transform_strategy = Transformers::Async::HardrockHistoricalFactsStrategy - self.load_strategy = Loaders::AsyncInsertStrategy + self.load_strategy = Loaders::Async::InsertStrategy when :ultrasignup_historical_facts self.extract_strategy = Extractors::CsvFileStrategy self.transform_strategy = Transformers::Async::UltrasignupHistoricalFactsStrategy - self.load_strategy = Loaders::AsyncInsertStrategy + self.load_strategy = Loaders::Async::InsertStrategy + when :ultrasignup_historical_facts_compare_order_ids + self.extract_strategy = Extractors::CsvFileStrategy + self.transform_strategy = Transformers::Async::NullStrategy + self.load_strategy = Loaders::Async::UltrasignupOrderIDCompareStrategy when :lottery_entrants self.extract_strategy = Extractors::CsvFileStrategy self.transform_strategy = Transformers::LotteryEntrantsStrategy - self.load_strategy = Loaders::AsyncInsertStrategy + self.load_strategy = Loaders::Async::InsertStrategy else errors << format_not_recognized_error(format) end diff --git a/lib/etl/errors.rb b/lib/etl/errors.rb index efd3e780f..d27c1f663 100644 --- a/lib/etl/errors.rb +++ b/lib/etl/errors.rb @@ -3,138 +3,148 @@ module ETL module Errors def bad_url_error(url, error) - {title: "Bad URL", detail: {messages: ["#{url} reported an error: #{error}"]}} + { title: "Bad URL", detail: { messages: ["#{url} reported an error: #{error}"] } } end def data_not_present_error - {title: "Data not present", detail: {messages: ["No data was provided"]}} + { title: "Data not present", detail: { messages: ["No data was provided"] } } end def file_not_found_error(file_path) - {title: "File not found", detail: {messages: ["File #{file_path} could not be read"]}} + { title: "File not found", detail: { messages: ["File #{file_path} could not be read"] } } end def file_too_large_error(file) - {title: "File too large", detail: {messages: ["File #{file} is #{file.size / 1.kilobyte} KB, but maximum file size is 500 KB"]}} + { title: "File too large", detail: { messages: ["File #{file} is #{file.size / 1.kilobyte} KB, but maximum file size is 500 KB"] } } end def file_type_incorrect_error(file) - {title: "File type incorrect", detail: {messages: ["File #{file} must have a .csv extension and must be a CSV file"]}} + { title: "File type incorrect", detail: { messages: ["File #{file} must have a .csv extension and must be a CSV file"] } } end def format_not_recognized_error(format) - {title: "Format not recognized", detail: {messages: ["data_format #{format} is not recognized"]}} + { title: "Format not recognized", detail: { messages: ["data_format #{format} is not recognized"] } } end def invalid_file_error(file) - {title: "Invalid file", detail: {messages: ["#{file} is not a valid file"]}} + { title: "Invalid file", detail: { messages: ["#{file} is not a valid file"] } } end def invalid_json_error(string) - {title: "Invalid JSON", detail: {messages: ["#{string} is not valid JSON"]}} + { title: "Invalid JSON", detail: { messages: ["#{string} is not valid JSON"] } } end def invalid_proto_record_error(proto_record, row_index) - {title: "Invalid proto record", detail: {row_index: row_index, messages: ["Invalid proto record: #{proto_record}"]}} + { title: "Invalid proto record", detail: { row_index: row_index, messages: ["Invalid proto record: #{proto_record}"] } } end def jsonapi_error_object(record) - {title: "#{record.class} could not be saved", - detail: {attributes: record.attributes.compact.transform_keys { |key| key.camelize(:lower) }, - messages: record.errors.full_messages}} + { title: "#{record.class} could not be saved", + detail: { attributes: record.attributes.compact.transform_keys { |key| key.camelize(:lower) }, + messages: record.errors.full_messages } } end def missing_current_user_error - {title: "Current user id is missing", - detail: {messages: ["This import requires that a current_user_id be provided"]}} + { title: "Current user id is missing", + detail: { messages: ["This import requires that a current_user_id be provided"] } } end def missing_data_error(raw_data) - {title: "Invalid data", - detail: {messages: ["The provided file #{raw_data} has a problem with the ['data'] key or its values"]}} + { title: "Invalid data", + detail: { messages: ["The provided file #{raw_data} has a problem with the ['data'] key or its values"] } } end def missing_event_error - {title: "Event is missing", - detail: {messages: ["This import requires that an event be provided"]}} + { title: "Event is missing", + detail: { messages: ["This import requires that an event be provided"] } } end def missing_fields_error(raw_data) - {title: "Invalid fields", - detail: {messages: ["The provided file #{raw_data} has a problem with the ['list'] key " + - "or the ['list']['fields'] key or its values"]}} + { title: "Invalid fields", + detail: { messages: ["The provided file #{raw_data} has a problem with the ['list'] key " + + "or the ['list']['fields'] key or its values"] } } end def missing_key_error(*keys) - {title: "Key is missing", - detail: {messages: ["This import requires a column titled '#{keys.join(' or ')}' in order to proceed"]}} + { title: "Key is missing", + detail: { messages: ["This import requires a column titled '#{keys.join(' or ')}' in order to proceed"] } } end def missing_parent_error(type = nil) type ||= "record" - {title: "Parent is missing", - detail: {messages: ["This import requires that a parent #{type} be provided"]}} + { title: "Parent is missing", + detail: { messages: ["This import requires that a parent #{type} be provided"] } } end def missing_records_error - {title: "No records were provided", - detail: {messages: ["No records were provided for this import"]}} + { title: "No records were provided", + detail: { messages: ["No records were provided for this import"] } } end def missing_split_error - {title: "Split is missing", - detail: {messages: ["This import requires that a split be provided"]}} + { title: "Split is missing", + detail: { messages: ["This import requires that a split be provided"] } } end def missing_start_key_error - {title: "Start key is missing", - detail: {messages: ['This import requires a column titled "start" or "start offset" to indicate at what point split times begin']}} + { title: "Start key is missing", + detail: { messages: ['This import requires a column titled "start" or "start offset" to indicate at what point split times begin'] } } end def missing_table_error - {title: "Table is missing", - detail: {messages: ["A required table was not found in the provided source data"]}} + { title: "Table is missing", + detail: { messages: ["A required table was not found in the provided source data"] } } + end + + def orders_missing_error(ids) + { title: "Orders are missing", + detail: { messages: ["Orders exist in Ultrasignup but are missing in OST: #{ids}"] } } + end + + def orders_outdated_error(ids) + { title: "Orders are outdated", + detail: { messages: ["Orders exist in OST but have been removed from Ultrasignup: #{ids}"] } } end def resource_not_found_error(resource_class, provided_resource_name, row_index) humanized_resource_class = resource_class.to_s.underscore.humanize message = provided_resource_name.present? ? "#{humanized_resource_class} could not be found: #{provided_resource_name}" : "#{humanized_resource_class} was not provided" - {title: "#{humanized_resource_class} not found", detail: {row_index: row_index, messages: [message]}} + { title: "#{humanized_resource_class} not found", detail: { row_index: row_index, messages: [message] } } end def record_not_saved_error(error, row_index) - {title: "Record could not be saved", - detail: {row_index: row_index, messages: ["The record could not be saved: #{error}"]}} + { title: "Record could not be saved", + detail: { row_index: row_index, messages: ["The record could not be saved: #{error}"] } } end def resource_error_object(record, row_index) - {title: "#{record.class} #{record} could not be saved", - detail: {row_index: row_index, attributes: record.attributes.compact, messages: record.errors.full_messages}} + { title: "#{record.class} #{record} could not be saved", + detail: { row_index: row_index, attributes: record.attributes.compact, messages: record.errors.full_messages } } end def smarter_csv_error(exception) - {title: "CSV error", - detail: {messages: [exception.message]}} + { title: "CSV error", + detail: { messages: [exception.message] } } end def source_not_recognized_error(source) - {title: "Source not recognized", detail: {messages: ["Importer does not recognize the source: #{source}"]}} + { title: "Source not recognized", detail: { messages: ["Importer does not recognize the source: #{source}"] } } end def split_mismatch_error(event, time_points_size, time_keys_size) - {title: "Split mismatch error", - detail: {messages: ["#{event} expects #{time_points_size} time points (including the start split) " + - "but the provided data contemplates #{time_keys_size} time points."]}} + { title: "Split mismatch error", + detail: { messages: ["#{event} expects #{time_points_size} time points (including the start split) " + + "but the provided data contemplates #{time_keys_size} time points."] } } end def transform_failed_error(error_text, row_index) - {title: "Transform failed error", - detail: {row_index: row_index, messages: ["Transform failed: #{error_text}"]}} + { title: "Transform failed error", + detail: { row_index: row_index, messages: ["Transform failed: #{error_text}"] } } end def value_not_permitted_error(option, permitted_values, provided_value) - {title: "Argument value is not permitted", - detail: {messages: ["Values for #{option} must be within #{permitted_values.to_sentence} but #{provided_value} was provided"]}} + { title: "Argument value is not permitted", + detail: { messages: ["Values for #{option} must be within #{permitted_values.to_sentence} but #{provided_value} was provided"] } } end end end diff --git a/lib/etl/loaders/async/insert_strategy.rb b/lib/etl/loaders/async/insert_strategy.rb new file mode 100644 index 000000000..b615a7807 --- /dev/null +++ b/lib/etl/loaders/async/insert_strategy.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +module ETL::Loaders::Async + # If no unique_key is provided, this is a plain insert loader that will + # keep track of errors if validations are violated at the model or database + # level. + # + # If a unique_key is provided, records having the same unique key as an existing + # database record will be ignored. + class InsertStrategy + include ETL::Errors + + CHUNK_SIZE = 20 + + attr_reader :errors + + # @param [Array] proto_records + # @param [Hash] options + # @return [nil] + def self.load_records(proto_records, options) + new(proto_records, options).load_records + end + + # @param [Array] proto_records + # @param [Hash] options + def initialize(proto_records, options) + @proto_records = proto_records + @options = options + @import_job = options[:import_job] + @unique_key = options[:unique_key] + @errors = [] + end + + # @return [nil] + def load_records + custom_load + end + + private + + attr_reader :proto_records, :options, :import_job, :unique_key + + # @return [nil] + def custom_load + proto_records.each.with_index(1) do |proto_record, row_index| + if proto_record.record_class.blank? + errors << invalid_proto_record_error(proto_record, row_index) + next + end + + record = build_record(proto_record) + + if unique_key.present? + unique_attributes = unique_key.map { |attr| [attr, record.send(attr)] }.to_h + + if record.class.exists?(unique_attributes) + import_job.increment!(:ignored_count) + next + end + end + + if record.save + import_job.increment!(:succeeded_count) + else + import_job.increment!(:failed_count) + errors << resource_error_object(record, row_index) + end + + rescue ActiveRecord::ActiveRecordError => e + import_job.increment!(:failed_count) + errors << record_not_saved_error(e, row_index) + ensure + import_job.set_elapsed_time! + import_job.touch if row_index % CHUNK_SIZE == 0 + end + + nil + end + + # @param [ProtoRecord] proto_record + # @return [::ApplicationRecord] + def build_record(proto_record) + model_class = proto_record.record_class + attributes = proto_record.to_h + record = model_class.new(attributes) + assign_child_records(proto_record, record) + record + end + + # @param [ProtoRecord] proto_record + # @param [::ApplicationRecord] record + # @return [Array] + def assign_child_records(proto_record, record) + proto_record.children.each do |child_proto| + child_relationship = child_proto.record_type.to_s.pluralize + child_record = record.send(child_relationship).new + child_record.assign_attributes(child_proto.to_h) + end + end + end +end diff --git a/lib/etl/loaders/async_insert_strategy.rb b/lib/etl/loaders/async_insert_strategy.rb deleted file mode 100644 index 17785f3f4..000000000 --- a/lib/etl/loaders/async_insert_strategy.rb +++ /dev/null @@ -1,103 +0,0 @@ -# frozen_string_literal: true - -module ETL - module Loaders - # If no unique_key is provided, this is a plain insert loader that will - # keep track of errors if validations are violated at the model or database - # level. - # - # If a unique_key is provided, records having the same unique key as an existing - # database record will be ignored. - class AsyncInsertStrategy - include ETL::Errors - - CHUNK_SIZE = 20 - - attr_reader :errors - - # @param [Array] proto_records - # @param [Hash] options - # @return [nil] - def self.load_records(proto_records, options) - new(proto_records, options).load_records - end - - # @param [Array] proto_records - # @param [Hash] options - def initialize(proto_records, options) - @proto_records = proto_records - @options = options - @import_job = options[:import_job] - @unique_key = options[:unique_key] - @errors = [] - end - - # @return [nil] - def load_records - custom_load - end - - private - - attr_reader :proto_records, :options, :import_job, :unique_key - - # @return [nil] - def custom_load - proto_records.each.with_index(1) do |proto_record, row_index| - if proto_record.record_class.blank? - errors << invalid_proto_record_error(proto_record, row_index) - next - end - - record = build_record(proto_record) - - if unique_key.present? - unique_attributes = unique_key.map { |attr| [attr, record.send(attr)] }.to_h - - if record.class.exists?(unique_attributes) - import_job.increment!(:ignored_count) - next - end - end - - if record.save - import_job.increment!(:succeeded_count) - else - import_job.increment!(:failed_count) - errors << resource_error_object(record, row_index) - end - - rescue ActiveRecord::ActiveRecordError => e - import_job.increment!(:failed_count) - errors << record_not_saved_error(e, row_index) - ensure - import_job.set_elapsed_time! - import_job.touch if row_index % CHUNK_SIZE == 0 - end - - nil - end - - # @param [ProtoRecord] proto_record - # @return [::ApplicationRecord] - def build_record(proto_record) - model_class = proto_record.record_class - attributes = proto_record.to_h - record = model_class.new(attributes) - assign_child_records(proto_record, record) - record - end - - # @param [ProtoRecord] proto_record - # @param [::ApplicationRecord] record - # @return [Array] - def assign_child_records(proto_record, record) - proto_record.children.each do |child_proto| - child_relationship = child_proto.record_type.to_s.pluralize - child_record = record.send(child_relationship).new - child_record.assign_attributes(child_proto.to_h) - end - end - end - end -end diff --git a/spec/lib/etl/loaders/async_insert_strategy_spec.rb b/spec/lib/etl/loaders/async/insert_strategy_spec.rb similarity index 99% rename from spec/lib/etl/loaders/async_insert_strategy_spec.rb rename to spec/lib/etl/loaders/async/insert_strategy_spec.rb index 25c1c6cd5..6bf8d4071 100644 --- a/spec/lib/etl/loaders/async_insert_strategy_spec.rb +++ b/spec/lib/etl/loaders/async/insert_strategy_spec.rb @@ -2,7 +2,7 @@ require "rails_helper" -RSpec.describe ETL::Loaders::AsyncInsertStrategy do +RSpec.describe ETL::Loaders::Async::InsertStrategy do subject { described_class.new(proto_records, options) } let(:event) { events(:ggd30_50k) } let(:start_time) { event.scheduled_start_time }